/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.rpc.pekko;

import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.ActorSystemBootstrapTools;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcService;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceConfiguration;
import org.apache.flink.runtime.rpc.pekko.PekkoUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.flink.util.function.TriFunction;
import org.apache.pekko.actor.ActorSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PekkoRpcServiceUtils {
    /** Logger of this utility class; it is also passed to every builder created by the factory methods. */
    private static final Logger LOG = LoggerFactory.getLogger(PekkoRpcServiceUtils.class);
    /** URL scheme returned by {@code protocolToString} for every protocol other than {@code SSL_TCP}. */
    private static final String PEKKO_TCP = "pekko.tcp";
    /** URL scheme returned by {@code protocolToString} for {@code Protocol.SSL_TCP}. */
    private static final String PEKKO_SSL_TCP = "pekko.ssl.tcp";
    /** Path segment placed between {@code /user/} and the endpoint name in generated RPC URLs. */
    static final String SUPERVISOR_NAME = "rpc";
    /** Config snippet template; the {@code %s} placeholder receives the configured frame size string. */
    private static final String SIMPLE_CONFIG_TEMPLATE = "pekko {remote.classic {netty.tcp {maximum-frame-size = %s}}}";
    /** Path under which the frame size is read back after the template has been parsed. */
    private static final String MAXIMUM_FRAME_SIZE_PATH = "pekko.remote.classic.netty.tcp.maximum-frame-size";

    /**
     * Builds and starts a remote {@link PekkoRpcService}.
     *
     * @param configuration configuration handed to the service builder
     * @param externalAddress external address handed to the service builder, may be {@code null}
     * @param externalPortRange external port range handed to the service builder
     * @param bindAddress address to bind to; when {@code null} the builder's default is kept
     * @param bindPort port to bind to; when empty the builder's default is kept
     * @return the started RPC service
     * @throws Exception if creating or starting the service fails
     */
    static PekkoRpcService createRemoteRpcService(Configuration configuration, @Nullable String externalAddress, String externalPortRange, @Nullable String bindAddress, Optional<Integer> bindPort) throws Exception {
        final PekkoRpcServiceBuilder builder =
                PekkoRpcServiceUtils.remoteServiceBuilder(configuration, externalAddress, externalPortRange);

        // Both bind settings are optional; only override the builder defaults when given.
        if (bindAddress != null) {
            builder.withBindAddress(bindAddress);
        }
        if (bindPort.isPresent()) {
            builder.withBindPort(bindPort.get());
        }

        return builder.createAndStart();
    }

    /**
     * Creates a builder for a remote RPC service that logs through this class's logger.
     *
     * @param configuration configuration for the service
     * @param externalAddress external address, may be {@code null}
     * @param externalPortRange external port range
     * @return a new, not yet started builder
     */
    static PekkoRpcServiceBuilder remoteServiceBuilder(Configuration configuration, @Nullable String externalAddress, String externalPortRange) {
        final PekkoRpcServiceBuilder builder =
                new PekkoRpcServiceBuilder(configuration, LOG, externalAddress, externalPortRange);
        return builder;
    }

    /**
     * Variant of the remote builder factory that takes a single port instead of a port range.
     *
     * @param configuration configuration for the service
     * @param externalAddress external address, may be {@code null}
     * @param externalPort external port; converted to its decimal string form and used as the port range
     * @return a new, not yet started builder
     */
    @VisibleForTesting
    static PekkoRpcServiceBuilder remoteServiceBuilder(Configuration configuration, @Nullable String externalAddress, int externalPort) {
        final String singlePortRange = Integer.toString(externalPort);
        return PekkoRpcServiceUtils.remoteServiceBuilder(configuration, externalAddress, singlePortRange);
    }

    /**
     * Creates a builder without an external address or port range, logging through this class's logger.
     *
     * @param configuration configuration for the service
     * @return a new, not yet started builder
     */
    static PekkoRpcServiceBuilder localServiceBuilder(Configuration configuration) {
        final PekkoRpcServiceBuilder builder = new PekkoRpcServiceBuilder(configuration, LOG);
        return builder;
    }

    /**
     * Builds the RPC URL of a remote endpoint, deriving the protocol from the configuration.
     *
     * <p>{@code Protocol.SSL_TCP} is used only when both {@code AkkaOptions.SSL_ENABLED} is set and
     * {@code SecurityOptions.isInternalSSLEnabled} reports true; otherwise {@code Protocol.TCP}.
     *
     * @param hostname host of the remote endpoint
     * @param port port of the remote endpoint
     * @param endpointName name of the endpoint
     * @param addressResolution whether to try resolving the hostname
     * @param config configuration to read the SSL settings from, must not be {@code null}
     * @return the RPC URL
     * @throws UnknownHostException if hostname resolution is attempted and fails
     */
    public static String getRpcUrl(String hostname, int port, String endpointName, AddressResolution addressResolution, Configuration config) throws UnknownHostException {
        Preconditions.checkNotNull(config, "config is null");

        final Protocol protocol;
        if (config.getBoolean(AkkaOptions.SSL_ENABLED) && SecurityOptions.isInternalSSLEnabled(config)) {
            protocol = Protocol.SSL_TCP;
        } else {
            protocol = Protocol.TCP;
        }

        return PekkoRpcServiceUtils.getRpcUrl(hostname, port, endpointName, addressResolution, protocol);
    }

    /**
     * Builds the RPC URL of a remote endpoint for the given protocol.
     *
     * @param hostname host of the remote endpoint, must not be {@code null}
     * @param port port of the remote endpoint, must pass {@code NetUtils.isValidClientPort}
     * @param endpointName name of the endpoint, must not be {@code null}
     * @param addressResolution when {@code TRY_ADDRESS_RESOLUTION}, the hostname is looked up first
     * @param protocol protocol that determines the URL scheme
     * @return the RPC URL
     * @throws UnknownHostException if the hostname lookup is attempted and fails
     */
    public static String getRpcUrl(String hostname, int port, String endpointName, AddressResolution addressResolution, Protocol protocol) throws UnknownHostException {
        Preconditions.checkNotNull(hostname, "hostname is null");
        Preconditions.checkNotNull(endpointName, "endpointName is null");
        Preconditions.checkArgument(NetUtils.isValidClientPort(port), "port must be in [1, 65535]");

        if (addressResolution == AddressResolution.TRY_ADDRESS_RESOLUTION) {
            // The result is deliberately discarded: the lookup only serves to fail
            // with UnknownHostException when the host cannot be resolved.
            InetAddress.getByName(hostname);
        }

        final String normalizedHostAndPort = NetUtils.unresolvedHostAndPortToNormalizedString(hostname, port);
        final RemoteAddressInformation remote = new RemoteAddressInformation(normalizedHostAndPort, protocol);
        return PekkoRpcServiceUtils.internalRpcUrl(endpointName, Optional.of(remote));
    }

    /**
     * Builds the RPC URL of a local endpoint, i.e. a URL without remote address information.
     *
     * @param endpointName name of the endpoint
     * @return the local RPC URL
     */
    public static String getLocalRpcUrl(String endpointName) {
        // An empty Optional tells the URL builder that there is no remote host/protocol.
        return PekkoRpcServiceUtils.internalRpcUrl(
                endpointName,
                Optional.empty());
    }

    /**
     * Checks whether the given exception reports that the recipient had already been terminated.
     *
     * <p>The check is purely textual: it succeeds when the exception message contains
     * {@code "had already been terminated."}. {@link Throwable#getMessage()} may return
     * {@code null}; such an exception (and a {@code null} exception) cannot match, so
     * {@code false} is returned instead of failing with a {@link NullPointerException}.
     *
     * @param exception exception to inspect, may be {@code null}
     * @return {@code true} if the message contains the termination marker, {@code false} otherwise
     */
    public static boolean isRecipientTerminatedException(Throwable exception) {
        if (exception == null) {
            return false;
        }
        final String message = exception.getMessage();
        return message != null && message.contains("had already been terminated.");
    }

    /**
     * Assembles an RPC URL of the form {@code <scheme>://flink[@<host:port>]/user/<supervisor>/<endpoint>}.
     *
     * @param endpointName name of the endpoint, appended as the last path segment
     * @param remoteAddressInformation remote protocol and host/port; when empty the scheme is
     *     {@code pekko} and no host part is added
     * @return the assembled URL
     */
    private static String internalRpcUrl(String endpointName, Optional<RemoteAddressInformation> remoteAddressInformation) {
        final String scheme;
        final String hostAndPort;
        if (remoteAddressInformation.isPresent()) {
            final RemoteAddressInformation remote = remoteAddressInformation.get();
            scheme = PekkoRpcServiceUtils.protocolToString(remote.getProtocol());
            hostAndPort = remote.getHostnameAndPort();
        } else {
            scheme = "pekko";
            hostAndPort = null;
        }

        final StringBuilder url = new StringBuilder(String.format("%s://flink", scheme));
        // A missing host/port (no remote info, or a null value inside it) yields no "@..." part.
        if (hostAndPort != null) {
            url.append("@").append(hostAndPort);
        }
        url.append("/user/");
        url.append(SUPERVISOR_NAME);
        url.append("/");
        url.append(endpointName);
        return url.toString();
    }

    /**
     * Maps a protocol to its URL scheme.
     *
     * @param protocol protocol to map
     * @return the SSL scheme for {@code Protocol.SSL_TCP}, the plain TCP scheme for anything else
     */
    private static String protocolToString(Protocol protocol) {
        if (protocol == Protocol.SSL_TCP) {
            return PEKKO_SSL_TCP;
        }
        return PEKKO_TCP;
    }

    /**
     * Reads the configured frame size and converts it to a number of bytes.
     *
     * <p>The raw {@code AkkaOptions.FRAMESIZE} string is substituted into a small config snippet,
     * which is then parsed so that the config library performs the size conversion.
     *
     * @param configuration configuration to read the frame size from
     * @return the value returned by {@code Config.getBytes} for the frame size path
     */
    public static long extractMaximumFramesize(Configuration configuration) {
        final String rawFrameSize = configuration.getString(AkkaOptions.FRAMESIZE);
        final Config parsed = ConfigFactory.parseString(String.format(SIMPLE_CONFIG_TEMPLATE, rawFrameSize));
        return parsed.getBytes(MAXIMUM_FRAME_SIZE_PATH);
    }

    /** Private constructor: this class only offers static members and is not meant to be instantiated. */
    private PekkoRpcServiceUtils() {}

    /**
     * Builder that collects the settings for a {@link PekkoRpcService} and starts the underlying
     * actor system.
     *
     * <p>A builder created without an external address starts a local actor system; one created
     * with an external address starts a remote actor system.
     */
    static class PekkoRpcServiceBuilder implements RpcSystem.RpcServiceBuilder {
        private final Configuration configuration;
        private final Logger logger;

        /** {@code null} for a local service. */
        @Nullable
        private final String externalAddress;

        /** {@code null} for a local service. */
        @Nullable
        private final String externalPortRange;

        private String actorSystemName = PekkoUtils.getFlinkActorSystemName();

        /** Executor config; filled with a fork-join config on start when left unset. */
        @Nullable
        private Config actorSystemExecutorConfiguration = null;

        @Nullable
        private Config customConfig = null;

        private String bindAddress = NetUtils.getWildcardIPAddress();

        @Nullable
        private Integer bindPort = null;

        /**
         * Creates a builder for a remote service.
         *
         * @param configuration configuration, must not be {@code null}
         * @param logger logger, must not be {@code null}
         * @param externalAddress external address; {@code null} is replaced by the loopback host address
         * @param externalPortRange external port range, must not be {@code null}
         */
        private PekkoRpcServiceBuilder(Configuration configuration, Logger logger, @Nullable String externalAddress, String externalPortRange) {
            this.configuration = Preconditions.checkNotNull(configuration);
            this.logger = Preconditions.checkNotNull(logger);
            if (externalAddress == null) {
                this.externalAddress = InetAddress.getLoopbackAddress().getHostAddress();
            } else {
                this.externalAddress = externalAddress;
            }
            this.externalPortRange = Preconditions.checkNotNull(externalPortRange);
        }

        /**
         * Creates a builder for a local service (no external address, no port range).
         *
         * @param configuration configuration, must not be {@code null}
         * @param logger logger, must not be {@code null}
         */
        private PekkoRpcServiceBuilder(Configuration configuration, Logger logger) {
            this.configuration = Preconditions.checkNotNull(configuration);
            this.logger = Preconditions.checkNotNull(logger);
            this.externalAddress = null;
            this.externalPortRange = null;
        }

        /** Sets the actor system name; must not be {@code null}. */
        public PekkoRpcServiceBuilder withComponentName(String actorSystemName) {
            this.actorSystemName = Preconditions.checkNotNull(actorSystemName);
            return this;
        }

        /** Sets an additional custom config that is passed to the actor system start-up. */
        public PekkoRpcServiceBuilder withCustomConfig(Config customConfig) {
            this.customConfig = customConfig;
            return this;
        }

        /** Overrides the default bind address; must not be {@code null}. */
        public PekkoRpcServiceBuilder withBindAddress(String bindAddress) {
            this.bindAddress = Preconditions.checkNotNull(bindAddress);
            return this;
        }

        /** Sets the bind port; rejected unless {@code NetUtils.isValidHostPort} accepts it. */
        public PekkoRpcServiceBuilder withBindPort(int bindPort) {
            Preconditions.checkArgument(NetUtils.isValidHostPort(bindPort), "Invalid port number: " + bindPort);
            this.bindPort = bindPort;
            return this;
        }

        /** Uses a fixed thread pool executor config for the actor system. */
        public RpcSystem.RpcServiceBuilder withExecutorConfiguration(RpcSystem.FixedThreadPoolExecutorConfiguration executorConfiguration) {
            actorSystemExecutorConfiguration = PekkoUtils.getThreadPoolExecutorConfig(executorConfiguration);
            return this;
        }

        /** Uses a fork-join executor config for the actor system. */
        public RpcSystem.RpcServiceBuilder withExecutorConfiguration(RpcSystem.ForkJoinExecutorConfiguration executorConfiguration) {
            actorSystemExecutorConfiguration = PekkoUtils.getForkJoinExecutorConfig(executorConfiguration);
            return this;
        }

        /**
         * Starts the actor system and wraps it in a {@link PekkoRpcService} created through its
         * three-argument constructor.
         *
         * @return the started RPC service
         * @throws Exception if the start-up fails
         */
        public PekkoRpcService createAndStart() throws Exception {
            return createAndStart(PekkoRpcService::new);
        }

        /**
         * Starts the actor system and hands it to the given factory.
         *
         * @param constructor factory receiving the actor system, the RPC service configuration and
         *     the class loader of {@link RpcService}
         * @return the service produced by {@code constructor}
         * @throws Exception if the start-up fails
         */
        public PekkoRpcService createAndStart(TriFunction<ActorSystem, PekkoRpcServiceConfiguration, ClassLoader, PekkoRpcService> constructor) throws Exception {
            // Fall back to a fork-join executor derived from the configuration.
            if (actorSystemExecutorConfiguration == null) {
                actorSystemExecutorConfiguration =
                        PekkoUtils.getForkJoinExecutorConfig(
                                ActorSystemBootstrapTools.getForkJoinExecutorConfiguration(configuration));
            }

            final ActorSystem actorSystem;
            // The actor system is started inside a TemporaryClassLoaderContext created for this
            // class's class loader; the context is closed again right after start-up.
            try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(getClass().getClassLoader())) {
                if (externalAddress == null) {
                    actorSystem =
                            ActorSystemBootstrapTools.startLocalActorSystem(
                                    configuration,
                                    actorSystemName,
                                    logger,
                                    actorSystemExecutorConfiguration,
                                    customConfig);
                } else {
                    actorSystem =
                            ActorSystemBootstrapTools.startRemoteActorSystem(
                                    configuration,
                                    actorSystemName,
                                    externalAddress,
                                    externalPortRange,
                                    bindAddress,
                                    Optional.ofNullable(bindPort),
                                    logger,
                                    actorSystemExecutorConfiguration,
                                    customConfig);
                }
            }

            return constructor.apply(
                    actorSystem,
                    PekkoRpcServiceConfiguration.fromConfiguration(configuration),
                    RpcService.class.getClassLoader());
        }
    }

    public static enum Protocol {
        TCP,
        SSL_TCP;

    }

    /** Immutable pair of a {@code host:port} string and the protocol used to reach it. */
    private static final class RemoteAddressInformation {
        private final String hostnameAndPort;
        private final Protocol protocol;

        /**
         * @param hostnameAndPort host and port of the remote endpoint as a single string
         * @param protocol protocol of the remote endpoint
         */
        private RemoteAddressInformation(String hostnameAndPort, Protocol protocol) {
            this.hostnameAndPort = hostnameAndPort;
            this.protocol = protocol;
        }

        /** @return the host and port string given at construction */
        private String getHostnameAndPort() {
            return hostnameAndPort;
        }

        /** @return the protocol given at construction */
        private Protocol getProtocol() {
            return protocol;
        }
    }
}

