/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.dynamic.source.reader;

import com.google.common.collect.ArrayListMultimap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Properties;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.dynamic.source.GetMetadataUpdateEvent;
import org.apache.flink.connector.kafka.dynamic.source.MetadataUpdateEvent;
import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroup;
import org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager;
import org.apache.flink.connector.kafka.dynamic.source.reader.KafkaPartitionSplitReaderWrapper;
import org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.reader.fetcher.KafkaSourceFetcherManager;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that multiplexes over one {@link KafkaSourceReader} per Kafka cluster.
 *
 * <p>The set of sub-readers is driven by {@link MetadataUpdateEvent}s: when the clusters/topics in
 * an event differ from what the current splits cover, all sub-readers are closed and recreated
 * with the splits that are still valid. Splits that arrive before the first metadata event are
 * buffered in {@code pendingSplits} and handed to the sub-readers once metadata is known.
 *
 * @param <T> type of the records emitted by this reader
 */
@Internal
public class DynamicKafkaSourceReader<T>
implements SourceReader<T, DynamicKafkaSourceSplit> {
    private static final Logger logger = LoggerFactory.getLogger(DynamicKafkaSourceReader.class);
    private final KafkaRecordDeserializationSchema<T> deserializationSchema;
    /** Base consumer properties; cluster specific properties are layered on top per reader. */
    private final Properties properties;
    private final MetricGroup dynamicKafkaSourceMetricGroup;
    private final Gauge<Integer> kafkaClusterCount;
    private final SourceReaderContext readerContext;
    private final KafkaClusterMetricGroupManager kafkaClusterMetricGroupManager;
    /** Sub-readers keyed by Kafka cluster id; sorted so availability indices are stable. */
    private final NavigableMap<String, KafkaSourceReader<T>> clusterReaderMap;
    /** Cluster specific properties from the latest metadata update, keyed by cluster id. */
    private final Map<String, Properties> clustersProperties;
    /** Splits received before the first metadata update event was processed. */
    private final List<DynamicKafkaSourceSplit> pendingSplits;
    private MultipleFuturesAvailabilityHelper availabilityHelper;
    private boolean isActivelyConsumingSplits;
    private boolean isNoMoreSplits;
    private final AtomicBoolean restartingReaders;

    /**
     * Creates a reader without any sub-readers; they are created on metadata updates and split
     * assignment.
     *
     * @param readerContext context of the enclosing source reader
     * @param deserializationSchema schema shared by all sub-readers
     * @param properties base Kafka consumer properties applied to every cluster
     */
    public DynamicKafkaSourceReader(SourceReaderContext readerContext, KafkaRecordDeserializationSchema<T> deserializationSchema, Properties properties) {
        this.readerContext = readerContext;
        this.clusterReaderMap = new TreeMap<>();
        this.deserializationSchema = deserializationSchema;
        this.properties = properties;
        this.kafkaClusterCount = this.clusterReaderMap::size;
        this.dynamicKafkaSourceMetricGroup = readerContext.metricGroup().addGroup("DynamicKafkaSource");
        this.kafkaClusterMetricGroupManager = new KafkaClusterMetricGroupManager();
        this.pendingSplits = new ArrayList<>();
        this.availabilityHelper = new MultipleFuturesAvailabilityHelper(0);
        this.isNoMoreSplits = false;
        this.isActivelyConsumingSplits = false;
        this.restartingReaders = new AtomicBoolean();
        this.clustersProperties = new HashMap<>();
    }

    /** Registers the cluster count gauge and asks the coordinator for the current metadata. */
    @Override
    public void start() {
        logger.trace("Starting reader for subtask index={}", this.readerContext.getIndexOfSubtask());
        this.readerContext.metricGroup().gauge("kafkaClusterCount", this.kafkaClusterCount);
        this.readerContext.sendSourceEventToCoordinator(new GetMetadataUpdateEvent());
    }

    /**
     * Polls every sub-reader once and consolidates their statuses.
     *
     * @return {@code NOTHING_AVAILABLE} when there are no sub-readers or they are being
     *     restarted; otherwise the consolidated status of all sub-readers
     */
    @Override
    public InputStatus pollNext(ReaderOutput<T> readerOutput) throws Exception {
        if (this.clusterReaderMap.isEmpty()) {
            return this.logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
        }
        if (this.restartingReaders.get()) {
            logger.debug("Poll next invoked while restarting readers");
            return this.logAndReturnInputStatus(InputStatus.NOTHING_AVAILABLE);
        }
        boolean isMoreAvailable = false;
        boolean isNothingAvailable = false;
        for (KafkaSourceReader<T> clusterReader : this.clusterReaderMap.values()) {
            InputStatus inputStatus = clusterReader.pollNext(readerOutput);
            switch (inputStatus) {
                case MORE_AVAILABLE:
                    isMoreAvailable = true;
                    break;
                case NOTHING_AVAILABLE:
                    isNothingAvailable = true;
                    break;
                default:
                    // Any other status (e.g. END_OF_INPUT) does not set either flag.
                    break;
            }
        }
        return this.logAndReturnInputStatus(this.consolidateInputStatus(isMoreAvailable, isNothingAvailable));
    }

    /**
     * Merges per-reader statuses: more-available wins over nothing-available, and end-of-input is
     * reported only when no reader reported either of the two.
     */
    private InputStatus consolidateInputStatus(boolean atLeastOneMoreAvailable, boolean atLeastOneNothingAvailable) {
        if (atLeastOneMoreAvailable) {
            return InputStatus.MORE_AVAILABLE;
        }
        if (atLeastOneNothingAvailable) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        return InputStatus.END_OF_INPUT;
    }

    /**
     * Routes splits to the sub-reader of their cluster, creating and starting a sub-reader for
     * clusters seen for the first time. Before the first metadata update has been processed the
     * splits are only buffered.
     *
     * @throws RuntimeException if a sub-reader cannot be created or started
     */
    @Override
    public void addSplits(List<DynamicKafkaSourceSplit> splits) {
        logger.info("Adding splits to reader {}: {}", this.readerContext.getIndexOfSubtask(), splits);
        if (!this.isActivelyConsumingSplits) {
            this.pendingSplits.addAll(splits);
            return;
        }
        ArrayListMultimap<String, KafkaPartitionSplit> clusterSplitsMap = ArrayListMultimap.create();
        for (DynamicKafkaSourceSplit split : splits) {
            clusterSplitsMap.put(split.getKafkaClusterId(), split);
        }
        boolean newCluster = false;
        for (String kafkaClusterId : clusterSplitsMap.keySet()) {
            if (!this.clusterReaderMap.containsKey(kafkaClusterId)) {
                try {
                    KafkaSourceReader<T> kafkaSourceReader = this.createReader(kafkaClusterId);
                    this.clusterReaderMap.put(kafkaClusterId, kafkaSourceReader);
                    kafkaSourceReader.start();
                    newCluster = true;
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            this.clusterReaderMap.get(kafkaClusterId).addSplits(clusterSplitsMap.get(kafkaClusterId));
        }
        if (newCluster) {
            // The number of sub-readers changed, so the availability helper must be resized.
            this.completeAndResetAvailabilityHelper();
        }
    }

    /**
     * Applies a {@link MetadataUpdateEvent}. If the clusters/topics in the event differ from those
     * covered by the current splits, all sub-readers are closed and recreated with the splits that
     * are still valid; otherwise only the stored cluster properties are refreshed. Afterwards any
     * buffered splits that are still valid are added.
     *
     * @throws IllegalArgumentException if the event is not a {@link MetadataUpdateEvent}
     * @throws RuntimeException if closing, creating or starting a sub-reader fails
     */
    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        Preconditions.checkArgument(sourceEvent instanceof MetadataUpdateEvent, "Received invalid source event: " + sourceEvent);
        logger.info("Received source event {}: subtask={}", sourceEvent, this.readerContext.getIndexOfSubtask());
        Set<KafkaStream> newKafkaStreams = ((MetadataUpdateEvent)sourceEvent).getKafkaStreams();
        Map<String, Set<String>> newClustersAndTopics = new HashMap<>();
        Map<String, Properties> newClustersProperties = new HashMap<>();
        for (KafkaStream kafkaStream : newKafkaStreams) {
            for (Map.Entry<String, ClusterMetadata> entry : kafkaStream.getClusterMetadataMap().entrySet()) {
                newClustersAndTopics.computeIfAbsent(entry.getKey(), unused -> new HashSet<>()).addAll(entry.getValue().getTopics());
                newClustersProperties.put(entry.getKey(), entry.getValue().getProperties());
            }
        }
        // Snapshot with checkpoint id -1: this is an internal snapshot, not a real checkpoint.
        List<DynamicKafkaSourceSplit> currentSplitState = this.snapshotStateFromAllReaders(-1L);
        logger.info("Snapshotting split state for reader {}: {}", this.readerContext.getIndexOfSubtask(), currentSplitState);
        Map<String, Set<String>> currentMetadataFromState = new HashMap<>();
        Map<String, List<KafkaPartitionSplit>> filteredNewClusterSplitStateMap = new HashMap<>();
        for (DynamicKafkaSourceSplit split : currentSplitState) {
            currentMetadataFromState.computeIfAbsent(split.getKafkaClusterId(), ignore -> new HashSet<>()).add(split.getKafkaPartitionSplit().getTopic());
            if (DynamicKafkaSourceReader.isSplitForActiveClusters(split, newClustersAndTopics)) {
                filteredNewClusterSplitStateMap.computeIfAbsent(split.getKafkaClusterId(), ignore -> new ArrayList<>()).add(split);
            } else {
                logger.info("Skipping outdated split due to metadata changes: {}", split);
            }
        }
        if (!newClustersAndTopics.equals(currentMetadataFromState)) {
            // pollNext() returns NOTHING_AVAILABLE while this flag is set; it is cleared once the
            // new availability future completes (see completeAndResetAvailabilityHelper).
            this.restartingReaders.set(true);
            this.closeAllReadersAndClearState();
            this.clustersProperties.putAll(newClustersProperties);
            // NOTE(review): readers recreated here are not told about an earlier
            // notifyNoMoreSplits() unless pending splits exist below - confirm this is intended.
            for (String kafkaClusterId : newClustersAndTopics.keySet()) {
                try {
                    KafkaSourceReader<T> kafkaSourceReader = this.createReader(kafkaClusterId);
                    this.clusterReaderMap.put(kafkaClusterId, kafkaSourceReader);
                    List<KafkaPartitionSplit> retainedSplits = filteredNewClusterSplitStateMap.get(kafkaClusterId);
                    if (retainedSplits != null) {
                        kafkaSourceReader.addSplits(retainedSplits);
                    }
                    kafkaSourceReader.start();
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
            this.completeAndResetAvailabilityHelper();
        } else {
            // Same clusters and topics: only the connection properties may have changed.
            this.clustersProperties.clear();
            this.clustersProperties.putAll(newClustersProperties);
        }
        this.isActivelyConsumingSplits = true;
        if (!this.pendingSplits.isEmpty()) {
            // Buffered splits may predate this metadata, so drop the ones that no longer match.
            List<DynamicKafkaSourceSplit> validPendingSplits = this.pendingSplits.stream().filter(pendingSplit -> {
                boolean splitValid = DynamicKafkaSourceReader.isSplitForActiveClusters(pendingSplit, newClustersAndTopics);
                if (!splitValid) {
                    logger.info("Removing invalid split for reader: {}", pendingSplit);
                }
                return splitValid;
            }).collect(Collectors.toList());
            this.addSplits(validPendingSplits);
            this.pendingSplits.clear();
            if (this.isNoMoreSplits) {
                // The earlier notification was deferred because splits were still pending.
                this.notifyNoMoreSplits();
            }
        }
    }

    /** Returns whether the split's cluster and topic are both present in {@code metadata}. */
    private static boolean isSplitForActiveClusters(DynamicKafkaSourceSplit split, Map<String, Set<String>> metadata) {
        Set<String> topics = metadata.get(split.getKafkaClusterId());
        return topics != null && topics.contains(split.getKafkaPartitionSplit().getTopic());
    }

    /**
     * Snapshots the splits of all sub-readers plus any splits that are still buffered.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the splits owned by this reader
     */
    @Override
    public List<DynamicKafkaSourceSplit> snapshotState(long checkpointId) {
        List<DynamicKafkaSourceSplit> splits = this.snapshotStateFromAllReaders(checkpointId);
        splits.addAll(this.pendingSplits);
        return splits;
    }

    /** Collects the state of every sub-reader, tagging each split with its cluster id. */
    private List<DynamicKafkaSourceSplit> snapshotStateFromAllReaders(long checkpointId) {
        List<DynamicKafkaSourceSplit> splits = new ArrayList<>();
        for (Map.Entry<String, KafkaSourceReader<T>> clusterReader : this.clusterReaderMap.entrySet()) {
            String kafkaClusterId = clusterReader.getKey();
            for (KafkaPartitionSplit kafkaPartitionSplit : clusterReader.getValue().snapshotState(checkpointId)) {
                splits.add(new DynamicKafkaSourceSplit(kafkaClusterId, kafkaPartitionSplit));
            }
        }
        return splits;
    }

    /**
     * Resets the availability helper, re-registers the availability future of every sub-reader
     * and returns the helper's combined future.
     */
    @Override
    @SuppressWarnings("unchecked")
    public CompletableFuture<Void> isAvailable() {
        this.availabilityHelper.resetToUnAvailable();
        this.syncAvailabilityHelperWithReaders();
        // Only completion of the future matters to callers, never its value, so the cast is safe.
        return (CompletableFuture<Void>)this.availabilityHelper.getAvailableFuture();
    }

    /**
     * Records that no more splits will arrive. The sub-readers are only notified when no splits
     * are buffered; otherwise {@link #handleSourceEvents} repeats the call after draining them.
     */
    @Override
    public void notifyNoMoreSplits() {
        logger.info("notify no more splits for reader {}", this.readerContext.getIndexOfSubtask());
        if (this.pendingSplits.isEmpty()) {
            this.clusterReaderMap.values().forEach(SourceReaderBase::notifyNoMoreSplits);
        }
        this.isNoMoreSplits = true;
    }

    /** Forwards the checkpoint completion to every sub-reader. */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        logger.debug("Notify checkpoint complete for {}", this.clusterReaderMap.keySet());
        for (KafkaSourceReader<T> subReader : this.clusterReaderMap.values()) {
            subReader.notifyCheckpointComplete(checkpointId);
        }
    }

    /**
     * Closes every sub-reader and the metric group manager. All of them are attempted even if one
     * fails, so a single failing reader cannot leak the remaining ones.
     *
     * @throws Exception the first failure encountered, with later failures attached as suppressed
     */
    @Override
    public void close() throws Exception {
        Exception failure = null;
        for (KafkaSourceReader<T> subReader : this.clusterReaderMap.values()) {
            try {
                subReader.close();
            }
            catch (Exception e) {
                failure = DynamicKafkaSourceReader.firstOrSuppressed(failure, e);
            }
        }
        try {
            this.kafkaClusterMetricGroupManager.close();
        }
        catch (Exception e) {
            failure = DynamicKafkaSourceReader.firstOrSuppressed(failure, e);
        }
        if (failure != null) {
            throw failure;
        }
    }

    /** Keeps the first exception and attaches every later, distinct one to it as suppressed. */
    private static Exception firstOrSuppressed(Exception first, Exception next) {
        if (first == null) {
            return next;
        }
        if (first != next) {
            first.addSuppressed(next);
        }
        return first;
    }

    /**
     * Builds a sub-reader for one cluster: merges the base and cluster specific properties,
     * registers a per-cluster metric group and opens the deserialization schema against it.
     *
     * @param kafkaClusterId id of the cluster to read from
     * @return a new sub-reader that has not been started yet
     * @throws NullPointerException if no properties are known for the cluster
     * @throws Exception if opening the deserialization schema fails
     */
    private KafkaSourceReader<T> createReader(String kafkaClusterId) throws Exception {
        FutureCompletingBlockingQueue<RecordsWithSplitIds<ConsumerRecord<byte[], byte[]>>> elementsQueue = new FutureCompletingBlockingQueue<>();
        Properties readerSpecificProperties = new Properties();
        KafkaPropertiesUtil.copyProperties(this.properties, readerSpecificProperties);
        // Cluster properties are copied second, i.e. on top of the base properties.
        KafkaPropertiesUtil.copyProperties(Preconditions.checkNotNull(this.clustersProperties.get(kafkaClusterId), "Properties for cluster %s is not found. Current Kafka cluster ids: %s", kafkaClusterId, this.clustersProperties.keySet()), readerSpecificProperties);
        KafkaPropertiesUtil.setClientIdPrefix(readerSpecificProperties, kafkaClusterId);
        final KafkaClusterMetricGroup kafkaClusterMetricGroup = new KafkaClusterMetricGroup(this.dynamicKafkaSourceMetricGroup, this.readerContext.metricGroup(), kafkaClusterId);
        this.kafkaClusterMetricGroupManager.register(kafkaClusterId, kafkaClusterMetricGroup);
        KafkaSourceReaderMetrics kafkaSourceReaderMetrics = new KafkaSourceReaderMetrics(kafkaClusterMetricGroup);
        this.deserializationSchema.open(new DeserializationSchema.InitializationContext(){

            @Override
            public MetricGroup getMetricGroup() {
                // Scoped under the per-cluster group rather than the reader's own metric group.
                return kafkaClusterMetricGroup.addGroup("deserializer");
            }

            @Override
            public UserCodeClassLoader getUserCodeClassLoader() {
                return DynamicKafkaSourceReader.this.readerContext.getUserCodeClassLoader();
            }
        });
        KafkaRecordEmitter<T> recordEmitter = new KafkaRecordEmitter<>(this.deserializationSchema);
        KafkaSourceFetcherManager fetcherManager = new KafkaSourceFetcherManager(elementsQueue, () -> new KafkaPartitionSplitReaderWrapper(readerSpecificProperties, this.readerContext, kafkaSourceReaderMetrics, kafkaClusterId), ignore -> {});
        return new KafkaSourceReader<>(elementsQueue, fetcherManager, recordEmitter, DynamicKafkaSourceReader.toConfiguration(readerSpecificProperties), this.readerContext, kafkaSourceReaderMetrics);
    }

    /**
     * Replaces the availability helper with one sized for the current sub-readers. The future of
     * the previous helper is completed only after the new helper's future completes, at which
     * point the restarting flag is cleared as well.
     */
    private void completeAndResetAvailabilityHelper() {
        CompletableFuture<?> cachedPreviousFuture = this.availabilityHelper.getAvailableFuture();
        this.availabilityHelper = new MultipleFuturesAvailabilityHelper(this.clusterReaderMap.size());
        this.syncAvailabilityHelperWithReaders();
        this.availabilityHelper.getAvailableFuture().whenComplete((ignore, t) -> {
            this.restartingReaders.set(false);
            cachedPreviousFuture.complete(null);
        });
    }

    /** Registers each sub-reader's availability future at its position in key order. */
    private void syncAvailabilityHelperWithReaders() {
        int i = 0;
        for (KafkaSourceReader<T> clusterReader : this.clusterReaderMap.values()) {
            this.availabilityHelper.anyOf(i, clusterReader.isAvailable());
            ++i;
        }
    }

    /**
     * Closes every sub-reader and its metric group, then clears the reader and properties maps.
     *
     * @throws RuntimeException wrapping the first failure; state is not cleared in that case
     */
    private void closeAllReadersAndClearState() {
        for (Map.Entry<String, KafkaSourceReader<T>> entry : this.clusterReaderMap.entrySet()) {
            try {
                logger.info("Closing sub reader in reader {} for cluster: {}", this.readerContext.getIndexOfSubtask(), entry.getKey());
                entry.getValue().close();
                this.kafkaClusterMetricGroupManager.close(entry.getKey());
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        this.clusterReaderMap.clear();
        this.clustersProperties.clear();
    }

    /**
     * Copies all string-valued properties into a new {@link Configuration}.
     *
     * @param props properties to convert
     * @return configuration holding one string entry per string property name
     */
    static Configuration toConfiguration(Properties props) {
        Configuration config = new Configuration();
        for (String key : props.stringPropertyNames()) {
            config.setString(key, props.getProperty(key));
        }
        return config;
    }

    /** Logs the status (at info level for end of input, trace otherwise) and returns it. */
    private InputStatus logAndReturnInputStatus(InputStatus inputStatus) {
        if (InputStatus.END_OF_INPUT.equals(inputStatus)) {
            logger.info("inputStatus={}, subtaskIndex={}", inputStatus, this.readerContext.getIndexOfSubtask());
        } else {
            logger.trace("inputStatus={}, subtaskIndex={}", inputStatus, this.readerContext.getIndexOfSubtask());
        }
        return inputStatus;
    }

    /** Exposes the current availability helper for tests. */
    @VisibleForTesting
    public MultipleFuturesAvailabilityHelper getAvailabilityHelper() {
        return this.availabilityHelper;
    }

    /** Returns whether a metadata update has been processed, i.e. splits are no longer buffered. */
    @VisibleForTesting
    public boolean isActivelyConsumingSplits() {
        return this.isActivelyConsumingSplits;
    }
}

