/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RpcOptions;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider;
import org.apache.flink.runtime.rpc.pekko.EscalatingSupervisorStrategy;
import org.apache.flink.runtime.rpc.pekko.HostAndPort;
import org.apache.flink.runtime.rpc.pekko.PriorityThreadsDispatcher;
import org.apache.flink.runtime.rpc.pekko.RemoteAddressExtension;
import org.apache.flink.runtime.rpc.pekko.RobustActorSystem;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.FunctionUtils;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Address;
import org.apache.pekko.actor.AddressFromURIString;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers around Pekko actor systems as used by Flink's RPC layer.
 *
 * <p>The class assembles the HOCON configuration (basic, executor, remoting, SSL) that an actor
 * system is started with, creates and terminates actor systems, and converts between actor
 * references, Pekko {@link Address}es and RPC URLs.
 */
class PekkoUtils {
    private static final Logger LOG = LoggerFactory.getLogger(PekkoUtils.class);
    private static final String FLINK_ACTOR_SYSTEM_NAME = "flink";

    PekkoUtils() {
    }

    /** Returns the actor system name used when no explicit name is given ({@code "flink"}). */
    public static String getFlinkActorSystemName() {
        return FLINK_ACTOR_SYSTEM_NAME;
    }

    /**
     * Builds the configuration shared by every actor system created through this class: logging,
     * fatal-error handling, the guardian supervisor strategy, the default dispatcher throughput
     * and a single-threaded supervisor dispatcher.
     *
     * @param configuration Flink configuration the tunable values are read from
     * @return the parsed and resolved basic config
     */
    private static Config getBasicConfig(Configuration configuration) {
        final int throughput = configuration.get(RpcOptions.DISPATCHER_THROUGHPUT);
        final String jvmExitOnFatalError =
                booleanToOnOrOff(configuration.get(RpcOptions.JVM_EXIT_ON_FATAL_ERROR));
        final String logLifecycleEvents =
                booleanToOnOrOff(configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS));
        final String supervisorStrategy = EscalatingSupervisorStrategy.class.getCanonicalName();

