/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.testutils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

/**
 * Static helpers for tests that run against Kafka: creating a pre-configured
 * {@link KafkaContainer}, draining all records of a topic and detecting leaked producer threads.
 *
 * <p>The class only has static members and cannot be instantiated.
 */
public class KafkaUtil {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class);
    private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1L);

    /** Docker image used for every container created by this class. */
    private static final String KAFKA_IMAGE = "confluentinc/cp-kafka:7.4.4";

    /** Substring of a thread name that identifies a thread as a Kafka producer network thread. */
    private static final String PRODUCER_THREAD_NAME_MARKER = "kafka-producer-network-thread";

    /** Maximum number of waits granted to producer threads before they are reported as leaks. */
    private static final int LEAK_CHECK_MAX_WAITS = 10;

    /** Pause between two consecutive scans for leaked producer threads. */
    private static final long LEAK_CHECK_WAIT_MILLIS = 1000L;

    private KafkaUtil() {
    }

    /**
     * Creates a Kafka container whose name (and therefore logger) is derived from the test class.
     *
     * @param testCase test class; its simple name becomes part of the container name
     * @return a configured container; this method does not start it
     */
    public static KafkaContainer createKafkaContainer(Class<?> testCase) {
        return createKafkaContainer(getContainerName("kafka", testCase));
    }

    /**
     * Creates a Kafka container that forwards its output to the logger
     * {@code container.<containerName>} and mirrors that logger's level into the broker's log4j
     * settings.
     *
     * @param containerName name used to look up the logger for the container output
     * @return a configured container; this method does not start it
     */
    public static KafkaContainer createKafkaContainer(String containerName) {
        Logger logger = getLogger(containerName);
        String logLevel = inferLogLevel(logger);
        Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger, true);
        // withEnv/withLogConsumer return the container's self type, so no casts are required.
        return new KafkaContainer(DockerImageName.parse(KAFKA_IMAGE))
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                .withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel)
                .withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel)
                .withEnv(
                        "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
                        String.valueOf(Duration.ofHours(2L).toMillis()))
                .withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel)
                .withLogConsumer(logConsumer);
    }

    /**
     * Returns the name of the most verbose level enabled on the given logger.
     *
     * @param logger logger to inspect
     * @return one of {@code TRACE}, {@code DEBUG}, {@code INFO}, {@code WARN}, {@code ERROR}, or
     *     {@code OFF} when not even error logging is enabled
     */
    private static String inferLogLevel(Logger logger) {
        if (logger.isTraceEnabled()) {
            return "TRACE";
        }
        if (logger.isDebugEnabled()) {
            return "DEBUG";
        }
        if (logger.isInfoEnabled()) {
            return "INFO";
        }
        if (logger.isWarnEnabled()) {
            return "WARN";
        }
        if (logger.isErrorEnabled()) {
            return "ERROR";
        }
        return "OFF";
    }

    /**
     * Returns the logger named {@code container.<containerName>}.
     *
     * @param containerName name of the container
     * @return the logger for that container
     */
    public static Logger getLogger(String containerName) {
        return LoggerFactory.getLogger("container." + containerName);
    }

    /**
     * Returns the logger named {@code container.<type>.<simple name of testClass>}.
     *
     * @param type container type prefix, e.g. {@code kafka}
     * @param testClass test class whose simple name is appended
     * @return the logger for that container
     */
    public static Logger getLogger(String type, Class<?> testClass) {
        return getLogger(getContainerName(type, testClass));
    }

    /** Builds the container name {@code <type>.<simple name of testClass>}. */
    private static String getContainerName(String type, Class<?> testClass) {
        return type + "." + testClass.getSimpleName();
    }

    /**
     * Drains a topic like {@link #drainAllRecordsFromTopic(String, Properties)} after setting the
     * consumer's {@code isolation.level}.
     *
     * @param topic topic to read
     * @param properties base consumer configuration; it is copied, never modified
     * @param committed {@code true} sets {@code isolation.level} to {@code read_committed},
     *     {@code false} to {@code read_uncommitted}
     * @return the records that were read
     * @throws KafkaException if the consumer reports an error
     */
    public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic, Properties properties, boolean committed) throws KafkaException {
        Properties consumerConfig = new Properties();
        consumerConfig.putAll(properties);
        consumerConfig.put("isolation.level", committed ? "read_committed" : "read_uncommitted");
        return drainAllRecordsFromTopic(topic, consumerConfig);
    }

    /**
     * Reads every partition of the topic from its beginning until the consumer position reaches
     * the end offset that was captured before polling started.
     *
     * <p>Keys and values are always deserialized as raw bytes, overriding any deserializer given
     * in {@code properties}. Every record returned by a poll is kept, so records written while
     * draining may be included if they arrive in the same poll.
     *
     * @param topic topic to read
     * @param properties base consumer configuration; it is copied, never modified
     * @return the records that were read, in the order the polls returned them
     * @throws KafkaException if the consumer reports an error
     */
    public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic, Properties properties) throws KafkaException {
        Properties consumerConfig = new Properties();
        consumerConfig.putAll(properties);
        consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            Set<TopicPartition> topicPartitions = getAllPartitions(consumer, topic);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            consumer.assign(topicPartitions);
            consumer.seekToBeginning(topicPartitions);

            List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
            while (!topicPartitions.isEmpty()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
                LOG.debug("Fetched {} records from topic {}.", records.count(), topic);

                // Stop polling partitions whose position has reached the captured end offset.
                List<TopicPartition> finishedPartitions = new ArrayList<>();
                for (TopicPartition topicPartition : topicPartitions) {
                    long position = consumer.position(topicPartition);
                    long endOffset = endOffsets.get(topicPartition);
                    LOG.debug(
                            "Endoffset {} and current position {} for partition {}",
                            endOffset,
                            position,
                            topicPartition.partition());
                    if (endOffset - position <= 0L) {
                        finishedPartitions.add(topicPartition);
                    }
                }
                if (topicPartitions.removeAll(finishedPartitions)) {
                    consumer.assign(topicPartitions);
                }
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    consumerRecords.add(record);
                }
            }
            return consumerRecords;
        }
    }

    /** Looks up all partitions of the topic through the given consumer. */
    private static Set<TopicPartition> getAllPartitions(KafkaConsumer<byte[], byte[]> consumer, String topic) {
        return consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toSet());
    }

    /**
     * Fails the current test if Kafka producer network threads are still alive.
     *
     * <p>Threads get up to {@link #LEAK_CHECK_MAX_WAITS} waits of
     * {@link #LEAK_CHECK_WAIT_MILLIS} ms to terminate; the scan is repeated after every wait so
     * that the report reflects the latest state. If the calling thread is interrupted while
     * waiting, its interrupt flag is restored and the last scan result is used. Threads that are
     * still alive are stopped on a best-effort basis and then reported through an assertion
     * failure that lists their names and stack traces.
     */
    public static void checkProducerLeak() {
        List<Map.Entry<Thread, StackTraceElement[]>> leaks = findAliveKafkaThreads();
        for (int waits = 0; waits < LEAK_CHECK_MAX_WAITS && !leaks.isEmpty(); waits++) {
            try {
                Thread.sleep(LEAK_CHECK_WAIT_MILLIS);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and stop waiting; further sleeps
                // would fail immediately anyway.
                Thread.currentThread().interrupt();
                break;
            }
            leaks = findAliveKafkaThreads();
        }
        if (leaks.isEmpty()) {
            return;
        }
        for (Map.Entry<Thread, StackTraceElement[]> leak : leaks) {
            stopQuietly(leak.getKey());
        }
        Assertions.fail(
                "Detected producer leaks:\n"
                        + leaks.stream().map(KafkaUtil::format).collect(Collectors.joining("\n\n")));
    }

    /** Takes a snapshot of all live threads and keeps those identified as Kafka producer threads. */
    private static List<Map.Entry<Thread, StackTraceElement[]>> findAliveKafkaThreads() {
        return Thread.getAllStackTraces().entrySet().stream()
                .filter(KafkaUtil::findAliveKafkaThread)
                .collect(Collectors.toList());
    }

    /**
     * Tries to stop the thread without letting a failure to do so hide the leak report.
     *
     * <p>{@link Thread#stop()} throws {@link UnsupportedOperationException} on JDK 20 and later;
     * that exception is logged instead of propagated.
     */
    @SuppressWarnings({"deprecation", "removal"})
    private static void stopQuietly(Thread thread) {
        try {
            thread.stop();
        } catch (UnsupportedOperationException e) {
            LOG.warn("Unable to forcibly stop leaked thread {}", thread.getName(), e);
        }
    }

    /** Renders a leaked thread as its name followed by one stack frame per line. */
    private static String format(Map.Entry<Thread, StackTraceElement[]> leak) {
        String stackTrace = Arrays.stream(leak.getValue())
                .map(StackTraceElement::toString)
                .collect(Collectors.joining("\n"));
        return leak.getKey().getName() + ":\n" + stackTrace;
    }

    /** Returns whether the entry's thread has not terminated and is named like a producer thread. */
    private static boolean findAliveKafkaThread(Map.Entry<Thread, StackTraceElement[]> threadStackTrace) {
        Thread thread = threadStackTrace.getKey();
        return thread.getState() != Thread.State.TERMINATED
                && thread.getName().contains(PRODUCER_THREAD_NAME_MARKER);
    }
}

