/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.dynamic.source.enumerator;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.DynamicKafkaSourceOptions;
import org.apache.flink.connector.kafka.dynamic.source.GetMetadataUpdateEvent;
import org.apache.flink.connector.kafka.dynamic.source.MetadataUpdateEvent;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.StoppableKafkaEnumContextProxy;
import org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSubscriber;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumerator;
import org.apache.flink.connector.kafka.source.enumerator.TopicPartitionAndAssignmentStatus;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.KafkaException;
import org.apache.flink.kafka.shaded.org.apache.kafka.common.TopicPartition;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class DynamicKafkaSourceEnumerator
implements SplitEnumerator<DynamicKafkaSourceSplit, DynamicKafkaSourceEnumState> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaSourceEnumerator.class);
    private final Map<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>> clusterEnumeratorMap;
    private final Map<String, StoppableKafkaEnumContextProxy> clusterEnumContextMap;
    private final KafkaStreamSubscriber kafkaStreamSubscriber;
    private final SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext;
    private final KafkaMetadataService kafkaMetadataService;
    private final Properties properties;
    private final OffsetsInitializer startingOffsetsInitializer;
    private final OffsetsInitializer stoppingOffsetInitializer;
    private final Boundedness boundedness;
    private final StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory stoppableKafkaEnumContextProxyFactory;
    private final long kafkaMetadataServiceDiscoveryIntervalMs;
    private final int kafkaMetadataServiceDiscoveryFailureThreshold;
    private int kafkaMetadataServiceDiscoveryFailureCount;
    private Map<String, Set<String>> latestClusterTopicsMap;
    private Set<KafkaStream> latestKafkaStreams;
    private boolean firstDiscoveryComplete;

    public DynamicKafkaSourceEnumerator(KafkaStreamSubscriber kafkaStreamSubscriber, KafkaMetadataService kafkaMetadataService, SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext, OffsetsInitializer startingOffsetsInitializer, OffsetsInitializer stoppingOffsetInitializer, Properties properties, Boundedness boundedness, DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState) {
        this(kafkaStreamSubscriber, kafkaMetadataService, enumContext, startingOffsetsInitializer, stoppingOffsetInitializer, properties, boundedness, dynamicKafkaSourceEnumState, StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory.getDefaultFactory());
    }

    @VisibleForTesting
    DynamicKafkaSourceEnumerator(KafkaStreamSubscriber kafkaStreamSubscriber, KafkaMetadataService kafkaMetadataService, SplitEnumeratorContext<DynamicKafkaSourceSplit> enumContext, OffsetsInitializer startingOffsetsInitializer, OffsetsInitializer stoppingOffsetInitializer, Properties properties, Boundedness boundedness, DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState, StoppableKafkaEnumContextProxy.StoppableKafkaEnumContextProxyFactory stoppableKafkaEnumContextProxyFactory) {
        this.kafkaStreamSubscriber = kafkaStreamSubscriber;
        this.boundedness = boundedness;
        this.startingOffsetsInitializer = startingOffsetsInitializer;
        this.stoppingOffsetInitializer = stoppingOffsetInitializer;
        this.properties = properties;
        this.enumContext = enumContext;
        this.kafkaMetadataServiceDiscoveryIntervalMs = DynamicKafkaSourceOptions.getOption(properties, DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS, Long::parseLong);
        this.kafkaMetadataServiceDiscoveryFailureThreshold = DynamicKafkaSourceOptions.getOption(properties, DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_FAILURE_THRESHOLD, Integer::parseInt);
        this.kafkaMetadataServiceDiscoveryFailureCount = 0;
        this.firstDiscoveryComplete = false;
        this.kafkaMetadataService = kafkaMetadataService;
        this.stoppableKafkaEnumContextProxyFactory = stoppableKafkaEnumContextProxyFactory;
        this.clusterEnumeratorMap = new HashMap<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>>();
        this.clusterEnumContextMap = new HashMap<String, StoppableKafkaEnumContextProxy>();
        this.latestKafkaStreams = dynamicKafkaSourceEnumState.getKafkaStreams();
        HashMap<String, Properties> clusterProperties = new HashMap<String, Properties>();
        for (KafkaStream kafkaStream : this.latestKafkaStreams) {
            for (Map.Entry<String, ClusterMetadata> entry : kafkaStream.getClusterMetadataMap().entrySet()) {
                clusterProperties.put(entry.getKey(), entry.getValue().getProperties());
            }
        }
        this.latestClusterTopicsMap = new HashMap<String, Set<String>>();
        for (Map.Entry entry : dynamicKafkaSourceEnumState.getClusterEnumeratorStates().entrySet()) {
            this.latestClusterTopicsMap.put((String)entry.getKey(), ((KafkaSourceEnumState)entry.getValue()).assignedPartitions().stream().map(TopicPartition::topic).collect(Collectors.toSet()));
            this.createEnumeratorWithAssignedTopicPartitions((String)entry.getKey(), this.latestClusterTopicsMap.get(entry.getKey()), (KafkaSourceEnumState)entry.getValue(), (Properties)clusterProperties.get(entry.getKey()));
        }
    }

    public void start() {
        if (!this.clusterEnumeratorMap.isEmpty()) {
            this.startAllEnumerators();
        }
        if (this.kafkaMetadataServiceDiscoveryIntervalMs <= 0L) {
            this.enumContext.callAsync(() -> this.kafkaStreamSubscriber.getSubscribedStreams(this.kafkaMetadataService), this::onHandleSubscribedStreamsFetch);
        } else {
            this.enumContext.callAsync(() -> this.kafkaStreamSubscriber.getSubscribedStreams(this.kafkaMetadataService), this::onHandleSubscribedStreamsFetch, 0L, this.kafkaMetadataServiceDiscoveryIntervalMs);
        }
    }

    private void handleNoMoreSplits() {
        if (Boundedness.BOUNDED.equals((Object)this.boundedness)) {
            boolean allEnumeratorsHaveSignalledNoMoreSplits = true;
            for (StoppableKafkaEnumContextProxy context : this.clusterEnumContextMap.values()) {
                allEnumeratorsHaveSignalledNoMoreSplits = allEnumeratorsHaveSignalledNoMoreSplits && context.isNoMoreSplits();
            }
            if (this.firstDiscoveryComplete && allEnumeratorsHaveSignalledNoMoreSplits) {
                logger.info("Signal no more splits to all readers: {}", this.enumContext.registeredReaders().keySet());
                this.enumContext.registeredReaders().keySet().forEach(arg_0 -> this.enumContext.signalNoMoreSplits(arg_0));
            } else {
                logger.info("Not ready to notify no more splits to readers.");
            }
        }
    }

    private void onHandleSubscribedStreamsFetch(Set<KafkaStream> fetchedKafkaStreams, Throwable t) {
        DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState;
        this.firstDiscoveryComplete = true;
        Set<KafkaStream> handledFetchKafkaStreams = this.handleFetchSubscribedStreamsError(fetchedKafkaStreams, t);
        HashMap<String, Set<String>> newClustersTopicsMap = new HashMap<String, Set<String>>();
        HashMap<String, Properties> clusterProperties = new HashMap<String, Properties>();
        for (KafkaStream kafkaStream : handledFetchKafkaStreams) {
            for (Map.Entry<String, ClusterMetadata> entry : kafkaStream.getClusterMetadataMap().entrySet()) {
                String kafkaClusterId = entry.getKey();
                ClusterMetadata clusterMetadata = entry.getValue();
                newClustersTopicsMap.computeIfAbsent(kafkaClusterId, unused -> new HashSet()).addAll(clusterMetadata.getTopics());
                clusterProperties.put(kafkaClusterId, clusterMetadata.getProperties());
            }
        }
        if (this.latestClusterTopicsMap.equals(newClustersTopicsMap)) {
            return;
        }
        if (logger.isInfoEnabled()) {
            MapDifference metadataDifference = Maps.difference(this.latestClusterTopicsMap, newClustersTopicsMap);
            logger.info("Common cluster topics after metadata refresh: {}", (Object)metadataDifference.entriesInCommon());
            logger.info("Removed cluster topics after metadata refresh: {}", (Object)metadataDifference.entriesOnlyOnLeft());
            logger.info("Additional cluster topics after metadata refresh: {}", (Object)metadataDifference.entriesOnlyOnRight());
        }
        try {
            dynamicKafkaSourceEnumState = this.snapshotState(-1L);
        }
        catch (Exception e) {
            throw new RuntimeException("unable to snapshot state in metadata change", e);
        }
        logger.info("Closing enumerators due to metadata change");
        this.closeAllEnumeratorsAndContexts();
        this.latestClusterTopicsMap = newClustersTopicsMap;
        this.latestKafkaStreams = handledFetchKafkaStreams;
        this.sendMetadataUpdateEventToAvailableReaders();
        for (Map.Entry<String, Set<String>> activeClusterTopics : this.latestClusterTopicsMap.entrySet()) {
            KafkaSourceEnumState newKafkaSourceEnumState;
            KafkaSourceEnumState kafkaSourceEnumState = dynamicKafkaSourceEnumState.getClusterEnumeratorStates().get(activeClusterTopics.getKey());
            if (kafkaSourceEnumState != null) {
                Set<String> activeTopics = activeClusterTopics.getValue();
                Set<TopicPartitionAndAssignmentStatus> partitions = kafkaSourceEnumState.partitions().stream().filter(tp -> activeTopics.contains(tp.topicPartition().topic())).collect(Collectors.toSet());
                newKafkaSourceEnumState = new KafkaSourceEnumState(partitions, kafkaSourceEnumState.initialDiscoveryFinished());
            } else {
                newKafkaSourceEnumState = new KafkaSourceEnumState(Collections.emptySet(), false);
            }
            this.createEnumeratorWithAssignedTopicPartitions(activeClusterTopics.getKey(), activeClusterTopics.getValue(), newKafkaSourceEnumState, (Properties)clusterProperties.get(activeClusterTopics.getKey()));
        }
        this.startAllEnumerators();
    }

    private Set<KafkaStream> handleFetchSubscribedStreamsError(Set<KafkaStream> fetchedKafkaStreams, @Nullable Throwable t) {
        if (t != null) {
            if (!this.latestKafkaStreams.isEmpty() && ++this.kafkaMetadataServiceDiscoveryFailureCount <= this.kafkaMetadataServiceDiscoveryFailureThreshold) {
                logger.warn("Swallowing metadata service error", t);
                return this.latestKafkaStreams;
            }
            throw new RuntimeException("Fetching subscribed Kafka streams failed and no metadata to fallback", t);
        }
        this.kafkaMetadataServiceDiscoveryFailureCount = 0;
        return fetchedKafkaStreams;
    }

    private void sendMetadataUpdateEventToAvailableReaders() {
        Iterator iterator = this.enumContext.registeredReaders().keySet().iterator();
        while (iterator.hasNext()) {
            int readerId = (Integer)iterator.next();
            MetadataUpdateEvent metadataUpdateEvent = new MetadataUpdateEvent(this.latestKafkaStreams);
            logger.info("sending metadata update to reader {}: {}", (Object)readerId, (Object)metadataUpdateEvent);
            this.enumContext.sendEventToSourceReader(readerId, (SourceEvent)metadataUpdateEvent);
        }
    }

    private KafkaSourceEnumerator createEnumeratorWithAssignedTopicPartitions(String kafkaClusterId, Set<String> topics, KafkaSourceEnumState kafkaSourceEnumState, Properties fetchedProperties) {
        Runnable signalNoMoreSplitsCallback = Boundedness.BOUNDED.equals((Object)this.boundedness) ? this::handleNoMoreSplits : null;
        StoppableKafkaEnumContextProxy context = this.stoppableKafkaEnumContextProxyFactory.create(this.enumContext, kafkaClusterId, this.kafkaMetadataService, signalNoMoreSplitsCallback);
        Properties consumerProps = new Properties();
        KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
        KafkaPropertiesUtil.copyProperties(this.properties, consumerProps);
        KafkaPropertiesUtil.setClientIdPrefix(consumerProps, kafkaClusterId);
        KafkaSourceEnumerator enumerator = new KafkaSourceEnumerator(KafkaSubscriber.getTopicListSubscriber(new ArrayList<String>(topics)), this.startingOffsetsInitializer, this.stoppingOffsetInitializer, consumerProps, context, this.boundedness, kafkaSourceEnumState);
        this.clusterEnumContextMap.put(kafkaClusterId, context);
        this.clusterEnumeratorMap.put(kafkaClusterId, enumerator);
        return enumerator;
    }

    private void startAllEnumerators() {
        for (String kafkaClusterId : this.latestClusterTopicsMap.keySet()) {
            try {
                this.clusterEnumeratorMap.get(kafkaClusterId).start();
            }
            catch (KafkaException e) {
                if (this.kafkaMetadataService.isClusterActive(kafkaClusterId)) {
                    throw new RuntimeException(String.format("Failed to create enumerator for %s", kafkaClusterId), e);
                }
                logger.info("Found inactive cluster {} while initializing, removing enumerator", (Object)kafkaClusterId, (Object)e);
                try {
                    this.clusterEnumContextMap.remove(kafkaClusterId).close();
                    this.clusterEnumeratorMap.remove(kafkaClusterId).close();
                }
                catch (Exception ex) {
                    throw new RuntimeException("Failed to close enum context for " + kafkaClusterId, ex);
                }
            }
        }
    }

    private void closeAllEnumeratorsAndContexts() {
        this.clusterEnumeratorMap.forEach((cluster, subEnumerator) -> {
            try {
                this.clusterEnumContextMap.get(cluster).close();
                subEnumerator.close();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        this.clusterEnumContextMap.clear();
        this.clusterEnumeratorMap.clear();
    }

    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        throw new UnsupportedOperationException("Kafka enumerators only assign splits to readers.");
    }

    public void addSplitsBack(List<DynamicKafkaSourceSplit> splits, int subtaskId) {
        logger.debug("Adding splits back for {}", (Object)subtaskId);
        ArrayListMultimap kafkaPartitionSplits = ArrayListMultimap.create();
        for (DynamicKafkaSourceSplit split : splits) {
            kafkaPartitionSplits.put((Object)split.getKafkaClusterId(), (Object)split.getKafkaPartitionSplit());
        }
        for (String kafkaClusterId : kafkaPartitionSplits.keySet()) {
            if (this.clusterEnumeratorMap.containsKey(kafkaClusterId)) {
                this.clusterEnumeratorMap.get(kafkaClusterId).addSplitsBack(kafkaPartitionSplits.get((Object)kafkaClusterId), subtaskId);
                continue;
            }
            logger.warn("Split refers to inactive cluster {} with current clusters being {}", (Object)kafkaClusterId, this.clusterEnumeratorMap.keySet());
        }
        this.handleNoMoreSplits();
    }

    public void addReader(int subtaskId) {
        logger.debug("Adding reader {}", (Object)subtaskId);
        this.clusterEnumeratorMap.forEach((cluster, subEnumerator) -> subEnumerator.addReader(subtaskId));
        this.handleNoMoreSplits();
    }

    public DynamicKafkaSourceEnumState snapshotState(long checkpointId) throws Exception {
        HashMap<String, KafkaSourceEnumState> subEnumeratorStateByCluster = new HashMap<String, KafkaSourceEnumState>();
        for (Map.Entry<String, SplitEnumerator<KafkaPartitionSplit, KafkaSourceEnumState>> clusterEnumerator : this.clusterEnumeratorMap.entrySet()) {
            subEnumeratorStateByCluster.put(clusterEnumerator.getKey(), (KafkaSourceEnumState)clusterEnumerator.getValue().snapshotState(checkpointId));
        }
        return new DynamicKafkaSourceEnumState(this.latestKafkaStreams, subEnumeratorStateByCluster);
    }

    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        Preconditions.checkArgument((boolean)(sourceEvent instanceof GetMetadataUpdateEvent), (Object)("Received invalid source event: " + sourceEvent));
        if (this.enumContext.registeredReaders().containsKey(subtaskId)) {
            MetadataUpdateEvent metadataUpdateEvent = new MetadataUpdateEvent(this.latestKafkaStreams);
            logger.info("sending metadata update to reader {}: {}", (Object)subtaskId, (Object)metadataUpdateEvent);
            this.enumContext.sendEventToSourceReader(subtaskId, (SourceEvent)metadataUpdateEvent);
        } else {
            logger.warn("Got get metadata update but subtask was unavailable");
        }
    }

    public void close() throws IOException {
        try {
            for (StoppableKafkaEnumContextProxy stoppableKafkaEnumContextProxy : this.clusterEnumContextMap.values()) {
                stoppableKafkaEnumContextProxy.close();
            }
            for (Map.Entry entry : this.clusterEnumeratorMap.entrySet()) {
                ((SplitEnumerator)entry.getValue()).close();
            }
            this.kafkaMetadataService.close();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

