/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.net.InetAddress;
import java.time.Duration;
import java.util.List;
import javax.net.ssl.SSLSessionContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.netty.NettyBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyClient;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.netty.NettyProtocol;
import org.apache.flink.runtime.io.network.netty.NettyServer;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameter;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

/**
 * Tests the SSL setup between a {@link NettyServer} and a {@link NettyClient}.
 *
 * <p>Every test template is executed once per entry of {@link
 * SSLUtilsTest#AVAILABLE_SSL_PROVIDERS}; the current entry is injected into {@link #sslProvider}.
 *
 * <p>All tests release the server and client in a {@code finally} block so that a failing
 * assertion does not leave them running.
 */
@ExtendWith(ParameterizedTestExtension.class)
class NettyClientServerSslTest {
    /** Message used when the server/client pair could not be created. */
    private static final String NO_SERVER_AND_CLIENT_MESSAGE =
            "serverAndClient is null due to fail to get a free port";

    /** SSL provider under test, injected by the parameterized test extension. */
    @Parameter
    private String sslProvider;

    /** Supplies the SSL providers the test templates are run with. */
    @Parameters(name="SSL provider = {0}")
    public static List<String> parameters() {
        return SSLUtilsTest.AVAILABLE_SSL_PROVIDERS;
    }

    /** Verifies an SSL connection using the unmodified test SSL configuration. */
    @TestTemplate
    void testValidSslConnection() throws Exception {
        this.testValidSslConnection(this.createSslConfig());
    }

    /**
     * Verifies an SSL connection with explicitly configured session cache size, session timeout,
     * handshake timeout and close-notify flush timeout.
     */
    @TestTemplate
    void testValidSslConnectionAdvanced() throws Exception {
        Configuration sslConfig = this.createSslConfig();
        sslConfig.set(SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE, 1);
        // Distinct offsets make each option's value unique, so a mix-up between the options
        // shows up in the assertions of testValidSslConnection(Configuration).
        int timeoutInMillisBase = (int) Duration.ofHours(1L).toMillis();
        sslConfig.set(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT, timeoutInMillisBase + 1);
        sslConfig.set(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, timeoutInMillisBase + 2);
        sslConfig.set(
                SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, timeoutInMillisBase + 3);
        this.testValidSslConnection(sslConfig);
    }

    /**
     * Starts a server and a client with the given configuration, sends a message over the
     * connection and checks that the client-side and server-side {@link SslHandler}s as well as
     * the server's {@link SSLSessionContext} reflect the configured values.
     *
     * @param sslConfig SSL configuration used for both server and client
     * @throws Exception if setting up or using the connection fails
     */
    private void testValidSslConnection(Configuration sslConfig) throws Exception {
        OneShotLatch serverChannelInitComplete = new OneShotLatch();
        // Single-element array so that the channel initializer can hand the handler back.
        SslHandler[] serverSslHandler = new SslHandler[1];
        NettyTestUtil.NoOpProtocol protocol = new NettyTestUtil.NoOpProtocol();

        NettyTestUtil.NettyServerAndClient serverAndClient;
        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
            NettyConfig nettyConfig = createNettyConfig(sslConfig, port);
            NettyBufferPool bufferPool = new NettyBufferPool(1);
            NettyServer server =
                    NettyTestUtil.initServer(
                            nettyConfig,
                            bufferPool,
                            sslHandlerFactory ->
                                    new TestingServerChannelInitializer(
                                            protocol,
                                            sslHandlerFactory,
                                            serverChannelInitComplete,
                                            serverSslHandler));
            NettyClient client = NettyTestUtil.initClient(nettyConfig, protocol, bufferPool);
            serverAndClient = new NettyTestUtil.NettyServerAndClient(server, client);
        }
        Assertions.assertThat(serverAndClient)
                .withFailMessage(NO_SERVER_AND_CLIENT_MESSAGE)
                .isNotNull();

