/*
 * Decompiled with CFR 0.152.
 */
package org.birchframework.framework.kafka;

import com.google.common.base.Throwables;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.PreDestroy;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.StopWatch;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.birchframework.configuration.BirchProperties;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.PropertyMapper;

public class KafkaAdminUtils {
    private static final @UnknownKeyFor @NonNull @Initialized Logger log = LoggerFactory.getLogger(KafkaAdminUtils.class);
    private static final @UnknownKeyFor @NonNull @Initialized PropertyMapper mapper = PropertyMapper.get();
    private final @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> kafkaConfigs;
    private final @UnknownKeyFor @NonNull @Initialized KafkaConsumer<@UnknownKeyFor @NonNull @Initialized Object, @UnknownKeyFor @NonNull @Initialized Object> kafkaConsumer;
    private final @UnknownKeyFor @NonNull @Initialized AdminClient adminClient;

    KafkaAdminUtils(@UnknownKeyFor @NonNull @Initialized BirchProperties theProperties) {
        BirchProperties.Kafka.KafkaAdmin aKafkaProperties = theProperties.getKafka().getAdmin();
        HashMap<String, Object> aKafkaConfigs = new HashMap<String, Object>();
        aKafkaConfigs.put("bootstrap.servers", String.join((CharSequence)",", aKafkaProperties.getBootstrapServers()));
        mapper.from((Object)aKafkaProperties.getSslProtocol()).when(StringUtils::isNotBlank).to(v -> aKafkaConfigs.put("ssl.protocol", v));
        mapper.from((Object)aKafkaProperties.getSecurityProtocol()).when(StringUtils::isNotBlank).to(v -> aKafkaConfigs.put("security.protocol", v));
        mapper.from((Object)aKafkaProperties.getKeySerializer()).whenNonNull().to(v -> aKafkaConfigs.put("key.serializer", v));
        mapper.from((Object)aKafkaProperties.getValueSerializer()).whenNonNull().to(v -> aKafkaConfigs.put("value.serializer", v));
        aKafkaConfigs.put("key.deserializer", ObjectUtils.defaultIfNull((Object)aKafkaProperties.getKeyDeserializer(), StringDeserializer.class));
        aKafkaConfigs.put("value.deserializer", ObjectUtils.defaultIfNull((Object)aKafkaProperties.getValueDeserializer(), StringDeserializer.class));
        mapper.from((Object)aKafkaProperties.getSasl().getJaasConfig()).whenNonNull().to(v -> aKafkaConfigs.put("sasl.jaas.config", v));
        mapper.from((Object)aKafkaProperties.getSasl().getMechanism()).whenNonNull().to(v -> aKafkaConfigs.put("sasl.mechanism", v));
        this.kafkaConfigs = Collections.unmodifiableMap(aKafkaConfigs);
        this.kafkaConsumer = new KafkaConsumer(aKafkaConfigs);
        this.adminClient = AdminClient.create(aKafkaConfigs);
    }

    @PreDestroy
    void preDestroy() {
        this.kafkaConsumer.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Long> topicLags() {
        StopWatch aStopWatch = log.isDebugEnabled() ? StopWatch.createStarted() : null;
        try {
            Map aTopicEndOffsets;
            List aConsumerGroupIDs = (List)this.adminClient.listConsumerGroups().valid().thenApply(result -> result.stream().map(ConsumerGroupListing::groupId).collect(Collectors.toList())).get();
            HashMap aTopicAndOffsetMap = new HashMap();
            aConsumerGroupIDs.stream().map(id -> this.adminClient.listConsumerGroupOffsets(id).partitionsToOffsetAndMetadata()).forEach(future -> {
                try {
                    aTopicAndOffsetMap.putAll((Map)future.get());
                }
                catch (InterruptedException e) {
                    log.warn("Unable to retrieve topic offsets for a consumer group; error message: {}", (Object)Throwables.getRootCause((Throwable)e).getMessage());
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                catch (ExecutionException e) {
                    log.warn("Unable to retrieve topic offsets for a consumer group; error message: {}", (Object)Throwables.getRootCause((Throwable)e).getMessage());
                }
            });
            KafkaConsumer<Object, Object> kafkaConsumer = this.kafkaConsumer;
            synchronized (kafkaConsumer) {
                aTopicEndOffsets = this.kafkaConsumer.endOffsets(aTopicAndOffsetMap.keySet());
            }
            Map<TopicPartition, Long> aTopicPartitionLagMap = aTopicAndOffsetMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> {
                Long anEndOffset = (Long)aTopicEndOffsets.get(entry.getKey());
                long aLag = anEndOffset - ((OffsetAndMetadata)entry.getValue()).offset();
                return aLag < 0L ? 0L : aLag;
            }));
            HashMap<String, Long> aTopicLags = new HashMap<String, Long>();
            aTopicPartitionLagMap.forEach((key, value) -> aTopicLags.compute(key.topic(), (k, v) -> v == null ? value : v + value));
            HashMap<String, Long> hashMap = aTopicLags;
            return hashMap;
        }
        catch (InterruptedException e) {
            log.warn("Unable to retrieve consumer groups; error message: {}", (Object)Throwables.getRootCause((Throwable)e).getMessage());
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            log.warn("Unable to retrieve consumer groups; error message: {}", (Object)Throwables.getRootCause((Throwable)e).getMessage());
            Map<String, Long> map = Collections.emptyMap();
            return map;
        }
        finally {
            if (aStopWatch != null) {
                log.debug("Completed lag calculation in {} milliseconds", (Object)aStopWatch.getTime());
            }
        }
    }

    public @UnknownKeyFor @NonNull @Initialized Map<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Object> getKafkaConfigs() {
        return this.kafkaConfigs;
    }
}