        return new ConfigBuilder()
                .add("pekko {")
                .add("  daemonic = off")
                .add("  loggers = [\"org.apache.pekko.event.slf4j.Slf4jLogger\"]")
                .add("  logging-filter = \"org.apache.pekko.event.slf4j.Slf4jLoggingFilter\"")
                .add("  log-config-on-start = off")
                .add("  logger-startup-timeout = 50s")
                .add("  loglevel = " + getLogLevel())
                .add("  stdout-loglevel = OFF")
                .add("  log-dead-letters = " + logLifecycleEvents)
                .add("  log-dead-letters-during-shutdown = " + logLifecycleEvents)
                .add("  jvm-exit-on-fatal-error = " + jvmExitOnFatalError)
                .add("  serialize-messages = off")
                .add("  actor {")
                .add("    guardian-supervisor-strategy = " + supervisorStrategy)
                .add("    warn-about-java-serializer-usage = off")
                .add("    allow-java-serialization = on")
                .add("    default-dispatcher {")
                .add("      throughput = " + throughput)
                .add("    }")
                .add("    supervisor-dispatcher {")
                .add("      type = Dispatcher")
                .add("      executor = \"thread-pool-executor\"")
                .add("      thread-pool-executor {")
                .add("        core-pool-size-min = 1")
                .add("        core-pool-size-max = 1")
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}")
                .build();
    }

    /**
     * Maps the level enabled on this class's SLF4J logger to a Pekko {@code loglevel} value,
     * checking from the most verbose level down.
     *
     * @return one of {@code DEBUG}, {@code INFO}, {@code WARNING}, {@code ERROR} or {@code OFF}
     */
    private static String getLogLevel() {
        if (LOG.isTraceEnabled()) {
            // Deliberately DEBUG, not TRACE — presumably because Pekko has no TRACE level.
            return "DEBUG";
        }
        if (LOG.isDebugEnabled()) {
            return "DEBUG";
        }
        if (LOG.isInfoEnabled()) {
            return "INFO";
        }
        if (LOG.isWarnEnabled()) {
            return "WARNING";
        }
        if (LOG.isErrorEnabled()) {
            return "ERROR";
        }
        return "OFF";
    }

    /**
     * Builds a default-dispatcher config backed by a thread-pool executor that runs its threads
     * with the given priority (via {@link PriorityThreadsDispatcher}).
     *
     * @param configuration thread priority and min/max core pool size
     * @return the parsed and resolved executor config
     */
    public static Config getThreadPoolExecutorConfig(
            RpcSystem.FixedThreadPoolExecutorConfiguration configuration) {
        final int threadPriority = configuration.getThreadPriority();
        final int minNumThreads = configuration.getMinNumThreads();
        final int maxNumThreads = configuration.getMaxNumThreads();

        return new ConfigBuilder()
                .add("pekko {")
                .add("  actor {")
                .add("    default-dispatcher {")
                .add("      type = " + PriorityThreadsDispatcher.class.getCanonicalName())
                .add("      executor = thread-pool-executor")
                .add("      thread-priority = " + threadPriority)
                .add("      thread-pool-executor {")
                .add("          core-pool-size-min = " + minNumThreads)
                .add("          core-pool-size-max = " + maxNumThreads)
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}")
                .build();
    }

    /**
     * Builds a default-dispatcher config backed by a fork-join executor.
     *
     * @param configuration parallelism factor and min/max parallelism
     * @return the parsed and resolved executor config
     */
    public static Config getForkJoinExecutorConfig(
            RpcSystem.ForkJoinExecutorConfiguration configuration) {
        final double parallelismFactor = configuration.getParallelismFactor();
        final int minNumThreads = configuration.getMinParallelism();
        final int maxNumThreads = configuration.getMaxParallelism();

        return new ConfigBuilder()
                .add("pekko {")
                .add("  actor {")
                .add("    default-dispatcher {")
                .add("      executor = fork-join-executor")
                .add("      fork-join-executor {")
                .add("          parallelism-factor = " + parallelismFactor)
                .add("          parallelism-min = " + minNumThreads)
                .add("          parallelism-max = " + maxNumThreads)
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}")
                .build();
    }

    /**
     * Assembles the complete remoting configuration: base transport settings, hostnames, SSL and
     * the remote dispatcher's fork-join executor, in that order.
     *
     * @param configuration Flink configuration the tunable values are read from
     * @param bindAddress hostname/IP the transport binds to
     * @param port port the transport binds to
     * @param externalHostname hostname advertised to remote peers
     * @param externalPort port advertised to remote peers
     * @return the parsed and resolved remote config
     */
    private static Config getRemoteConfig(
            Configuration configuration,
            String bindAddress,
            int port,
            String externalHostname,
            int externalPort) {
        final ConfigBuilder builder = new ConfigBuilder();
        addBaseRemoteConfig(builder, configuration, port, externalPort);
        addHostnameRemoteConfig(builder, bindAddress, externalHostname);
        addSslRemoteConfig(builder, configuration);
        addRemoteForkJoinExecutorConfig(
                builder,
                ActorSystemBootstrapTools.getRemoteForkJoinExecutorConfiguration(configuration));
        return builder.build();
    }

    /**
     * Appends the classic-remoting base settings (provider, timeouts, frame size, socket worker
     * pools, lifecycle logging, retry gate) to {@code configBuilder}.
     *
     * @param configBuilder builder the lines are appended to
     * @param configuration Flink configuration the tunable values are read from
     * @param port written as {@code bind-port}
     * @param externalPort written as {@code port}
     */
    private static void addBaseRemoteConfig(
            ConfigBuilder configBuilder, Configuration configuration, int port, int externalPort) {
        final Duration askTimeout = configuration.get(RpcOptions.ASK_TIMEOUT_DURATION);
        // The startup timeout falls back to ten times the ask timeout.
        final String startupTimeout =
                TimeUtils.getStringInMillis(
                        configuration.get(RpcOptions.STARTUP_TIMEOUT, askTimeout.multipliedBy(10L)));
        final String tcpTimeout =
                TimeUtils.getStringInMillis(configuration.get(RpcOptions.TCP_TIMEOUT));
        final String framesize = configuration.get(RpcOptions.FRAMESIZE);

        final int clientSocketWorkerPoolPoolSizeMin =
                configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MIN);
        final int clientSocketWorkerPoolPoolSizeMax =
                configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_MAX);
        final double clientSocketWorkerPoolPoolSizeFactor =
                configuration.get(RpcOptions.CLIENT_SOCKET_WORKER_POOL_SIZE_FACTOR);

        final int serverSocketWorkerPoolPoolSizeMin =
                configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MIN);
        final int serverSocketWorkerPoolPoolSizeMax =
                configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_MAX);
        final double serverSocketWorkerPoolPoolSizeFactor =
                configuration.get(RpcOptions.SERVER_SOCKET_WORKER_POOL_SIZE_FACTOR);

        final String logLifecycleEvents =
                booleanToOnOrOff(configuration.get(RpcOptions.LOG_LIFECYCLE_EVENTS));
        final long retryGateClosedFor = configuration.get(RpcOptions.RETRY_GATE_CLOSED_FOR);

        configBuilder
                .add("pekko {")
                .add("  actor {")
                .add("    provider = \"org.apache.pekko.remote.RemoteActorRefProvider\"")
                .add("  }")
                .add("  remote.artery.enabled = false")
                .add("  remote.startup-timeout = " + startupTimeout)
                .add("  remote.warn-about-direct-use = off")
                .add("  remote.use-unsafe-remote-features-outside-cluster = on")
                .add("  remote.classic {")
                .add("    # disable the transport failure detector by setting very high values")
                .add("    transport-failure-detector{")
                .add("      acceptable-heartbeat-pause = 6000 s")
                .add("      heartbeat-interval = 1000 s")
                .add("      threshold = 300")
                .add("    }")
                .add("    enabled-transports = [\"pekko.remote.classic.netty.tcp\"]")
                .add("    netty {")
                .add("      tcp {")
                .add("        transport-class = \"org.apache.pekko.remote.transport.netty.NettyTransport\"")
                .add("        port = " + externalPort)
                .add("        bind-port = " + port)
                .add("        connection-timeout = " + tcpTimeout)
                .add("        maximum-frame-size = " + framesize)
                .add("        tcp-nodelay = on")
                .add("        client-socket-worker-pool {")
                .add("          pool-size-min = " + clientSocketWorkerPoolPoolSizeMin)
                .add("          pool-size-max = " + clientSocketWorkerPoolPoolSizeMax)
                .add("          pool-size-factor = " + clientSocketWorkerPoolPoolSizeFactor)
                .add("        }")
                .add("        server-socket-worker-pool {")
                .add("          pool-size-min = " + serverSocketWorkerPoolPoolSizeMin)
                .add("          pool-size-max = " + serverSocketWorkerPoolPoolSizeMax)
                .add("          pool-size-factor = " + serverSocketWorkerPoolPoolSizeFactor)
                .add("        }")
                .add("      }")
                .add("    }")
                .add("    log-remote-lifecycle-events = " + logLifecycleEvents)
                .add("    retry-gate-closed-for = " + retryGateClosedFor + " ms")
                .add("  }")
                .add("}");
    }

    /**
     * Appends the advertised hostname and the bind hostname of the TCP transport to
     * {@code configBuilder}.
     *
     * @param configBuilder builder the lines are appended to
     * @param bindAddress written verbatim as {@code bind-hostname}
     * @param externalHostname normalized via {@link NetUtils#unresolvedHostToNormalizedString}
     *     and written as {@code hostname}; an empty string is written when the normalized value
     *     is {@code null} or empty
     */
    private static void addHostnameRemoteConfig(
            ConfigBuilder configBuilder, String bindAddress, String externalHostname) {
        final String normalizedExternalHostname =
                NetUtils.unresolvedHostToNormalizedString(externalHostname);
        final String effectiveHostname =
                normalizedExternalHostname != null && !normalizedExternalHostname.isEmpty()
                        ? normalizedExternalHostname
                        : "";

        configBuilder
                .add("pekko {")
                .add("  remote.classic {")
                .add("    netty {")
                .add("      tcp {")
                .add("        hostname = \"" + effectiveHostname + "\"")
                .add("        bind-hostname = \"" + bindAddress + "\"")
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}");
    }

    /**
     * Appends the SSL transport section to {@code configBuilder}.
     *
     * <p>The section is always written and selects the {@code netty.ssl} transport, which
     * inherits the {@code netty.tcp} settings through a substitution; whether SSL is actually
     * switched on is carried by the {@code enable-ssl} flag. Internal key/trust store settings
     * fall back to their general SSL counterparts when not set.
     *
     * @param configBuilder builder the lines are appended to
     * @param configuration Flink configuration the SSL options are read from
     */
    private static void addSslRemoteConfig(
            ConfigBuilder configBuilder, Configuration configuration) {
        final boolean enableSSLConfig =
                configuration.get(RpcOptions.SSL_ENABLED)
                        && SecurityOptions.isInternalSSLEnabled(configuration);
        final String enableSSL = booleanToOnOrOff(enableSSLConfig);

        final String sslKeyStore =
                configuration.get(
                        SecurityOptions.SSL_INTERNAL_KEYSTORE,
                        configuration.get(SecurityOptions.SSL_KEYSTORE));
        final String sslKeyStorePassword =
                configuration.get(
                        SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD,
                        configuration.get(SecurityOptions.SSL_KEYSTORE_PASSWORD));
        final String sslKeyStoreType =
                configuration.get(SecurityOptions.SSL_INTERNAL_KEYSTORE_TYPE);
        final String sslKeyPassword =
                configuration.get(
                        SecurityOptions.SSL_INTERNAL_KEY_PASSWORD,
                        configuration.get(SecurityOptions.SSL_KEY_PASSWORD));

        final String sslTrustStore =
                configuration.get(
                        SecurityOptions.SSL_INTERNAL_TRUSTSTORE,
                        configuration.get(SecurityOptions.SSL_TRUSTSTORE));
        final String sslTrustStorePassword =
                configuration.get(
                        SecurityOptions.SSL_INTERNAL_TRUSTSTORE_PASSWORD,
                        configuration.get(SecurityOptions.SSL_TRUSTSTORE_PASSWORD));
        final String sslTrustStoreType =
                configuration.get(SecurityOptions.SSL_INTERNAL_TRUSTSTORE_TYPE);

        // Comma-separated fingerprints become a HOCON list of quoted strings; no value -> [].
        final String sslCertFingerprintString =
                configuration.get(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT);
        final String sslCertFingerprints =
                sslCertFingerprintString != null
                        ? Arrays.stream(sslCertFingerprintString.split(","))
                                .collect(Collectors.joining("\",\"", "[\"", "\"]"))
                        : "[]";

        final String sslProtocol = configuration.get(SecurityOptions.SSL_PROTOCOL);

        // Comma-separated algorithms become an unquoted HOCON list.
        final String sslAlgorithmsString = configuration.get(SecurityOptions.SSL_ALGORITHMS);
        final String sslAlgorithms =
                Arrays.stream(sslAlgorithmsString.split(","))
                        .collect(Collectors.joining(",", "[", "]"));

        final String sslEngineProviderName = CustomSSLEngineProvider.class.getCanonicalName();

        configBuilder
                .add("pekko {")
                .add("  remote.classic {")
                .add("    enabled-transports = [\"pekko.remote.classic.netty.ssl\"]")
                .add("    netty {")
                .add("      ssl = ${pekko.remote.classic.netty.tcp}")
                .add("      ssl {")
                .add("        enable-ssl = " + enableSSL)
                .add("        ssl-engine-provider = " + sslEngineProviderName)
                .add("        security {")
                .add("          key-store = \"" + sslKeyStore + "\"")
                .add("          key-store-password = \"" + sslKeyStorePassword + "\"")
                .add("          key-store-type = \"" + sslKeyStoreType + "\"")
                .add("          key-password = \"" + sslKeyPassword + "\"")
                .add("          trust-store = \"" + sslTrustStore + "\"")
                .add("          trust-store-password = \"" + sslTrustStorePassword + "\"")
                .add("          trust-store-type = \"" + sslTrustStoreType + "\"")
                .add("          protocol = " + sslProtocol)
                .add("          enabled-algorithms = " + sslAlgorithms)
                .add("          random-number-generator = \"\"")
                .add("          require-mutual-authentication = on")
                .add("          cert-fingerprints = " + sslCertFingerprints)
                .add("        }")
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}");
    }

    /**
     * Appends the fork-join executor settings of the default remote dispatcher to
     * {@code builder}.
     *
     * @param builder builder the lines are appended to
     * @param configuration parallelism factor and min/max parallelism
     * @return the config built from everything accumulated in {@code builder} so far; the caller
     *     in this class ignores it and builds the final config itself
     */
    private static Config addRemoteForkJoinExecutorConfig(
            ConfigBuilder builder, RpcSystem.ForkJoinExecutorConfiguration configuration) {
        final double parallelismFactor = configuration.getParallelismFactor();
        final int minNumThreads = configuration.getMinParallelism();
        final int maxNumThreads = configuration.getMaxParallelism();

        return builder
                .add("pekko {")
                .add("  remote {")
                .add("    default-remote-dispatcher {")
                .add("      executor = fork-join-executor")
                .add("      fork-join-executor {")
                .add("          parallelism-factor = " + parallelismFactor)
                .add("          parallelism-min = " + minNumThreads)
                .add("          parallelism-max = " + maxNumThreads)
                .add("      }")
                .add("    }")
                .add("  }")
                .add("}")
                .build();
    }

    /**
     * Creates an actor system without remoting, named {@link #getFlinkActorSystemName()}.
     *
     * @param configuration Flink configuration the actor system config is derived from
     * @return the created actor system
     */
    public static ActorSystem createLocalActorSystem(Configuration configuration) {
        return createActorSystem(getConfig(configuration, null));
    }

    /** Creates an actor system named {@link #getFlinkActorSystemName()} from {@code config}. */
    private static ActorSystem createActorSystem(Config config) {
        return createActorSystem(getFlinkActorSystemName(), config);
    }

    /**
     * Creates a {@link RobustActorSystem} with the given name and configuration.
     *
     * <p>As a side effect, Netty's default {@link InternalLoggerFactory} is switched to SLF4J
     * before the system is created.
     *
     * @param actorSystemName name of the actor system
     * @param config configuration to start the actor system with
     * @return the created actor system
     */
    public static ActorSystem createActorSystem(String actorSystemName, Config config) {
        InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
        return RobustActorSystem.create(actorSystemName, config);
    }

    /** Creates an actor system from {@link #getDefaultConfig()}; intended for tests. */
    @VisibleForTesting
    public static ActorSystem createDefaultActorSystem() {
        return createActorSystem(getDefaultConfig());
    }

    /** Returns the config for an empty Flink configuration, empty external host and port 0. */
    private static Config getDefaultConfig() {
        return getConfig(new Configuration(), new HostAndPort("", 0));
    }

    /**
     * Builds the actor system config using the fork-join executor derived from
     * {@code configuration} and no explicit bind address.
     *
     * @param configuration Flink configuration the values are read from
     * @param externalAddress address advertised to remote peers, or {@code null} for a config
     *     without remoting
     * @return the assembled config
     */
    public static Config getConfig(
            Configuration configuration, @Nullable HostAndPort externalAddress) {
        return getConfig(
                configuration,
                externalAddress,
                null,
                getForkJoinExecutorConfig(
                        ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(configuration)));
    }

    /**
     * Builds the actor system config.
     *
     * <p>The basic config, with {@code executorConfig} as fallback, is always included. When
     * {@code externalAddress} is given, the remoting config is layered on top of it. Without a
     * {@code bindAddress} the transport binds to {@link NetUtils#getWildcardIPAddress()} on the
     * external port.
     *
     * @param configuration Flink configuration the values are read from
     * @param externalAddress address advertised to remote peers, or {@code null} for a config
     *     without remoting
     * @param bindAddress address to bind to, or {@code null} to use the wildcard address and the
     *     external port
     * @param executorConfig executor config used as fallback for the basic config
     * @return the assembled config
     */
    public static Config getConfig(
            Configuration configuration,
            @Nullable HostAndPort externalAddress,
            @Nullable HostAndPort bindAddress,
            Config executorConfig) {
        final Config defaultConfig = getBasicConfig(configuration).withFallback(executorConfig);
        if (externalAddress == null) {
            return defaultConfig;
        }

        final String bindHost =
                bindAddress != null ? bindAddress.getHost() : NetUtils.getWildcardIPAddress();
        final int bindPort =
                bindAddress != null ? bindAddress.getPort() : externalAddress.getPort();

        final Config remoteConfig =
                getRemoteConfig(
                        configuration,
                        bindHost,
                        bindPort,
                        externalAddress.getHost(),
                        externalAddress.getPort());
        return remoteConfig.withFallback(defaultConfig);
    }

    /**
     * Returns the address of the given actor system as reported by
     * {@link RemoteAddressExtension}.
     *
     * @param system actor system to query
     * @return the address of {@code system}
     */
    public static Address getAddress(ActorSystem system) {
        return ((RemoteAddressExtension.RemoteAddressExtensionImpl)
                        RemoteAddressExtension.INSTANCE.apply(system))
                .getAddress();
    }

    /**
     * Returns the actor's path rendered together with the address of {@code system}.
     *
     * @param system actor system whose address is used
     * @param actor actor whose path is rendered
     * @return the RPC URL of {@code actor}
     */
    public static String getRpcURL(ActorSystem system, ActorRef actor) {
        final Address address = getAddress(system);
        return actor.path().toStringWithAddress(address);
    }

    /**
     * Parses the Pekko address out of an RPC URL.
     *
     * @param rpcURL URL to parse
     * @return the parsed address
     * @throws MalformedURLException declared for callers; raised by the underlying parser
     */
    public static Address getAddressFromRpcURL(String rpcURL) throws MalformedURLException {
        return AddressFromURIString.apply(rpcURL);
    }

    /**
     * Extracts host and port from an RPC URL.
     *
     * @param rpcURL URL to parse
     * @return socket address built from the URL's host and port
     * @throws Exception if the URL is malformed or carries no host or no port; the underlying
     *     {@link MalformedURLException} is attached as the cause
     */
    public static InetSocketAddress getInetSocketAddressFromRpcURL(String rpcURL)
            throws Exception {
        try {
            final Address address = getAddressFromRpcURL(rpcURL);
            if (address.host().isDefined() && address.port().isDefined()) {
                return new InetSocketAddress(
                        address.host().get(), (int) ((Integer) address.port().get()));
            }
            throw new MalformedURLException("No host and port defined in Pekko URL " + rpcURL);
        } catch (MalformedURLException e) {
            // Keep the original failure as the cause so the reason is not lost.
            throw new Exception(
                    "Could not retrieve InetSocketAddress from Pekko URL " + rpcURL, e);
        }
    }

    /**
     * Terminates the actor system.
     *
     * @param actorSystem actor system to terminate
     * @return future completed once termination has finished; the termination result is dropped
     */
    public static CompletableFuture<Void> terminateActorSystem(ActorSystem actorSystem) {
        return ScalaFutureUtils.toJava(actorSystem.terminate())
                .thenAccept(FunctionUtils.ignoreFn());
    }

    /** Renders a boolean as the HOCON switch value {@code "on"} or {@code "off"}. */
    private static String booleanToOnOrOff(boolean flag) {
        return flag ? "on" : "off";
    }

    /**
     * Collects configuration text line by line and parses it into a {@link Config} on demand.
     */
    private static class ConfigBuilder {
        private final StringWriter stringWriter = new StringWriter();
        private final PrintWriter printWriter = new PrintWriter(this.stringWriter);

        private ConfigBuilder() {
        }

        /**
         * Appends one line of configuration text.
         *
         * @param configLine line to append, without line terminator
         * @return this builder, for chaining
         */
        public ConfigBuilder add(String configLine) {
            this.printWriter.println(configLine);
            return this;
        }

        /**
         * Parses all lines added so far and resolves substitutions.
         *
         * @return the parsed and resolved config
         */
        public Config build() {
            return ConfigFactory.parseString(this.stringWriter.toString()).resolve();
        }
    }
}

