/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.kafka;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseAction;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncTableAction;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.utils.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startable;
import org.testcontainers.utility.DockerImageName;

public abstract class KafkaActionITCaseBase
extends CdcActionITCaseBase {
    /** JSON mapper; {@code isRecordLine} uses it to test whether a line parses as JSON. */
    protected final ObjectMapper objectMapper = new ObjectMapper();
    private static final Logger LOG = LoggerFactory.getLogger(KafkaActionITCaseBase.class);
    /** Network alias assigned to the Kafka container in the static initializer. */
    private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
    /** Network alias assigned to the schema registry container in the static initializer. */
    private static final String INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS = "schemaregistry";
    /** Docker network that both containers are attached to. */
    private static final Network NETWORK = Network.newNetwork();
    /** Timeout in milliseconds; same value as the zookeeper.*.timeout.ms entries in {@code getStandardProps}. */
    private static final int ZK_TIMEOUT_MILLIS = 30000;
    /** Shared producer; created in {@code beforeAll} and closed in {@code afterAll}. */
    protected static KafkaProducer<String, String> kafkaProducer;
    /** Shared consumer used only to query partition offsets for debug logging. */
    private static KafkaConsumer<String, String> kafkaConsumer;
    /** Shared admin client used to create, list, describe and delete topics. */
    private static AdminClient adminClient;
    /** Per-test-instance timer that drives the periodic topic/offset debug logging. */
    private final Timer loggingTimer = new Timer("Debug Logging Timer");
    /** Kafka broker container, registered as a JUnit extension with order 1; assigned in the static initializer. */
    @RegisterExtension
    @Order(value=1)
    public static final KafkaContainerExtension KAFKA_CONTAINER;
    /** Schema registry container, registered as a JUnit extension with order 2; assigned in the static initializer. */
    @RegisterExtension
    @Order(value=2)
    public static final SchemaRegistryContainerExtension SCHEMA_REGISTRY_CONTAINER;

    /**
     * Creates the shared Kafka producer, consumer and admin client used by all tests in the class.
     *
     * <p>Each client starts from {@link #getStandardProps()}; the producer disables retries and
     * uses string serializers, the consumer uses its own group id and string deserializers.
     */
    @BeforeAll
    public static void beforeAll() {
        Properties producerProperties = getStandardProps();
        producerProperties.setProperty("retries", "0");
        producerProperties.put("key.serializer", StringSerializer.class.getCanonicalName());
        producerProperties.put("value.serializer", StringSerializer.class.getCanonicalName());
        // Diamond instead of a raw type so the assignment to the typed field is checked.
        kafkaProducer = new KafkaProducer<>(producerProperties);

        Properties consumerProperties = getStandardProps();
        consumerProperties.setProperty("group.id", "flink-tests-debugging");
        consumerProperties.setProperty(
                "key.deserializer", StringDeserializer.class.getCanonicalName());
        consumerProperties.setProperty(
                "value.deserializer", StringDeserializer.class.getCanonicalName());
        kafkaConsumer = new KafkaConsumer<>(consumerProperties);

        adminClient = AdminClient.create(getStandardProps());
    }

    /**
     * Closes the shared Kafka clients.
     *
     * <p>Every client is closed even if closing an earlier one throws, and the null checks cover
     * the case where {@code beforeAll} failed before assigning every client.
     */
    @AfterAll
    public static void afterAll() {
        try {
            if (kafkaProducer != null) {
                kafkaProducer.close();
            }
        } finally {
            try {
                if (kafkaConsumer != null) {
                    kafkaConsumer.close();
                }
            } finally {
                if (adminClient != null) {
                    adminClient.close();
                }
            }
        }
    }

    /**
     * Starts the periodic debug logger: every 30 seconds it logs the names of the non-internal
     * topics and the offset range of each of their partitions.
     */
    @BeforeEach
    public void setup() {
        Runnable logTopicState = () -> {
            Map<String, TopicDescription> descriptions = describeExternalTopics();
            LOG.info("Current existing topics: {}", descriptions.keySet());
            logTopicPartitionStatus(descriptions);
        };
        scheduleTimeoutLogger(Duration.ofSeconds(30L), logTopicState);
    }

    /**
     * Per-test teardown: runs the base class teardown, then stops the debug logging timer and
     * deletes the topics listed by the admin client.
     *
     * <p>The Kafka cleanup runs in a {@code finally} block so that a failure in the base class
     * teardown does not leave the timer thread running or leak topics into the next test.
     *
     * @throws Exception if the base class teardown or the topic deletion fails
     */
    @AfterEach
    public void after() throws Exception {
        try {
            super.after();
        } finally {
            cancelTimeoutLogger();
            deleteTopics();
        }
    }

    /**
     * Deletes every topic whose name is returned by the admin client's topic listing and blocks
     * until the deletion has completed.
     *
     * @throws ExecutionException if listing or deleting the topics fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void deleteTopics() throws ExecutionException, InterruptedException {
        Collection<String> topicNames = adminClient.listTopics().names().get();
        adminClient.deleteTopics(topicNames).all().get();
    }

    /**
     * Schedules {@code loggingAction} on the logging timer, first immediately and then once per
     * {@code period}.
     *
     * @param period interval between executions
     * @param loggingAction action to run on the timer thread
     */
    private void scheduleTimeoutLogger(Duration period, final Runnable loggingAction) {
        TimerTask timeoutLoggerTask = new TimerTask() {
            @Override
            public void run() {
                try {
                    loggingAction.run();
                } catch (Exception e) {
                    // An exception escaping run() terminates the Timer thread and silently stops
                    // all later executions. This is only a debugging aid, so report and carry on.
                    LOG.warn("Failed to execute logging action", e);
                }
            }
        };
        loggingTimer.schedule(timeoutLoggerTask, 0L, period.toMillis());
    }

    /** Cancels the logging timer of this test instance, discarding its scheduled task. */
    private void cancelTimeoutLogger() {
        loggingTimer.cancel();
    }

    /**
     * Describes all topics that the admin client does not report as internal.
     *
     * @return topic descriptions keyed by topic name
     * @throws RuntimeException if listing or describing the topics fails or is interrupted
     */
    private Map<String, TopicDescription> describeExternalTopics() {
        try {
            Collection<TopicListing> listings = adminClient.listTopics().listings().get();
            List<String> topics = listings.stream()
                    .filter(listing -> !listing.isInternal())
                    .map(TopicListing::name)
                    .collect(Collectors.toList());
            return adminClient.describeTopics(topics).allTopicNames().get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to whoever owns this thread.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Failed to list Kafka topics", e);
        } catch (Exception e) {
            throw new RuntimeException("Failed to list Kafka topics", e);
        }
    }

    /**
     * Logs the beginning and end offset of every partition of the given topics.
     *
     * @param topicDescriptions descriptions of the topics to report on, keyed by topic name
     */
    private void logTopicPartitionStatus(Map<String, TopicDescription> topicDescriptions) {
        List<TopicPartition> partitions = topicDescriptions.entrySet().stream()
                .flatMap(entry -> entry.getValue().partitions().stream()
                        .map(tpInfo -> new TopicPartition(entry.getKey(), tpInfo.partition())))
                .collect(Collectors.toList());
        // The consumer is a static field shared by all test instances, while each instance owns
        // its own timer thread. Locking on the class (rather than on this instance) is what
        // actually serializes access to the consumer across those threads.
        synchronized (KafkaActionITCaseBase.class) {
            Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(partitions);
            Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(partitions);
            partitions.forEach(partition -> LOG.info(
                    "TopicPartition \"{}\": starting offset: {}, stopping offset: {}",
                    partition,
                    beginningOffsets.get(partition),
                    endOffsets.get(partition)));
        }
    }

    /**
     * Builds the base client properties shared by the producer, consumer and admin client.
     *
     * @return a fresh, mutable {@link Properties} pointing at the Kafka container's bootstrap servers
     */
    public static Properties getStandardProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        props.put("group.id", "paimon-tests");
        props.put("enable.auto.commit", false);
        props.put("auto.offset.reset", "earliest");
        props.put("max.partition.fetch.bytes", 256);
        props.put("zookeeper.session.timeout.ms", ZK_TIMEOUT_MILLIS);
        props.put("zookeeper.connection.timeout.ms", ZK_TIMEOUT_MILLIS);
        props.put("default.api.timeout.ms", "120000");
        return props;
    }

    /**
     * Returns the bootstrap servers string reported by the Kafka container.
     *
     * @return value of {@code KAFKA_CONTAINER.getBootstrapServers()}
     */
    public String getBootstrapServers() {
        return KAFKA_CONTAINER.getBootstrapServers();
    }

    /**
     * Builds the minimal Kafka source configuration for a sync action, using
     * {@code properties.}-prefixed keys.
     *
     * @return a new mutable map that callers may extend with further options
     */
    protected Map<String, String> getBasicKafkaConfig() {
        Map<String, String> kafkaConfig = new HashMap<>();
        kafkaConfig.put("properties.bootstrap.servers", KAFKA_CONTAINER.getBootstrapServers());
        kafkaConfig.put("properties.group.id", "paimon-tests");
        kafkaConfig.put("properties.enable.auto.commit", "false");
        kafkaConfig.put("properties.auto.offset.reset", "earliest");
        return kafkaConfig;
    }

    /**
     * Returns the URL reported by the schema registry container.
     *
     * @return value of {@code SCHEMA_REGISTRY_CONTAINER.getSchemaRegistryUrl()}
     */
    protected String getSchemaRegistryUrl() {
        return SCHEMA_REGISTRY_CONTAINER.getSchemaRegistryUrl();
    }

    /**
     * Creates a builder for a {@code KafkaSyncTableAction}.
     *
     * @param kafkaConfig Kafka configuration handed to the builder
     * @return a new builder instance
     */
    protected KafkaSyncTableActionBuilder syncTableActionBuilder(Map<String, String> kafkaConfig) {
        return new KafkaSyncTableActionBuilder(kafkaConfig);
    }

    /**
     * Creates a builder for a {@code KafkaSyncDatabaseAction}.
     *
     * @param kafkaConfig Kafka configuration handed to the builder
     * @return a new builder instance
     */
    protected KafkaSyncDatabaseActionBuilder syncDatabaseActionBuilder(Map<String, String> kafkaConfig) {
        return new KafkaSyncDatabaseActionBuilder(kafkaConfig);
    }

    /**
     * Creates a topic through the shared admin client and waits for the creation to complete.
     *
     * @param topic name of the topic to create
     * @param numPartitions number of partitions
     * @param replicationFactor replication factor; narrowed to {@code short} for the Kafka API
     * @throws IllegalStateException if the topic cannot be created
     */
    protected void createTestTopic(String topic, int numPartitions, int replicationFactor) {
        NewTopic newTopic = new NewTopic(topic, numPartitions, (short) replicationFactor);
        try {
            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (Exception e) {
            throw new IllegalStateException(
                    String.format(
                            "Fail to create topic [%s partitions: %d replication factor: %d].",
                            topic, numPartitions, replicationFactor),
                    e);
        }
    }

    /**
     * Sends the record lines of a classpath resource to a topic without waiting for the sends to
     * complete.
     *
     * @param topic target topic
     * @param resourceDirFormat format string for the classpath resource path
     * @param args arguments for {@code resourceDirFormat}
     * @throws Exception if the resource cannot be located or read
     */
    protected void writeRecordsToKafka(String topic, String resourceDirFormat, Object... args) throws Exception {
        final boolean waitForSend = false;
        writeRecordsToKafka(topic, waitForSend, resourceDirFormat, args);
    }

    /**
     * Reads a classpath resource line by line and sends every line accepted by
     * {@link #isRecordLine(String)} to the given topic.
     *
     * @param topic target topic
     * @param wait whether to block on each send until it has completed
     * @param resourceDirFormat format string for the classpath resource path
     * @param args arguments for {@code resourceDirFormat}
     * @throws IllegalArgumentException if the formatted resource path is not on the classpath
     * @throws Exception if the resource cannot be read
     */
    protected void writeRecordsToKafka(String topic, boolean wait, String resourceDirFormat, Object... args) throws Exception {
        String resourcePath = String.format(resourceDirFormat, args);
        URL url = KafkaActionITCaseBase.class.getClassLoader().getResource(resourcePath);
        if (url == null) {
            // getResource returns null for a missing resource; fail with the offending path
            // instead of an anonymous NullPointerException further down.
            throw new IllegalArgumentException(
                    "Test resource not found on classpath: " + resourcePath);
        }
        for (String line : Files.readAllLines(Paths.get(url.toURI()))) {
            if (isRecordLine(line)) {
                send(topic, line, wait);
            }
        }
    }

    /**
     * Decides whether a line read from a test resource should be sent as a record.
     *
     * @param line the line to check
     * @return {@code false} if parsing the line as JSON raises a {@code JsonProcessingException}
     *     or the line is empty; {@code true} otherwise
     */
    protected boolean isRecordLine(String line) {
        try {
            objectMapper.readTree(line);
        } catch (JsonProcessingException e) {
            return false;
        }
        // Reached only when parsing did not fail; an empty line is still rejected here.
        boolean empty = StringUtils.isEmpty((CharSequence) line);
        return !empty;
    }

    /**
     * Sends one record value (no key) to a topic with the shared producer.
     *
     * @param topic target topic
     * @param record record value
     * @param wait whether to block until the send has completed
     * @throws RuntimeException if {@code wait} is set and the send fails or the wait is interrupted
     */
    private void send(String topic, String record, boolean wait) {
        Future<?> sendFuture = kafkaProducer.send(new ProducerRecord<>(topic, record));
        if (!wait) {
            return;
        }
        try {
            sendFuture.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before converting to an unchecked exception.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    // Builds the two container extensions. KAFKA_CONTAINER must be assigned first because the
    // schema registry configuration below reads it (dependsOn and getNetworkAliases).
    static {
        // Kafka broker from image confluentinc/cp-kafka:7.2.2 with embedded ZooKeeper, attached to
        // NETWORK under the alias INTER_CONTAINER_KAFKA_ALIAS. Sets KAFKA_TRANSACTION_MAX_TIMEOUT_MS
        // to two hours (in milliseconds) and KAFKA_LOG_RETENTION_MS to -1.
        KAFKA_CONTAINER = (KafkaContainerExtension)((KafkaContainer)((KafkaContainer)((KafkaContainer)new KafkaContainerExtension(DockerImageName.parse((String)"confluentinc/cp-kafka:7.2.2")){

            // After the regular start, forward the container output to LOG when INFO is enabled.
            protected void doStart() {
                super.doStart();
                if (LOG.isInfoEnabled()) {
                    this.followOutput((Consumer)new Slf4jLogConsumer(LOG));
                }
            }
        }.withEmbeddedZookeeper().withNetwork(NETWORK)).withNetworkAliases(new String[]{INTER_CONTAINER_KAFKA_ALIAS})).withEnv("KAFKA_TRANSACTION_MAX_TIMEOUT_MS", String.valueOf(Duration.ofHours(2L).toMillis()))).withEnv("KAFKA_LOG_RETENTION_MS", "-1");
        // Schema registry from image confluentinc/cp-schema-registry:7.2.2 on the same NETWORK under
        // the alias INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS, declared as depending on KAFKA_CONTAINER.
        // Its Kafka store bootstrap servers are "PLAINTEXT://<first network alias of the Kafka
        // container>:9092"; output goes to LOG and the startup timeout is 60 seconds.
        SCHEMA_REGISTRY_CONTAINER = (SchemaRegistryContainerExtension)((SchemaRegistryContainerExtension)((SchemaRegistryContainerExtension)((SchemaRegistryContainerExtension)((SchemaRegistryContainerExtension)((SchemaRegistryContainerExtension)((SchemaRegistryContainerExtension)new SchemaRegistryContainerExtension(DockerImageName.parse((String)"confluentinc/cp-schema-registry:7.2.2")).dependsOn(new Startable[]{KAFKA_CONTAINER})).withNetwork(NETWORK)).withEnv("SCHEMA_REGISTRY_HOST_NAME", "schema_registry")).withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + (String)KAFKA_CONTAINER.getNetworkAliases().get(0) + ":9092")).withNetworkAliases(new String[]{INTER_CONTAINER_SCHEMA_REGISTRY_ALIAS})).withLogConsumer((Consumer)new Slf4jLogConsumer(LOG))).withStartupTimeout(Duration.ofSeconds(60L));
    }

    /**
     * Schema registry container that doubles as a JUnit extension: it is started before all tests
     * of a class and closed after them.
     */
    private static class SchemaRegistryContainerExtension
            extends GenericContainer<SchemaRegistryContainerExtension>
            implements BeforeAllCallback, AfterAllCallback {
        /** Container port that is exposed and used to build the registry URL. */
        private static final Integer SCHEMA_REGISTRY_EXPOSED_PORT = 8081;

        private SchemaRegistryContainerExtension(DockerImageName dockerImageName) {
            super(dockerImageName);
            addExposedPorts(SCHEMA_REGISTRY_EXPOSED_PORT);
        }

        /** Starts the container before the tests of the class run. */
        public void beforeAll(ExtensionContext extensionContext) {
            doStart();
        }

        /** Closes the container after the tests of the class have run. */
        public void afterAll(ExtensionContext extensionContext) {
            close();
        }

        /**
         * Builds the registry URL from the container host and the mapped registry port.
         *
         * @return URL of the form {@code http://<host>:<mapped port>}
         */
        public String getSchemaRegistryUrl() {
            String host = getHost();
            Integer mappedPort = getMappedPort(SCHEMA_REGISTRY_EXPOSED_PORT);
            return "http://" + host + ":" + mappedPort;
        }
    }

    /**
     * Kafka container that doubles as a JUnit extension: it is started before all tests of a
     * class and closed after them.
     */
    private static class KafkaContainerExtension extends KafkaContainer
            implements BeforeAllCallback, AfterAllCallback {
        private KafkaContainerExtension(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        /** Starts the container before the tests of the class run. */
        public void beforeAll(ExtensionContext extensionContext) {
            doStart();
        }

        /** Closes the container after the tests of the class have run. */
        public void afterAll(ExtensionContext extensionContext) {
            close();
        }
    }

    /**
     * Builder for {@code KafkaSyncDatabaseAction}; hands the action class and the Kafka
     * configuration to the base {@code SyncDatabaseActionBuilder}.
     */
    protected class KafkaSyncDatabaseActionBuilder
    extends CdcActionITCaseBase.SyncDatabaseActionBuilder<KafkaSyncDatabaseAction> {
        /**
         * @param kafkaConfig Kafka configuration passed through to the base builder
         */
        public KafkaSyncDatabaseActionBuilder(Map<String, String> kafkaConfig) {
            super(KafkaSyncDatabaseAction.class, kafkaConfig);
        }
    }

    /**
     * Builder for {@code KafkaSyncTableAction}; hands the action class and the Kafka
     * configuration to the base {@code SyncTableActionBuilder}.
     */
    protected class KafkaSyncTableActionBuilder
    extends CdcActionITCaseBase.SyncTableActionBuilder<KafkaSyncTableAction> {
        /**
         * @param kafkaConfig Kafka configuration passed through to the base builder
         */
        public KafkaSyncTableActionBuilder(Map<String, String> kafkaConfig) {
            super(KafkaSyncTableAction.class, kafkaConfig);
        }
    }
}