        try {
            Channel ch = NettyTestUtil.connect(serverAndClient);

            // Client side: the handler is looked up under the pipeline name "ssl".
            SslHandler clientSslHandler = (SslHandler) ch.pipeline().get("ssl");
            assertEqualsOrDefault(
                    sslConfig,
                    SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT,
                    clientSslHandler.getHandshakeTimeoutMillis());
            assertEqualsOrDefault(
                    sslConfig,
                    SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT,
                    clientSslHandler.getCloseNotifyFlushTimeoutMillis());

            addStringCodecs(ch);
            ch.writeAndFlush("test").sync();

            // Server side: wait until the channel initializer has published its handler.
            serverChannelInitComplete.await();
            Assertions.assertThat(serverSslHandler[0]).isNotNull();
            assertEqualsOrDefault(
                    sslConfig,
                    SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT,
                    serverSslHandler[0].getHandshakeTimeoutMillis());
            assertEqualsOrDefault(
                    sslConfig,
                    SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT,
                    serverSslHandler[0].getCloseNotifyFlushTimeoutMillis());

            SSLSessionContext sessionContext =
                    serverSslHandler[0].engine().getSession().getSessionContext();
            Assertions.assertThat(sessionContext)
                    .withFailMessage("bug in unit test setup: session context not available")
                    .isNotNull();
            assertEqualsOrDefault(
                    sslConfig,
                    SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE,
                    sessionContext.getSessionCacheSize());

            int sessionTimeout = sslConfig.get(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT);
            if (sessionTimeout != -1) {
                // The option is configured in milliseconds; the session context reports seconds.
                Assertions.assertThat(sessionContext.getSessionTimeout())
                        .isEqualTo(sessionTimeout / 1000);
            } else {
                Assertions.assertThat(sessionContext.getSessionTimeout())
                        .withFailMessage("default value (-1) should not be propagated")
                        .isGreaterThanOrEqualTo(0);
            }
        } finally {
            // Release server and client even if one of the assertions above failed.
            NettyTestUtil.shutdown(serverAndClient);
        }
    }

    /**
     * Asserts that {@code actual} equals the configured value of {@code option}. If the
     * configured value equals the option's default, only asserts that {@code actual} is
     * non-negative.
     *
     * @param sslConfig configuration to read the expected value from
     * @param option option whose value is checked
     * @param actual value observed on the SSL handler or session context
     */
    private static void assertEqualsOrDefault(Configuration sslConfig, ConfigOption<Integer> option, long actual) {
        long expected = sslConfig.get(option);
        if (expected != option.defaultValue()) {
            Assertions.assertThat(actual).isEqualTo(expected);
        } else {
            Assertions.assertThat(actual)
                    .withFailMessage(
                            "default value (%d) should not be propagated", option.defaultValue())
                    .isGreaterThanOrEqualTo(0L);
        }
    }

    /**
     * Verifies that creating server and client with a wrong keystore password fails with an
     * {@link IOException}.
     */
    @TestTemplate
    public void testInvalidSslConfiguration() throws Exception {
        NettyTestUtil.NoOpProtocol protocol = new NettyTestUtil.NoOpProtocol();
        Configuration config = this.createSslConfig();
        config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");

        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
            NettyConfig nettyConfig = createNettyConfig(config, port);
            Assertions.assertThatThrownBy(
                            () -> NettyTestUtil.initServerAndClient(protocol, nettyConfig))
                    .withFailMessage("Created server and client from invalid configuration")
                    .isInstanceOf(IOException.class);
        }
    }

    /**
     * Verifies that sending a message fails when server and client both use {@code
     * src/test/resources/untrusted.keystore} as keystore.
     */
    @TestTemplate
    void testSslHandshakeError() throws Exception {
        NettyTestUtil.NoOpProtocol protocol = new NettyTestUtil.NoOpProtocol();
        Configuration config = this.createSslConfig();
        config.set(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");

        NettyTestUtil.NettyServerAndClient serverAndClient;
        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
            NettyConfig nettyConfig = createNettyConfig(config, port);
            serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
        }
        assertTestMessageDelivery(serverAndClient, false);
    }

    /**
     * Verifies that sending a message fails when only the client uses {@code
     * src/test/resources/untrusted.keystore} as keystore.
     */
    @TestTemplate
    void testClientUntrustedCertificate() throws Exception {
        Configuration serverConfig = this.createSslConfig();
        Configuration clientConfig = this.createSslConfig();
        clientConfig.set(
                SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");

        NettyTestUtil.NettyServerAndClient serverAndClient;
        try (NetUtils.Port serverPort = NetUtils.getAvailablePort();
                NetUtils.Port clientPort = NetUtils.getAvailablePort()) {
            NettyConfig nettyServerConfig = createNettyConfig(serverConfig, serverPort);
            NettyConfig nettyClientConfig = createNettyConfig(clientConfig, clientPort);
            NettyBufferPool bufferPool = new NettyBufferPool(1);
            NettyTestUtil.NoOpProtocol protocol = new NettyTestUtil.NoOpProtocol();
            NettyServer server = NettyTestUtil.initServer(nettyServerConfig, protocol, bufferPool);
            NettyClient client = NettyTestUtil.initClient(nettyClientConfig, protocol, bufferPool);
            serverAndClient = new NettyTestUtil.NettyServerAndClient(server, client);
        }
        assertTestMessageDelivery(serverAndClient, false);
    }

    /**
     * Verifies that sending a message succeeds when the configured certificate fingerprint is the
     * one returned by {@link SSLUtilsTest#getCertificateFingerprint} for alias {@code flink.test}.
     */
    @TestTemplate
    void testSslPinningForValidFingerprint() throws Exception {
        NettyTestUtil.NoOpProtocol protocol = new NettyTestUtil.NoOpProtocol();
        Configuration config = this.createSslConfig();
        config.set(
                SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT,
                SSLUtilsTest.getCertificateFingerprint(config, "flink.test"));

        NettyTestUtil.NettyServerAndClient serverAndClient;
        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
            NettyConfig nettyConfig = createNettyConfig(config, port);
            serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
        }
        assertTestMessageDelivery(serverAndClient, true);
    }

    /**
     * Verifies that sending a message fails when the configured certificate fingerprint has all
     * of its digits and upper-case letters replaced by {@code 0}.
     */
    @TestTemplate
    void testSslPinningForInvalidFingerprint() throws Exception {
        NettyTestUtil.NoOpProtocol protocol = new NettyTestUtil.NoOpProtocol();
        Configuration config = this.createSslConfig();
        config.set(
                SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT,
                SSLUtilsTest.getCertificateFingerprint(config, "flink.test")
                        .replaceAll("[0-9A-Z]", "0"));

        NettyTestUtil.NettyServerAndClient serverAndClient;
        try (NetUtils.Port port = NetUtils.getAvailablePort()) {
            NettyConfig nettyConfig = createNettyConfig(config, port);
            serverAndClient = NettyTestUtil.initServerAndClient(protocol, nettyConfig);
        }
        assertTestMessageDelivery(serverAndClient, false);
    }

    /**
     * Connects the client to the server, writes the string {@code "test"} and asserts whether the
     * write succeeded. The server and client are always shut down afterwards.
     *
     * @param serverAndClient server/client pair to use; shut down by this method
     * @param expectSuccess whether the write is expected to succeed
     * @throws Exception if connecting or waiting for the write fails
     */
    private static void assertTestMessageDelivery(
            NettyTestUtil.NettyServerAndClient serverAndClient, boolean expectSuccess)
            throws Exception {
        // Checked before the try block so that a null pair is reported by this assertion
        // rather than by the shutdown call.
        Assertions.assertThat(serverAndClient)
                .withFailMessage(NO_SERVER_AND_CLIENT_MESSAGE)
                .isNotNull();
        try {
            Channel ch = NettyTestUtil.connect(serverAndClient);
            addStringCodecs(ch);
            Assertions.assertThat(ch.writeAndFlush("test").await().isSuccess())
                    .isEqualTo(expectSuccess);
        } finally {
            NettyTestUtil.shutdown(serverAndClient);
        }
    }

    /** Appends a {@link StringDecoder} and a {@link StringEncoder} to the channel's pipeline. */
    private static void addStringCodecs(Channel ch) {
        ch.pipeline().addLast(new StringDecoder()).addLast(new StringEncoder());
    }

    /** Creates the SSL configuration for the SSL provider currently under test. */
    private Configuration createSslConfig() {
        return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(this.sslProvider);
    }

    /**
     * Builds a {@link NettyConfig} bound to the loopback address and the given port.
     *
     * @param config configuration (including SSL options) to pass on
     * @param availablePort port to use
     * @return the resulting Netty configuration
     */
    private static NettyConfig createNettyConfig(Configuration config, NetUtils.Port availablePort) {
        return new NettyConfig(
                InetAddress.getLoopbackAddress(), availablePort.getPort(), 1024, 1, config);
    }

    /**
     * Server channel initializer that, after the regular initialization, stores the pipeline's
     * {@link SslHandler} in the given array and triggers the given latch.
     */
    private static class TestingServerChannelInitializer
            extends NettyServer.ServerChannelInitializer {
        private final OneShotLatch latch;
        private final SslHandler[] serverHandler;

        TestingServerChannelInitializer(NettyProtocol protocol, SSLHandlerFactory sslHandlerFactory, OneShotLatch latch, SslHandler[] serverHandler) {
            super(protocol, sslHandlerFactory);
            this.latch = latch;
            this.serverHandler = serverHandler;
        }

        @Override
        public void initChannel(SocketChannel channel) throws Exception {
            super.initChannel(channel);
            SslHandler sslHandler = (SslHandler) channel.pipeline().get("ssl");
            Assertions.assertThat(sslHandler).isNotNull();
            // Publish the handler first, then signal: the waiting test reads the array
            // only after the latch has been triggered.
            this.serverHandler[0] = sslHandler;
            this.latch.trigger();
        }
    }
}

