/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.testutils;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.net.URL;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSource;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.testutils.MockKafkaMetadataService;
import org.apache.flink.connector.testframe.external.ExternalSystemSplitDataWriter;
import org.apache.flink.connector.testframe.external.source.DataStreamSourceExternalContext;
import org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
import org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * External context used by the connector testing framework to exercise {@link DynamicKafkaSource}
 * against two Kafka clusters.
 *
 * <p>Every call to {@link #createSourceSplitDataWriter(TestingSourceSettings)} registers one new
 * {@link KafkaStream} that spans both clusters (two topics per cluster), creates the backing
 * topics, and returns a writer that produces test records into them. {@link #close()} deletes all
 * topics created through this context.
 */
public class DynamicKafkaSourceExternalContext
        implements DataStreamSourceExternalContext<String> {
    private static final Logger logger =
            LoggerFactory.getLogger(DynamicKafkaSourceExternalContext.class);

    /** Maximum number of records written to a single partition by one {@code writeRecords} call. */
    private static final int NUM_TEST_RECORDS_PER_SPLIT = 10;

    /** Number of partitions written to per topic. */
    private static final int NUM_PARTITIONS = 1;

    /** Stream ids are {@code "stream-"} followed by digits; see {@link #getKafkaStream(String)}. */
    private static final Pattern STREAM_ID_PATTERN = Pattern.compile("stream-[0-9]+");

    private static final String CLUSTER_0 = "cluster0";
    private static final String CLUSTER_1 = "cluster1";

    /** How long {@link #close()} waits for deleted topics to disappear from a cluster. */
    private static final Duration TOPIC_DELETION_TIMEOUT = Duration.ofSeconds(30L);

    private final List<URL> connectorJarPaths;

    /** Streams registered so far; handed to the mock metadata service in {@code createSource}. */
    private final Set<KafkaStream> kafkaStreams = new HashSet<>();

    /** Connection properties keyed by cluster name ({@code cluster0} / {@code cluster1}). */
    private final Map<String, Properties> clusterPropertiesMap;

    private final List<SplitDataWriter> splitDataWriters = new ArrayList<>();

    /**
     * Non-negative random number appended to every stream and topic name created by this
     * instance. NOTE(review): presumably there to avoid name clashes with topics left over from
     * other runs - confirm.
     */
    private final long randomTopicSuffix;

    /**
     * Creates a context for two Kafka clusters.
     *
     * @param bootstrapServerList bootstrap server addresses; index 0 is used for
     *     {@code cluster0} and index 1 for {@code cluster1}, so at least two entries are required
     * @param connectorJarPaths connector jars returned unchanged by
     *     {@link #getConnectorJarPaths()}
     */
    public DynamicKafkaSourceExternalContext(
            List<String> bootstrapServerList, List<URL> connectorJarPaths) {
        this.connectorJarPaths = connectorJarPaths;

        Properties propertiesForCluster0 = new Properties();
        propertiesForCluster0.setProperty("bootstrap.servers", bootstrapServerList.get(0));
        Properties propertiesForCluster1 = new Properties();
        propertiesForCluster1.setProperty("bootstrap.servers", bootstrapServerList.get(1));

        this.clusterPropertiesMap =
                ImmutableMap.of(CLUSTER_0, propertiesForCluster0, CLUSTER_1, propertiesForCluster1);
        // The lower bound of 0 keeps the suffix free of a minus sign, so generated stream ids
        // consist of "stream-" followed only by digits, as STREAM_ID_PATTERN describes.
        this.randomTopicSuffix = ThreadLocalRandom.current().nextLong(0L, Long.MAX_VALUE);
    }

    /**
     * Builds a {@link DynamicKafkaSource} that subscribes by {@link #STREAM_ID_PATTERN} and
     * resolves streams through a {@link MockKafkaMetadataService} backed by the streams
     * registered in this context.
     *
     * @param sourceSettings settings of the test case; only the boundedness is read. For
     *     {@link Boundedness#BOUNDED} the source is bounded at the latest offsets.
     * @return the configured source, emitting record values as strings
     */
    public Source<String, ?, ?> createSource(TestingSourceSettings sourceSettings)
            throws UnsupportedOperationException {
        final DynamicKafkaSourceBuilder<String> builder = DynamicKafkaSource.builder();

        builder.setStreamPattern(STREAM_ID_PATTERN)
                .setKafkaMetadataService(new MockKafkaMetadataService(this.kafkaStreams))
                .setGroupId("DynamicKafkaSourceExternalContext")
                .setDeserializer(
                        KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));

        if (sourceSettings.getBoundedness().equals(Boundedness.BOUNDED)) {
            builder.setBounded(OffsetsInitializer.latest());
        }

        return builder.build();
    }

    /**
     * Registers a new stream, creates its topics on both clusters and returns a writer for them.
     *
     * @param sourceSettings unused
     * @return a writer that produces into the topics of the newly created stream
     */
    public ExternalSystemSplitDataWriter<String> createSourceSplitDataWriter(
            TestingSourceSettings sourceSettings) {
        // The number of writers created so far is the per-writer part of the name suffix.
        int writerIndex = this.splitDataWriters.size();
        List<Tuple2<String, String>> clusterTopics = setupSplits(String.valueOf(writerIndex));
        SplitDataWriter splitDataWriter =
                new SplitDataWriter(this.clusterPropertiesMap, clusterTopics);
        this.splitDataWriters.add(splitDataWriter);
        return splitDataWriter;
    }

    /**
     * Creates the topics of a new stream and adds the stream to {@link #kafkaStreams}.
     *
     * @param suffix per-writer suffix; {@link #randomTopicSuffix} is appended to it
     * @return one {@code (cluster name, topic name)} pair per created topic
     */
    private List<Tuple2<String, String>> setupSplits(String suffix) {
        KafkaStream kafkaStream = getKafkaStream(suffix + this.randomTopicSuffix);
        logger.info("Setting up splits for {}", kafkaStream);

        List<Tuple2<String, String>> clusterTopics =
                kafkaStream.getClusterMetadataMap().entrySet().stream()
                        .flatMap(
                                entry ->
                                        entry.getValue().getTopics().stream()
                                                .map(topic -> Tuple2.of(entry.getKey(), topic)))
                        .collect(Collectors.toList());

        for (Tuple2<String, String> clusterTopic : clusterTopics) {
            String cluster = clusterTopic.f0;
            String topic = clusterTopic.f1;
            // NOTE(review): argument order assumed to be (topic, partitions, replication factor,
            // properties) - confirm against KafkaTestEnvironmentImpl.
            KafkaTestEnvironmentImpl.createNewTopic(
                    topic, NUM_PARTITIONS, 1, this.clusterPropertiesMap.get(cluster));
        }

        this.kafkaStreams.add(kafkaStream);
        return clusterTopics;
    }

    /**
     * Describes a stream named {@code stream-<suffix>} with topics {@code topic0-<suffix>} and
     * {@code topic1-<suffix>} on {@code cluster0}, and {@code topic2-<suffix>} and
     * {@code topic3-<suffix>} on {@code cluster1}. No topic is created here.
     */
    private KafkaStream getKafkaStream(String suffix) {
        ClusterMetadata cluster0Metadata =
                new ClusterMetadata(
                        ImmutableSet.of("topic0-" + suffix, "topic1-" + suffix),
                        this.clusterPropertiesMap.get(CLUSTER_0));
        ClusterMetadata cluster1Metadata =
                new ClusterMetadata(
                        ImmutableSet.of("topic2-" + suffix, "topic3-" + suffix),
                        this.clusterPropertiesMap.get(CLUSTER_1));
        return new KafkaStream(
                "stream-" + suffix,
                ImmutableMap.of(CLUSTER_0, cluster0Metadata, CLUSTER_1, cluster1Metadata));
    }

    /**
     * Generates the test records for one split.
     *
     * @param sourceSettings unused
     * @param splitIndex unused
     * @param seed unused; the generated data is always the same
     * @return the decimal strings {@code "0"} up to (excluding)
     *     {@code NUM_PARTITIONS * NUM_TEST_RECORDS_PER_SPLIT}, in ascending order
     */
    public List<String> generateTestData(
            TestingSourceSettings sourceSettings, int splitIndex, long seed) {
        return IntStream.range(0, NUM_PARTITIONS * NUM_TEST_RECORDS_PER_SPLIT)
                .mapToObj(Integer::toString)
                .collect(Collectors.toList());
    }

    /** Returns the type information for {@link String}, the record type of the source. */
    public TypeInformation<String> getProducedType() {
        return TypeInformation.of(String.class);
    }

    /** Returns the connector jar paths passed to the constructor. */
    public List<URL> getConnectorJarPaths() {
        return this.connectorJarPaths;
    }

    /**
     * Deletes every topic created through this context and, per cluster, waits up to
     * {@link #TOPIC_DELETION_TIMEOUT} until none of them is listed any more.
     *
     * @throws Exception if a deletion fails or the topics are still listed after the timeout
     */
    public void close() throws Exception {
        // Group the created topics by the cluster they live on.
        Map<String, List<String>> topicsByCluster = new HashMap<>();
        for (SplitDataWriter splitDataWriter : this.splitDataWriters) {
            for (Tuple2<String, String> clusterTopic : splitDataWriter.getClusterTopics()) {
                topicsByCluster
                        .computeIfAbsent(clusterTopic.f0, unused -> new ArrayList<>())
                        .add(clusterTopic.f1);
            }
        }

        for (Map.Entry<String, List<String>> entry : topicsByCluster.entrySet()) {
            String cluster = entry.getKey();
            List<String> topics = entry.getValue();
            try (AdminClient adminClient =
                    AdminClient.create(this.clusterPropertiesMap.get(cluster))) {
                adminClient.deleteTopics(topics).all().get();
                // The delete call returning is not treated as proof of removal: poll the topic
                // listing until none of the deleted names shows up.
                CommonTestUtils.waitUtil(
                        () -> {
                            try {
                                return adminClient.listTopics().listings().get().stream()
                                        .map(TopicListing::name)
                                        .noneMatch(topics::contains);
                            } catch (Exception e) {
                                // A failed listing counts as "not deleted yet" and is retried.
                                logger.warn("Exception caught when listing Kafka topics", e);
                                return false;
                            }
                        },
                        TOPIC_DELETION_TIMEOUT,
                        String.format("Topics %s were not deleted within timeout", topics));
            }
            logger.info("topics {} are deleted from {}", topics, cluster);
        }
    }

    /** Writes test records into a fixed list of {@code (cluster, topic)} pairs. */
    private static final class SplitDataWriter implements ExternalSystemSplitDataWriter<String> {
        private final Map<String, Properties> clusterPropertiesMap;
        private final List<Tuple2<String, String>> clusterTopics;

        /**
         * @param clusterPropertiesMap connection properties keyed by cluster name
         * @param clusterTopics {@code (cluster name, topic name)} pairs to write to
         */
        public SplitDataWriter(
                Map<String, Properties> clusterPropertiesMap,
                List<Tuple2<String, String>> clusterTopics) {
            this.clusterPropertiesMap = clusterPropertiesMap;
            this.clusterTopics = clusterTopics;
        }

        /**
         * Distributes {@code records} over the topics in list order. Each topic receives at most
         * {@code NUM_TEST_RECORDS_PER_SPLIT} records per partition; once the records are used up,
         * the remaining topics are still produced to, with an empty batch.
         *
         * @param records values to write; keys are {@code null}
         * @throws RuntimeException wrapping any failure raised while producing
         */
        public void writeRecords(List<String> records) {
            int nextRecord = 0;
            try {
                for (Tuple2<String, String> clusterTopic : this.clusterTopics) {
                    String cluster = clusterTopic.f0;
                    String topic = clusterTopic.f1;
                    List<ProducerRecord<String, String>> producerRecords = new ArrayList<>();
                    for (int partition = 0; partition < NUM_PARTITIONS; partition++) {
                        for (int k = 0;
                                k < NUM_TEST_RECORDS_PER_SPLIT && nextRecord < records.size();
                                k++) {
                            producerRecords.add(
                                    new ProducerRecord<>(
                                            topic, partition, null, records.get(nextRecord++)));
                        }
                    }

                    logger.info("Writing producer records: {}", producerRecords);
                    DynamicKafkaSourceTestHelper.produceToKafka(
                            this.clusterPropertiesMap.get(cluster),
                            producerRecords,
                            StringSerializer.class,
                            StringSerializer.class);
                }
            } catch (Throwable e) {
                throw new RuntimeException("Failed to produce test data", e);
            }
        }

        /** Nothing to release; topics are deleted by the enclosing context's {@code close()}. */
        public void close() throws Exception {
        }

        /** Returns the {@code (cluster name, topic name)} pairs this writer produces to. */
        public List<Tuple2<String, String>> getClusterTopics() {
            return this.clusterTopics;
        }
    }
}

