/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import com.typesafe.config.Config;
import java.io.IOException;
import java.net.BindException;
import java.util.Iterator;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.HostAndPort;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.util.NetUtils;
import org.apache.pekko.actor.ActorSystem;
import org.jboss.netty.channel.ChannelException;
import org.slf4j.Logger;

public class ActorSystemBootstrapTools {
    @VisibleForTesting
    public static ActorSystem startRemoteActorSystem(Configuration configuration, String externalAddress, String externalPortRange, Logger logger) throws Exception {
        return ActorSystemBootstrapTools.startRemoteActorSystem(configuration, PekkoUtils.getFlinkActorSystemName(), externalAddress, externalPortRange, NetUtils.getWildcardIPAddress(), Optional.empty(), logger, PekkoUtils.getForkJoinExecutorConfig(ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(configuration)), null);
    }

    public static ActorSystem startRemoteActorSystem(Configuration configuration, String actorSystemName, String externalAddress, String externalPortRange, String bindAddress, Optional<Integer> bindPort, Logger logger, Config actorSystemExecutorConfiguration, Config customConfig) throws Exception {
        Iterator portsIterator;
        try {
            portsIterator = NetUtils.getPortRangeFromString((String)externalPortRange);
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Invalid port range definition: " + externalPortRange);
        }
        while (portsIterator.hasNext()) {
            int externalPort = (Integer)portsIterator.next();
            try {
                return ActorSystemBootstrapTools.startRemoteActorSystem(configuration, actorSystemName, externalAddress, externalPort, bindAddress, bindPort.orElse(externalPort), logger, actorSystemExecutorConfiguration, customConfig);
            }
            catch (Exception e) {
                Throwable cause = e.getCause();
                if (cause instanceof ChannelException || cause instanceof BindException) continue;
                throw e;
            }
        }
        throw new BindException("Could not start actor system on any port in port range " + externalPortRange);
    }

    private static ActorSystem startRemoteActorSystem(Configuration configuration, String actorSystemName, String externalAddress, int externalPort, String bindAddress, int bindPort, Logger logger, Config actorSystemExecutorConfiguration, Config customConfig) throws Exception {
        String externalHostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString((String)externalAddress, (int)externalPort);
        String bindHostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString((String)bindAddress, (int)bindPort);
        logger.info("Trying to start actor system, external address {}, bind address {}.", (Object)externalHostPortUrl, (Object)bindHostPortUrl);
        try {
            Config pekkoConfig = PekkoUtils.getConfig(configuration, new HostAndPort(externalAddress, externalPort), new HostAndPort(bindAddress, bindPort), actorSystemExecutorConfiguration);
            if (customConfig != null) {
                pekkoConfig = customConfig.withFallback(pekkoConfig);
            }
            return ActorSystemBootstrapTools.startActorSystem(pekkoConfig, actorSystemName, logger);
        }
        catch (Throwable t) {
            Throwable cause;
            if (t instanceof org.apache.flink.shaded.netty4.io.netty.channel.ChannelException && (cause = t.getCause()) != null && t.getCause() instanceof BindException) {
                throw new IOException("Unable to create ActorSystem at address " + bindHostPortUrl + " : " + cause.getMessage(), t);
            }
            throw new Exception("Could not create actor system", t);
        }
    }

    public static ActorSystem startLocalActorSystem(Configuration configuration, String actorSystemName, Logger logger, Config actorSystemExecutorConfiguration, Config customConfig) throws Exception {
        logger.info("Trying to start local actor system");
        try {
            Config pekkoConfig = PekkoUtils.getConfig(configuration, null, null, actorSystemExecutorConfiguration);
            if (customConfig != null) {
                pekkoConfig = customConfig.withFallback(pekkoConfig);
            }
            return ActorSystemBootstrapTools.startActorSystem(pekkoConfig, actorSystemName, logger);
        }
        catch (Throwable t) {
            throw new Exception("Could not create actor system", t);
        }
    }

    private static ActorSystem startActorSystem(Config config, String actorSystemName, Logger logger) {
        logger.debug("Using pekko configuration\n {}", (Object)config);
        ActorSystem actorSystem = PekkoUtils.createActorSystem(actorSystemName, config);
        logger.info("Actor system started at {}", (Object)PekkoUtils.getAddress(actorSystem));
        return actorSystem;
    }

    private ActorSystemBootstrapTools() {
    }

    public static RpcSystem.ForkJoinExecutorConfiguration getForkJoinExecutorConfiguration(Configuration configuration) {
        double parallelismFactor = configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
        int minParallelism = configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
        int maxParallelism = configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX);
        return new RpcSystem.ForkJoinExecutorConfiguration(parallelismFactor, minParallelism, maxParallelism);
    }
}

