/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.testutils;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.JobManagerMode;
import org.apache.flink.runtime.testutils.TestJvmProcess;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/**
 * A {@link TestJvmProcess} description of a JobManager used in tests.
 *
 * <p>The process is configured with a port and a {@link Configuration}; both are handed to
 * {@link JobManagerProcessEntryPoint#main(String[])} as {@code --key value} program arguments.
 *
 * <p>The cached actor reference returned by {@link #getActorRef(ActorSystem, FiniteDuration)} is
 * not synchronized; NOTE(review): presumably this class is only used from a single test thread.
 */
public class JobManagerProcess extends TestJvmProcess {
    private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcess.class);

    /** Pause between two lookup attempts in {@link #getActorRef(ActorSystem, FiniteDuration)}. */
    private static final long RETRY_INTERVAL_MILLIS = 100L;

    /** Non-negative ID, used in {@link #getName()} and {@link #toString()}. */
    private final int id;

    /** Port passed to the entry point via {@code --port}. */
    private final int jobManagerPort;

    /** Configuration whose entries are forwarded as program arguments. */
    private final Configuration config;

    /** Arguments built once in the constructor: the port followed by all configuration entries. */
    private final String[] jvmArgs;

    /** Lazily resolved JobManager actor reference; {@code null} until the first successful lookup. */
    private ActorRef jobManagerRef;

    /**
     * Creates a JobManager process description that uses a port obtained from
     * {@link NetUtils#getAvailablePort()}.
     *
     * @param id non-negative ID of this process
     * @param config configuration forwarded to the JobManager, must not be {@code null}
     * @throws Exception if the process description cannot be created
     */
    public JobManagerProcess(int id, Configuration config) throws Exception {
        this(id, config, 0);
    }

    /**
     * Creates a JobManager process description.
     *
     * @param id non-negative ID of this process
     * @param config configuration forwarded to the JobManager, must not be {@code null}
     * @param jobManagerPort port to use; any value {@code <= 0} is replaced by a port obtained
     *     from {@link NetUtils#getAvailablePort()}
     * @throws Exception if the process description cannot be created
     */
    public JobManagerProcess(int id, Configuration config, int jobManagerPort) throws Exception {
        Preconditions.checkArgument(id >= 0, "Negative ID");
        this.id = id;
        this.config = Preconditions.checkNotNull(config, "Configuration");
        this.jobManagerPort = jobManagerPort <= 0 ? NetUtils.getAvailablePort() : jobManagerPort;

        ArrayList<String> args = new ArrayList<String>();
        args.add("--port");
        args.add(String.valueOf(this.jobManagerPort));

        // Every configuration entry becomes a "--key value" pair that the entry point parses
        // back with ParameterTool.
        for (Map.Entry<String, String> entry : config.toMap().entrySet()) {
            args.add("--" + entry.getKey());
            args.add(entry.getValue());
        }

        this.jvmArgs = args.toArray(new String[args.size()]);
    }

    @Override
    public String getName() {
        return "JobManager " + this.id;
    }

    /**
     * Returns the arguments built in the constructor.
     *
     * @return a copy of the argument array, so callers cannot modify this instance's state
     */
    @Override
    public String[] getJvmArgs() {
        return this.jvmArgs.clone();
    }

    @Override
    public String getEntryPointClassName() {
        return JobManagerProcessEntryPoint.class.getName();
    }

    /**
     * @return the port passed to the entry point via {@code --port}
     */
    public int getJobManagerPort() {
        return this.jobManagerPort;
    }

    /**
     * @return the configuration given at construction time
     */
    public Configuration getConfig() {
        return this.config;
    }

    /**
     * Returns the remote Akka URL of the JobManager, built for {@code localhost} and this
     * process' port.
     *
     * @return the JobManager Akka URL
     */
    public String getJobManagerAkkaURL() {
        return JobManager.getRemoteJobManagerAkkaURL(
                new InetSocketAddress("localhost", this.jobManagerPort),
                Option.<String>empty());
    }

    @Override
    public String toString() {
        return String.format("JobManagerProcess(id=%d, port=%d)", this.id, this.jobManagerPort);
    }

    /**
     * Returns the JobManager's {@link ActorRef}, retrying the lookup until it succeeds or the
     * timeout elapses. A successfully resolved reference is cached and returned by later calls.
     *
     * @param actorSystem actor system used to resolve the reference, must not be {@code null}
     *     unless a reference has already been cached
     * @param timeout maximum time to keep retrying
     * @return the JobManager actor reference
     * @throws InterruptedException if the calling thread is interrupted while looking up or
     *     waiting between attempts
     * @throws IllegalStateException if no reference could be resolved within {@code timeout};
     *     the last lookup failure, if any, is attached as the cause
     * @throws Exception declared for backward compatibility with existing callers
     */
    public ActorRef getActorRef(ActorSystem actorSystem, FiniteDuration timeout) throws Exception {
        if (this.jobManagerRef != null) {
            return this.jobManagerRef;
        }
        Preconditions.checkNotNull(actorSystem, "Actor system");

        Deadline deadline = timeout.fromNow();
        Throwable lastFailure = null;

        while (deadline.hasTimeLeft()) {
            try {
                this.jobManagerRef = AkkaUtils.getActorRef(
                        this.getJobManagerAkkaURL(), actorSystem, deadline.timeLeft());
                return this.jobManagerRef;
            }
            catch (Throwable t) {
                // Do not swallow interruption: the caller asked this thread to stop waiting.
                if (t instanceof InterruptedException) {
                    throw (InterruptedException) t;
                }

                // Any other failure is treated as "not reachable yet" and retried.
                lastFailure = t;

                // The deadline may have expired during the failed attempt, which makes the
                // remaining time negative; Thread.sleep rejects negative values, so only sleep
                // while there is time left.
                long remainingMillis = deadline.timeLeft().toMillis();
                if (remainingMillis > 0L) {
                    Thread.sleep(Math.min(RETRY_INTERVAL_MILLIS, remainingMillis));
                }
            }
        }

        throw new IllegalStateException(
                "JobManager did not start up within " + timeout + ".", lastFailure);
    }

    /**
     * Entry point for the JobManager process.
     */
    public static class JobManagerProcessEntryPoint {
        private static final Logger LOG = LoggerFactory.getLogger(JobManagerProcessEntryPoint.class);

        /**
         * Parses the program arguments, runs the JobManager on {@code localhost} and then blocks
         * the main thread indefinitely. Any failure is logged and ends the JVM with exit code 1.
         *
         * @param args {@code --key value} pairs; {@code --port} is required, and all pairs are
         *     used as the JobManager configuration
         */
        public static void main(String[] args) {
            try {
                ParameterTool params = ParameterTool.fromArgs(args);
                int port = Integer.parseInt(params.getRequired("port"));
                LOG.info("Running on port {}.", port);

                Configuration config = params.getConfiguration();
                LOG.info("Configuration: {}.", config);

                JobManager.runJobManager(
                        config, JobManagerMode.CLUSTER, StreamingMode.STREAMING, "localhost", port);

                // Nothing ever counts this latch down, so the main thread waits here forever.
                new CountDownLatch(1).await();
            }
            catch (Throwable t) {
                LOG.error("Failed to start JobManager process", t);
                System.exit(1);
            }
        }
    }
}

