/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections.list.UnmodifiableList;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.operators.StreamSink;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

public class KafkaTestEnvironmentImpl
extends KafkaTestEnvironment {
    /** Logger used by this environment. */
    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestEnvironmentImpl.class);

    /** Network alias given to the external ZooKeeper container. */
    private static final String ZOOKEEPER_HOSTNAME = "zookeeper";

    /** Client port configured on the external ZooKeeper container. */
    private static final int ZOOKEEPER_PORT = 2181;

    /**
     * Timeout in seconds for admin requests.
     * NOTE(review): presumably the source of the literal 30 used by the wait loops in this
     * class (decompilation inlines constants) - confirm against the original sources.
     */
    private static final int REQUEST_TIMEOUT_SECONDS = 30;

    /** Broker containers of the current cluster, keyed by broker id. */
    private final Map<Integer, KafkaContainer> brokers = new HashMap<>();

    /** Ids of the brokers whose container is currently paused by this environment. */
    private final Set<Integer> pausedBroker = new HashSet<>();

    /** External ZooKeeper container; only created when more than one broker is started. */
    @Nullable
    private GenericContainer<?> zookeeper;

    /** Docker network shared by ZooKeeper and the brokers; only created for multi-broker clusters. */
    @Nullable
    private Network network;

    /** Comma-separated {@code host:port} list of all brokers; empty until the cluster is started. */
    private String brokerConnectionString = "";

    /** Client properties built by {@code prepare}; {@code null} before that call. */
    private Properties standardProps;

    /** Delivery semantic handed to every {@link FlinkKafkaProducer} created by this environment. */
    private FlinkKafkaProducer.Semantic producerSemantic = FlinkKafkaProducer.Semantic.EXACTLY_ONCE;

    /** ZooKeeper session/connection timeout in milliseconds. */
    private int zkTimeout = 30000;

    /** Configuration passed to the most recent {@code prepare} call. */
    private KafkaTestEnvironment.Config config;

    /**
     * Overrides the delivery semantic used for producers created after this call.
     *
     * @param producerSemantic semantic handed to subsequently created {@link FlinkKafkaProducer}s
     */
    public void setProducerSemantic(FlinkKafkaProducer.Semantic producerSemantic) {
        // Only stored here; it is read when a producer/sink is built.
        this.producerSemantic = producerSemantic;
    }

    /**
     * Starts the Kafka container cluster described by {@code config} and builds the standard
     * client properties that point at it.
     *
     * <p>In secure mode the cluster is forced to a single broker and the ZooKeeper timeout is
     * multiplied by 15.
     *
     * @param config environment configuration; its broker count is overwritten in secure mode
     * @throws Exception if the cluster cannot be started
     */
    @Override
    public void prepare(KafkaTestEnvironment.Config config) throws Exception {
        final boolean secure = config.isSecureMode();
        if (secure) {
            config.setKafkaServersNumber(1);
            zkTimeout = zkTimeout * 15;
        }
        this.config = config;
        brokers.clear();

        LOG.info("Starting KafkaServer");
        startKafkaContainerCluster(config.getKafkaServersNumber());
        LOG.info("KafkaServer started.");

        // Assemble the properties in a local first, then publish them in one assignment.
        final String zkTimeoutMs = String.valueOf(zkTimeout);
        final Properties clientProps = new Properties();
        clientProps.setProperty("bootstrap.servers", brokerConnectionString);
        clientProps.setProperty("group.id", "flink-tests");
        clientProps.setProperty("enable.auto.commit", "false");
        clientProps.setProperty("zookeeper.session.timeout.ms", zkTimeoutMs);
        clientProps.setProperty("zookeeper.connection.timeout.ms", zkTimeoutMs);
        clientProps.setProperty("auto.offset.reset", "earliest");
        clientProps.setProperty("max.partition.fetch.bytes", "256");
        standardProps = clientProps;
    }

    /**
     * Deletes {@code topic} using a dedicated admin client and fails the running test if the
     * deletion does not succeed.
     *
     * <p>The admin client gets a random {@code client.id}, which is passed to
     * {@code maybePrintDanglingThreadStacktrace} once the client has been closed.
     *
     * @param topic name of the topic to delete
     */
    @Override
    public void deleteTestTopic(String topic) {
        LOG.info("Deleting topic {}", topic);
        Properties props = getSecureProperties();
        props.putAll(getStandardProperties());
        String clientId = Long.toString(new Random().nextLong());
        props.put("client.id", clientId);
        AdminClient adminClient = AdminClient.create(props);
        try {
            tryDelete(adminClient, topic);
        } catch (Exception e) {
            // Log through the logger (not stderr) and keep the cause attached to the failure.
            LOG.error("Delete test topic : {} failed", topic, e);
            Assertions.fail(
                    String.format("Delete test topic : %s failed, %s", topic, e.getMessage()), e);
        } finally {
            adminClient.close(Duration.ofMillis(5000L));
            maybePrintDanglingThreadStacktrace(clientId);
        }
    }

    /**
     * Issues the delete request for {@code topic} and waits until the topic no longer shows up
     * in the topic listing.
     *
     * <p>If a {@link TimeoutException} is raised, the topic list is checked one last time; the
     * timeout is only treated as an error when the topic is still present.
     *
     * @param adminClient client used for the delete and list requests
     * @param topic name of the topic to delete
     * @throws Exception if the delete request fails or the topic still exists after the timeout
     */
    private void tryDelete(AdminClient adminClient, String topic) throws Exception {
        try {
            adminClient.deleteTopics(Collections.singleton(topic)).all().get();
            CommonTestUtils.waitUtil(
                    () -> {
                        try {
                            // Keep the generic type of the listing so the method reference
                            // below is well-typed (no raw Collection cast).
                            return adminClient.listTopics().listings().get().stream()
                                    .map(TopicListing::name)
                                    .noneMatch(name -> name.equals(topic));
                        } catch (Exception e) {
                            // Treated as "not deleted yet"; the wait loop retries.
                            LOG.warn("Exception caught when listing Kafka topics", e);
                            return false;
                        }
                    },
                    Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS),
                    String.format("Topic \"%s\" was not deleted within timeout", topic));
        } catch (TimeoutException e) {
            LOG.info(
                    "Did not receive delete topic response within {} seconds. Checking if it succeeded",
                    REQUEST_TIMEOUT_SECONDS);
            if (adminClient.listTopics().names().get().contains(topic)) {
                throw new Exception("Topic still exists after timeout", e);
            }
        }
    }

    /**
     * Creates {@code topic} through {@link #createNewTopic} using the standard client properties.
     *
     * <p>NOTE(review): the {@code properties} argument is not used; the admin client is always
     * configured from {@link #getStandardProperties()} - confirm this is intended.
     *
     * @param topic name of the topic to create
     * @param numberOfPartitions number of partitions of the new topic
     * @param replicationFactor replication factor of the new topic
     * @param properties ignored by this implementation
     */
    @Override
    public void createTestTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties) {
        final Properties adminProps = getStandardProperties();
        createNewTopic(topic, numberOfPartitions, replicationFactor, adminProps);
    }

    /**
     * Creates a topic and waits until it is described with the requested number of partitions.
     * Any failure is logged and turned into a test failure that carries the original cause.
     *
     * @param topic name of the topic to create
     * @param numberOfPartitions number of partitions the topic must report before this returns
     * @param replicationFactor replication factor; narrowed to {@code short} for the request
     * @param properties configuration for the admin client used to create the topic
     */
    public static void createNewTopic(String topic, int numberOfPartitions, int replicationFactor, Properties properties) {
        LOG.info("Creating topic {}", topic);
        try (AdminClient adminClient = AdminClient.create(properties)) {
            NewTopic topicObj = new NewTopic(topic, numberOfPartitions, (short) replicationFactor);
            adminClient.createTopics(Collections.singleton(topicObj)).all().get();
            CommonTestUtils.waitUtil(
                    () -> {
                        Map<String, TopicDescription> topicDescriptions;
                        try {
                            topicDescriptions =
                                    adminClient
                                            .describeTopics(Collections.singleton(topic))
                                            .allTopicNames()
                                            .get(REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS);
                        } catch (Exception e) {
                            // Treated as "not ready yet"; the wait loop retries.
                            LOG.warn("Exception caught when describing Kafka topics", e);
                            return false;
                        }
                        if (topicDescriptions == null || !topicDescriptions.containsKey(topic)) {
                            return false;
                        }
                        TopicDescription topicDescription = topicDescriptions.get(topic);
                        return topicDescription.partitions().size() == numberOfPartitions;
                    },
                    Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS),
                    String.format("New topic \"%s\" is not ready within timeout", topicObj));
        } catch (Exception e) {
            // Log through the logger (not stderr) and keep the cause attached to the failure.
            LOG.error("Create test topic : {} failed", topic, e);
            Assertions.fail("Create test topic : " + topic + " failed, " + e.getMessage(), e);
        }
    }

    /**
     * Returns the client properties built by {@code prepare}.
     *
     * @return the internal {@link Properties} instance itself (not a copy); {@code null} if
     *     {@code prepare} has not run yet
     */
    @Override
    public Properties getStandardProperties() {
        return standardProps;
    }

    /**
     * Builds a fresh set of security-related client properties.
     *
     * <p>Reads the configuration stored by {@code prepare}, so it must be called after it.
     *
     * @return a new {@link Properties}; empty unless the configuration is in secure mode
     */
    @Override
    public Properties getSecureProperties() {
        final Properties secureProps = new Properties();
        if (!config.isSecureMode()) {
            return secureProps;
        }

        final String zkTimeoutMs = String.valueOf(zkTimeout);
        secureProps.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
        secureProps.put("security.protocol", "SASL_PLAINTEXT");
        secureProps.put("sasl.kerberos.service.name", "kafka");
        secureProps.setProperty("zookeeper.session.timeout.ms", zkTimeoutMs);
        secureProps.setProperty("zookeeper.connection.timeout.ms", zkTimeoutMs);
        secureProps.setProperty("metadata.fetch.timeout.ms", "120000");
        return secureProps;
    }

    /**
     * Returns the broker address list computed when the cluster was started.
     *
     * @return comma-separated {@code host:port} entries; the empty string before any cluster
     *     has been started
     */
    @Override
    public String getBrokerConnectionString() {
        return brokerConnectionString;
    }

    /**
     * Returns the version identifier of this environment.
     *
     * @return the fixed string {@code confluentinc/cp-kafka:7.4.4} (presumably the Docker image
     *     the brokers run - confirm against {@code KafkaUtil})
     */
    @Override
    public String getVersion() {
        final String version = "confluentinc/cp-kafka:7.4.4";
        return version;
    }

    /**
     * Creates a {@link FlinkKafkaConsumer} for the given topics.
     *
     * @param topics topics to subscribe to
     * @param readSchema schema used to deserialize records
     * @param props consumer configuration
     * @param <T> type of the deserialized records
     * @return a new consumer instance
     */
    @Override
    public <T> FlinkKafkaConsumerBase<T> getConsumer(List<String> topics, KafkaDeserializationSchema<T> readSchema, Properties props) {
        final FlinkKafkaConsumer<T> consumer = new FlinkKafkaConsumer<T>(topics, readSchema, props);
        return consumer;
    }

    /**
     * Creates a pre-configured {@link KafkaSourceBuilder} for the given topics.
     *
     * @param topics topics the source reads from
     * @param schema schema used to deserialize records; wrapped via
     *     {@link KafkaRecordDeserializationSchema#of}
     * @param props properties applied to the builder
     * @param <T> type of the deserialized records
     * @return the configured builder (not yet built)
     */
    @Override
    public <T> KafkaSourceBuilder<T> getSourceBuilder(List<String> topics, KafkaDeserializationSchema<T> schema, Properties props) {
        // The explicit <T> witness is required: without it the chain is inferred as
        // KafkaSourceBuilder<Object>, which neither accepts the deserializer nor matches the
        // declared return type.
        return KafkaSource.<T>builder()
                .setTopics(topics)
                .setDeserializer(KafkaRecordDeserializationSchema.of(schema))
                .setProperties(props);
    }

    /**
     * Drains all records of {@code topic} via {@code KafkaUtil.drainAllRecordsFromTopic} and
     * returns them wrapped in an unmodifiable list.
     *
     * @param properties client configuration used for draining
     * @param topic topic to read
     * @param <K> key type the caller expects (not checked here)
     * @param <V> value type the caller expects (not checked here)
     * @return unmodifiable view of the drained records
     */
    @Override
    @SuppressWarnings("unchecked") // UnmodifiableList.decorate returns a raw List
    public <K, V> Collection<ConsumerRecord<K, V>> getAllRecordsFromTopic(Properties properties, String topic) {
        return UnmodifiableList.decorate(
                KafkaUtil.drainAllRecordsFromTopic(topic, properties));
    }

    /**
     * Wraps a new {@link FlinkKafkaProducer} for {@code topic} into a {@link StreamSink}
     * operator.
     *
     * <p>The producer uses the currently configured producer semantic and a producer pool size
     * of 5.
     *
     * @param topic target topic
     * @param serSchema schema used to serialize records
     * @param props producer configuration
     * @param partitioner custom partitioner handed to the producer
     * @param <T> type of the records to write
     * @return sink operator wrapping the producer
     */
    @Override
    public <T> StreamSink<T> getProducerSink(String topic, SerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
        final FlinkKafkaProducer<T> producer =
                new FlinkKafkaProducer<T>(topic, serSchema, props, partitioner, producerSemantic, 5);
        return new StreamSink<T>(producer);
    }

    /**
     * Attaches a {@link FlinkKafkaProducer} sink using a {@link KeyedSerializationSchema}.
     *
     * <p>The producer uses the currently configured producer semantic and a producer pool size
     * of 5.
     *
     * @param stream stream to write to Kafka
     * @param topic target topic
     * @param serSchema keyed schema used to serialize records
     * @param props producer configuration
     * @param partitioner custom partitioner; may be {@code null}, it is wrapped with
     *     {@link Optional#ofNullable}
     * @param <T> type of the records in the stream
     * @return the sink added to {@code stream}
     */
    @Override
    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KeyedSerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
        final FlinkKafkaProducer<T> producer =
                new FlinkKafkaProducer<T>(
                        topic, serSchema, props, Optional.ofNullable(partitioner), producerSemantic, 5);
        return stream.addSink(producer);
    }

    /**
     * Attaches a {@link FlinkKafkaProducer} sink using a plain {@link SerializationSchema}.
     *
     * <p>The producer uses the currently configured producer semantic and a producer pool size
     * of 5.
     *
     * @param stream stream to write to Kafka
     * @param topic target topic
     * @param serSchema schema used to serialize records
     * @param props producer configuration
     * @param partitioner custom partitioner handed to the producer
     * @param <T> type of the records in the stream
     * @return the sink added to {@code stream}
     */
    @Override
    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, SerializationSchema<T> serSchema, Properties props, FlinkKafkaPartitioner<T> partitioner) {
        final FlinkKafkaProducer<T> producer =
                new FlinkKafkaProducer<T>(topic, serSchema, props, partitioner, producerSemantic, 5);
        return stream.addSink(producer);
    }

    /**
     * Attaches a {@link FlinkKafkaProducer} sink using a {@link KafkaSerializationSchema}.
     *
     * <p>The producer uses the currently configured producer semantic.
     *
     * @param stream stream to write to Kafka
     * @param topic default target topic
     * @param serSchema schema used to build the produced records
     * @param props producer configuration
     * @param <T> type of the records in the stream
     * @return the sink added to {@code stream}
     */
    @Override
    public <T> DataStreamSink<T> produceIntoKafka(DataStream<T> stream, String topic, KafkaSerializationSchema<T> serSchema, Properties props) {
        final FlinkKafkaProducer<T> producer =
                new FlinkKafkaProducer<T>(topic, serSchema, props, producerSemantic);
        return stream.addSink(producer);
    }

    /**
     * Creates a handler for reading and writing committed offsets.
     *
     * @return a new handler backed by its own Kafka consumer; the caller should close it
     */
    @Override
    public KafkaTestEnvironment.KafkaOffsetHandler createOffsetHandler() {
        final KafkaTestEnvironment.KafkaOffsetHandler handler = new KafkaOffsetHandlerImpl();
        return handler;
    }

    /**
     * "Restarts" a broker by unpausing its container.
     *
     * @param leaderId id of the broker to resume
     * @throws Exception if the broker does not reappear in the cluster in time
     */
    @Override
    public void restartBroker(int leaderId) throws Exception {
        // Brokers are never really stopped, only paused, so restarting means unpausing.
        unpause(leaderId);
    }

    /**
     * "Stops" a broker by pausing its container.
     *
     * @param brokerId id of the broker to pause
     * @throws Exception declared by the overridden method
     */
    @Override
    public void stopBroker(int brokerId) throws Exception {
        // The container is paused rather than stopped; restartBroker unpauses it again.
        pause(brokerId);
    }

    /**
     * Looks up the broker that currently leads the first listed partition of {@code topic}.
     *
     * @param topic topic to describe
     * @return id of the leader of the first partition in the topic description
     * @throws Exception if describing the topic fails
     */
    @Override
    public int getLeaderToShutDown(String topic) throws Exception {
        try (AdminClient admin = AdminClient.create(getStandardProperties())) {
            final Map<String, TopicDescription> descriptions =
                    admin.describeTopics(Collections.singleton(topic)).allTopicNames().get();
            final TopicDescription description = descriptions.get(topic);
            final TopicPartitionInfo firstPartition = description.partitions().get(0);
            return firstPartition.leader().id();
        }
    }

    /**
     * Reports whether this environment supports secure mode.
     *
     * @return always {@code true}
     */
    @Override
    public boolean isSecureRunSupported() {
        final boolean supported = true;
        return supported;
    }

    /**
     * Stops all broker containers, then the ZooKeeper container and the network (if any), and
     * resets the environment's bookkeeping so a later {@code prepare} starts from a clean
     * state.
     *
     * <p>Cleanup continues even if stopping the brokers or ZooKeeper throws, so a failure in
     * one step does not leave the remaining resources running.
     *
     * @throws Exception if stopping a container or closing the network fails
     */
    @Override
    public void shutdown() throws Exception {
        try {
            brokers.values().forEach(GenericContainer::stop);
        } finally {
            brokers.clear();
            // Broker ids are reused by the next cluster; stale paused ids would make
            // pause() skip brokers that are actually running.
            pausedBroker.clear();
            try {
                if (zookeeper != null) {
                    zookeeper.stop();
                }
            } finally {
                // Drop the references so a later single-broker cluster does not pick up a
                // stopped ZooKeeper container or a closed network.
                zookeeper = null;
                final Network toClose = network;
                network = null;
                if (toClose != null) {
                    toClose.close();
                }
            }
        }
    }

    /**
     * Creates and starts {@code numBrokers} broker containers and records their bootstrap
     * addresses in the broker connection string.
     *
     * <p>For more than one broker, a shared network and an external ZooKeeper container are
     * started first; otherwise whatever the {@code zookeeper} field currently holds is passed
     * through to the broker.
     *
     * @param numBrokers number of broker containers to start
     */
    private void startKafkaContainerCluster(int numBrokers) {
        final boolean multiBroker = numBrokers > 1;
        if (multiBroker) {
            network = Network.newNetwork();
            zookeeper = createZookeeperContainer(network);
            zookeeper.start();
            LOG.info("Zookeeper container started");
        }

        int nextId = 0;
        while (nextId < numBrokers) {
            brokers.put(nextId, createKafkaContainer(nextId, zookeeper));
            nextId++;
        }

        // Start from a snapshot of the containers, in parallel.
        final List<KafkaContainer> toStart = new ArrayList<>(brokers.values());
        toStart.parallelStream().forEach(GenericContainer::start);
        LOG.info("{} brokers started", numBrokers);

        // Bootstrap servers look like "<scheme>://host:port"; keep only the part after "://".
        brokerConnectionString =
                brokers.values().stream()
                        .map(KafkaContainer::getBootstrapServers)
                        .map(server -> server.split("://")[1])
                        .collect(Collectors.joining(","));
    }

    /**
     * Builds (but does not start) the external ZooKeeper container.
     *
     * @param network network the container is attached to, under the alias
     *     {@code ZOOKEEPER_HOSTNAME}
     * @return the configured, not yet started container
     */
    private GenericContainer<?> createZookeeperContainer(Network network) {
        final DockerImageName image = DockerImageName.parse("zookeeper:3.4.14");
        return new GenericContainer<>(image)
                .withNetwork(network)
                .withNetworkAliases(ZOOKEEPER_HOSTNAME)
                .withEnv("ZOOKEEPER_CLIENT_PORT", String.valueOf(ZOOKEEPER_PORT));
    }

    /**
     * Builds (but does not start) the container for one Kafka broker.
     *
     * @param brokerID broker id; also used for the container name and network alias
     *     {@code Kafka-<id>}
     * @param zookeeper external ZooKeeper container to connect to, or {@code null} to use the
     *     broker's embedded ZooKeeper
     * @return the configured, not yet started broker container
     */
    private KafkaContainer createKafkaContainer(int brokerID, @Nullable GenericContainer<?> zookeeper) {
        final int maxBytes = 50 * 1024 * 1024;
        final int transactionMaxTimeoutMs = 2 * 60 * 60 * 1000;
        final String zkTimeoutMs = String.valueOf(zkTimeout);
        final String brokerName = String.format("Kafka-%d", brokerID);

        final KafkaContainer broker =
                KafkaUtil.createKafkaContainer(brokerName)
                        .withNetworkAliases(brokerName)
                        .withEnv("KAFKA_BROKER_ID", String.valueOf(brokerID))
                        .withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(maxBytes))
                        .withEnv("KAFKA_REPLICA_FETCH_MAX_BYTES", String.valueOf(maxBytes))
                        .withEnv("KAFKA_TRANSACTION_MAX_TIMEOUT_MS", Integer.toString(transactionMaxTimeoutMs))
                        // presumably disables time-based log deletion - confirm against Kafka docs
                        .withEnv("KAFKA_LOG_RETENTION_MS", "-1")
                        .withEnv("KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS", zkTimeoutMs)
                        .withEnv("KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS", zkTimeoutMs);

        if (zookeeper == null) {
            broker.withEmbeddedZookeeper();
            return broker;
        }

        // Join ZooKeeper's network and start only after ZooKeeper is up.
        broker.dependsOn(zookeeper)
                .withNetwork(zookeeper.getNetwork())
                .withExternalZookeeper(String.format("%s:%d", ZOOKEEPER_HOSTNAME, ZOOKEEPER_PORT));
        return broker;
    }

    /**
     * Pauses the container of the given broker and records it as paused. Does nothing but log
     * a warning if the broker is already recorded as paused.
     *
     * @param brokerId id of the broker to pause; must be a key of the broker map
     */
    private void pause(int brokerId) {
        final boolean alreadyPaused = pausedBroker.contains(brokerId);
        if (!alreadyPaused) {
            DockerClientFactory.instance().client().pauseContainerCmd(brokers.get(brokerId).getContainerId()).exec();
            pausedBroker.add(brokerId);
            LOG.info("Broker {} is paused", brokerId);
        } else {
            LOG.warn("Broker {} is already paused. Skipping pause operation", brokerId);
        }
    }

    /**
     * Unpauses the container of the given broker and waits until the broker is listed among
     * the cluster nodes again. Does nothing but log a warning if the broker is not recorded as
     * paused.
     *
     * @param brokerId id of the broker to resume
     * @throws Exception if the broker does not show up in the cluster within the timeout
     */
    private void unpause(int brokerId) throws Exception {
        if (!pausedBroker.contains(brokerId)) {
            LOG.warn("Broker {} is already running. Skipping unpause operation", brokerId);
            return;
        }
        DockerClientFactory.instance().client().unpauseContainerCmd(brokers.get(brokerId).getContainerId()).exec();
        try (AdminClient adminClient = AdminClient.create(getStandardProperties())) {
            CommonTestUtils.waitUtil(
                    () -> {
                        try {
                            // Keep the generic node type so node.id() is well-typed (no raw
                            // Collection cast).
                            return adminClient.describeCluster().nodes().get().stream()
                                    .anyMatch(node -> node.id() == brokerId);
                        } catch (Exception e) {
                            // Treated as "not recovered yet"; the wait loop retries.
                            LOG.debug("Exception caught when describing the Kafka cluster", e);
                            return false;
                        }
                    },
                    Duration.ofSeconds(REQUEST_TIMEOUT_SECONDS),
                    String.format("The paused broker %d is not recovered within timeout", brokerId));
        }
        pausedBroker.remove(brokerId);
        LOG.info("Broker {} is resumed", brokerId);
    }

    /**
     * Offset handler backed by a dedicated {@link KafkaConsumer} that is configured from the
     * enclosing environment's standard properties with byte-array deserializers.
     */
    private class KafkaOffsetHandlerImpl implements KafkaTestEnvironment.KafkaOffsetHandler {
        /** Consumer used only to read and commit offsets. */
        private final KafkaConsumer<byte[], byte[]> offsetClient;

        /** Creates the consumer from a copy of the environment's standard properties. */
        public KafkaOffsetHandlerImpl() {
            final String byteArrayDeserializer =
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer";
            final Properties consumerProps = new Properties();
            consumerProps.putAll(standardProps);
            consumerProps.setProperty("key.deserializer", byteArrayDeserializer);
            consumerProps.setProperty("value.deserializer", byteArrayDeserializer);
            offsetClient = new KafkaConsumer<>(consumerProps);
        }

        /**
         * Reads the committed offset of one partition.
         *
         * @param topicName topic of the partition
         * @param partition partition index
         * @return the committed offset, or {@code null} if the consumer reports none
         */
        @Override
        public Long getCommittedOffset(String topicName, int partition) {
            final TopicPartition topicPartition = new TopicPartition(topicName, partition);
            final OffsetAndMetadata committed = offsetClient.committed(topicPartition);
            if (committed == null) {
                return null;
            }
            return committed.offset();
        }

        /**
         * Synchronously commits {@code offset} for one partition.
         *
         * @param topicName topic of the partition
         * @param partition partition index
         * @param offset offset to commit
         */
        @Override
        public void setCommittedOffset(String topicName, int partition, long offset) {
            final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
            final TopicPartition topicPartition = new TopicPartition(topicName, partition);
            offsets.put(topicPartition, new OffsetAndMetadata(offset));
            offsetClient.commitSync(offsets);
        }

        /** Closes the underlying consumer. */
        @Override
        public void close() {
            offsetClient.close();
        }
    }
}

