/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.environment;

import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
import org.apache.flink.client.program.ProgramInvocationException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StreamExecutionEnvironment} that submits the streaming job to a remote
 * JobManager identified by a host name and an RPC port.
 *
 * <p>The JAR files given at construction time are attached to the job graph and are used,
 * together with the optional global classpath URLs, to build the user-code class loader
 * that is handed to the client on submission.
 */
public class RemoteStreamEnvironment
extends StreamExecutionEnvironment {
    private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);

    /** Largest valid TCP port number. */
    private static final int MAX_PORT = 65535;

    /** Host name written to {@code jobmanager.rpc.address} on submission. */
    private final String host;
    /** Port written to {@code jobmanager.rpc.port} on submission. */
    private final int port;
    /** Client-side configuration; never {@code null}. */
    private final Configuration config;
    /** URLs of the JAR files that are attached to every submitted job graph. */
    private final List<URL> jarFiles;
    /** Additional classpath URLs passed to the user-code class loader; never {@code null}. */
    private final List<URL> globalClasspaths;

    /**
     * Creates an environment for the given JobManager address using an empty configuration.
     *
     * @param host the JobManager host name; must not be {@code null}
     * @param port the JobManager port; must be in the range 1..65535
     * @param jarFiles paths of the JAR files to ship with the job
     */
    public RemoteStreamEnvironment(String host, int port, String ... jarFiles) {
        this(host, port, (Configuration)null, jarFiles);
    }

    /**
     * Creates an environment for the given JobManager address with a client configuration.
     *
     * @param host the JobManager host name; must not be {@code null}
     * @param port the JobManager port; must be in the range 1..65535
     * @param config the client configuration; {@code null} is replaced by an empty configuration
     * @param jarFiles paths of the JAR files to ship with the job
     */
    public RemoteStreamEnvironment(String host, int port, Configuration config, String ... jarFiles) {
        this(host, port, config, jarFiles, (URL[])null);
    }

    /**
     * Creates an environment for the given JobManager address with a client configuration,
     * JAR files and additional classpath URLs.
     *
     * @param host the JobManager host name; must not be {@code null}
     * @param port the JobManager port; must be in the range 1..65535
     * @param config the client configuration; {@code null} is replaced by an empty configuration
     * @param jarFiles paths of the JAR files to ship with the job; {@code null} is treated as
     *        "no JAR files"
     * @param globalClasspaths additional classpath URLs; {@code null} is treated as "none"
     * @throws InvalidProgramException if explicit environments are currently not allowed
     * @throws NullPointerException if {@code host} is {@code null}
     * @throws IllegalArgumentException if the port is out of range or a JAR path cannot be
     *         converted to a URL
     * @throws RuntimeException if checking a JAR file fails with an {@link IOException}
     */
    public RemoteStreamEnvironment(String host, int port, Configuration config, String[] jarFiles, URL[] globalClasspaths) {
        if (!ExecutionEnvironment.areExplicitEnvironmentsAllowed()) {
            throw new InvalidProgramException("The RemoteEnvironment cannot be used when submitting a program through a client, or running in a TestEnvironment context.");
        }
        if (host == null) {
            throw new NullPointerException("Host must not be null.");
        }
        // 65535 is the highest valid TCP port and must be accepted.
        if (port < 1 || port > MAX_PORT) {
            throw new IllegalArgumentException("Port out of range");
        }
        this.host = host;
        this.port = port;
        this.config = config == null ? new Configuration() : config;

        // A null array (e.g. an explicit null passed to the varargs constructors) means "no jars".
        String[] jarPaths = jarFiles == null ? new String[0] : jarFiles;
        this.jarFiles = new ArrayList<URL>(jarPaths.length);
        for (String jarFile : jarPaths) {
            try {
                URL jarFileUrl = new File(jarFile).getAbsoluteFile().toURI().toURL();
                this.jarFiles.add(jarFileUrl);
                JobWithJars.checkJarFile(jarFileUrl);
            }
            catch (MalformedURLException e) {
                throw new IllegalArgumentException("JAR file path is invalid '" + jarFile + "'", e);
            }
            catch (IOException e) {
                throw new RuntimeException("Problem with jar file " + jarFile, e);
            }
        }

        if (globalClasspaths == null) {
            this.globalClasspaths = Collections.<URL>emptyList();
        } else {
            // Copy so that later modifications of the caller's array do not leak into this environment.
            this.globalClasspaths = Collections.unmodifiableList(new ArrayList<URL>(Arrays.asList(globalClasspaths)));
        }
    }

    /**
     * Builds the job graph for the current stream graph under the given name, clears the
     * recorded transformations and submits the job to the remote JobManager, blocking
     * until the client returns a result.
     *
     * @param jobName the name used when generating the job graph
     * @return the result returned by the client
     * @throws ProgramInvocationException if the client cannot be created or the execution fails
     */
    @Override
    public JobExecutionResult execute(String jobName) throws ProgramInvocationException {
        JobGraph jobGraph = this.getStreamGraph().getJobGraph(jobName);
        this.transformations.clear();
        return this.executeRemotely(jobGraph);
    }

    /**
     * Attaches the configured JAR files to the job graph, creates a client that points at
     * the configured host and port, and runs the job in blocking mode.
     *
     * @param jobGraph the job graph to submit
     * @return the result returned by the client
     * @throws ProgramInvocationException if a JAR URL is invalid, the client cannot be
     *         created, or the execution fails
     */
    private JobExecutionResult executeRemotely(JobGraph jobGraph) throws ProgramInvocationException {
        if (LOG.isInfoEnabled()) {
            LOG.info("Running remotely at {}:{}", this.host, this.port);
        }
        for (URL jarFile : this.jarFiles) {
            try {
                jobGraph.addJar(new Path(jarFile.toURI()));
            }
            catch (URISyntaxException e) {
                throw new ProgramInvocationException("URL is invalid", e);
            }
        }
        ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader(this.jarFiles, this.globalClasspaths, this.getClass().getClassLoader());

        // Later addAll/set calls override earlier entries: job configuration first, then the
        // client configuration, and finally the explicit JobManager address of this environment.
        Configuration configuration = new Configuration();
        configuration.addAll(jobGraph.getJobConfiguration());
        configuration.addAll(this.config);
        configuration.setString("jobmanager.rpc.address", this.host);
        configuration.setInteger("jobmanager.rpc.port", this.port);

        Client client;
        try {
            client = new Client(configuration);
            client.setPrintStatusDuringExecution(this.getConfig().isSysoutLoggingEnabled());
        }
        catch (Exception e) {
            throw new ProgramInvocationException("Cannot establish connection to JobManager: " + e.getMessage(), e);
        }
        try {
            return client.runBlocking(jobGraph, usercodeClassLoader);
        }
        catch (ProgramInvocationException e) {
            // Already the declared exception type; propagate unchanged.
            throw e;
        }
        catch (Exception e) {
            String term = e.getMessage() == null ? "." : ": " + e.getMessage();
            throw new ProgramInvocationException("The program execution failed" + term, e);
        }
        finally {
            client.shutdown();
        }
    }

    /**
     * Returns a description containing the remote address and the parallelism, where a
     * parallelism of {@code -1} is rendered as {@code default}.
     */
    @Override
    public String toString() {
        int parallelism = this.getParallelism();
        String parallelismText = parallelism == -1 ? "default" : String.valueOf(parallelism);
        return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + parallelismText + ")";
    }

    /**
     * Returns the JobManager host name this environment submits to.
     *
     * @return the host name given at construction time
     */
    public String getHost() {
        return this.host;
    }

    /**
     * Returns the JobManager port this environment submits to.
     *
     * @return the port given at construction time
     */
    public int getPort() {
        return this.port;
    }
}

