/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.testutils;

import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;

/**
 * Static helpers for Kafka-based tests: building a configured {@link KafkaContainer} and reading
 * back every record of a topic as raw bytes.
 */
public class KafkaUtil {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtil.class);
    private static final Duration CONSUMER_POLL_DURATION = Duration.ofSeconds(1L);

    private KafkaUtil() {
    }

    /**
     * Builds a Kafka container whose output is forwarded to {@code logger} without a prefix.
     *
     * @param dockerImageVersion docker image name, parsed with {@link DockerImageName#parse}
     * @param logger logger that receives the container output and determines the broker log level
     * @return the configured container; this method does not start it
     */
    public static KafkaContainer createKafkaContainer(String dockerImageVersion, Logger logger) {
        return createKafkaContainer(dockerImageVersion, logger, null);
    }

    /**
     * Builds a Kafka container configured for single-broker transactional tests.
     *
     * <p>The transaction state log replication factor and min ISR are set to 1, Confluent support
     * metrics are disabled, the maximum transaction timeout is set to two hours, and the broker
     * log levels follow the most verbose level enabled on {@code logger}.
     *
     * @param dockerImageVersion docker image name, parsed with {@link DockerImageName#parse}
     * @param logger logger that receives the container output and determines the broker log level
     * @param loggerPrefix prefix for forwarded log lines; ignored when null or whitespace-only
     * @return the configured container; this method does not start it
     */
    public static KafkaContainer createKafkaContainer(String dockerImageVersion, Logger logger, String loggerPrefix) {
        String logLevel = resolveLogLevel(logger);
        Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger);
        if (!StringUtils.isNullOrWhitespaceOnly(loggerPrefix)) {
            logConsumer.withPrefix(loggerPrefix);
        }
        // Each environment variable is set exactly once; setting the same key/value pair
        // repeatedly has no additional effect.
        return new KafkaContainer(DockerImageName.parse(dockerImageVersion))
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                .withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel)
                .withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel)
                .withEnv("KAFKA_TRANSACTION_MAX_TIMEOUT_MS", String.valueOf(Duration.ofHours(2L).toMillis()))
                .withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel)
                .withLogConsumer(logConsumer);
    }

    /**
     * Returns the name of the most verbose level enabled on {@code logger}, or {@code "OFF"} when
     * no level is enabled.
     */
    private static String resolveLogLevel(Logger logger) {
        if (logger.isTraceEnabled()) {
            return "TRACE";
        }
        if (logger.isDebugEnabled()) {
            return "DEBUG";
        }
        if (logger.isInfoEnabled()) {
            return "INFO";
        }
        if (logger.isWarnEnabled()) {
            return "WARN";
        }
        if (logger.isErrorEnabled()) {
            return "ERROR";
        }
        return "OFF";
    }

    /**
     * Reads all records of a topic using the requested isolation level.
     *
     * @param topic topic to read
     * @param properties consumer configuration; copied, never modified
     * @param committed {@code true} to set {@code isolation.level} to {@code read_committed},
     *     {@code false} for {@code read_uncommitted}
     * @return the records that were read, as raw key/value bytes
     * @throws KafkaException declared for failures raised by the Kafka consumer
     */
    public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic, Properties properties, boolean committed) throws KafkaException {
        Properties consumerConfig = new Properties();
        consumerConfig.putAll(properties);
        consumerConfig.put("isolation.level", committed ? "read_committed" : "read_uncommitted");
        return drainAllRecordsFromTopic(topic, consumerConfig);
    }

    /**
     * Reads all records of a topic from the beginning of every partition.
     *
     * <p>The end offset of each partition is captured once before reading. A partition is dropped
     * from the assignment as soon as the consumer position reaches that offset, and the method
     * returns when no partition is left. Key and value deserializers are always set to
     * {@link ByteArrayDeserializer}, overriding any value present in {@code properties}.
     *
     * @param topic topic to read
     * @param properties consumer configuration; copied, never modified
     * @return the records that were read, as raw key/value bytes
     * @throws KafkaException declared for failures raised by the Kafka consumer
     */
    public static List<ConsumerRecord<byte[], byte[]>> drainAllRecordsFromTopic(String topic, Properties properties) throws KafkaException {
        Properties consumerConfig = new Properties();
        consumerConfig.putAll(properties);
        consumerConfig.put("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfig.put("value.deserializer", ByteArrayDeserializer.class.getName());
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerConfig)) {
            Set<TopicPartition> topicPartitions = getAllPartitions(consumer, topic);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
            consumer.assign(topicPartitions);
            consumer.seekToBeginning(topicPartitions);

            List<ConsumerRecord<byte[], byte[]>> consumerRecords = new ArrayList<>();
            while (!topicPartitions.isEmpty()) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(CONSUMER_POLL_DURATION);
                LOG.debug("Fetched {} records from topic {}.", records.count(), topic);

                // Stop polling partitions whose position has reached the captured end offset.
                List<TopicPartition> finishedPartitions = new ArrayList<>();
                for (TopicPartition topicPartition : topicPartitions) {
                    long position = consumer.position(topicPartition);
                    long endOffset = endOffsets.get(topicPartition);
                    LOG.debug("Endoffset {} and current position {} for partition {}", endOffset, position, topicPartition.partition());
                    if (endOffset - position <= 0L) {
                        finishedPartitions.add(topicPartition);
                    }
                }
                if (topicPartitions.removeAll(finishedPartitions)) {
                    consumer.assign(topicPartitions);
                }

                // Everything returned by this poll is kept, including records of partitions
                // that were just marked as finished.
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    consumerRecords.add(record);
                }
            }
            return consumerRecords;
        }
    }

    /**
     * Looks up the partitions of {@code topic} and returns them as a mutable set.
     *
     * <p>The set is an explicit {@link HashSet} because the caller removes finished partitions
     * from it while draining.
     */
    private static Set<TopicPartition> getAllPartitions(KafkaConsumer<byte[], byte[]> consumer, String topic) {
        return consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toCollection(HashSet::new));
    }
}

