/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.source;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.reader.fetcher.PulsarUnorderedFetcherManager;
import org.apache.flink.connector.pulsar.source.reader.message.PulsarMessage;
import org.apache.flink.connector.pulsar.source.reader.source.PulsarSourceReaderBase;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarUnorderedPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.transaction.TransactionCoordinatorClient;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class PulsarUnorderedSourceReader<OUT>
extends PulsarSourceReaderBase<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarUnorderedSourceReader.class);
    @Nullable
    private final TransactionCoordinatorClient coordinatorClient;
    @VisibleForTesting
    final SortedMap<Long, List<TxnID>> transactionsToCommit;
    private final List<TxnID> transactionsOfFinishedSplits;

    public PulsarUnorderedSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<PulsarMessage<OUT>>> elementsQueue, Supplier<PulsarUnorderedPartitionSplitReader<OUT>> splitReaderSupplier, SourceReaderContext context, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, @Nullable TransactionCoordinatorClient coordinatorClient) {
        super(elementsQueue, new PulsarUnorderedFetcherManager<OUT>(elementsQueue, splitReaderSupplier::get), context, sourceConfiguration, pulsarClient, pulsarAdmin);
        this.coordinatorClient = coordinatorClient;
        this.transactionsToCommit = Collections.synchronizedSortedMap(new TreeMap());
        this.transactionsOfFinishedSplits = Collections.synchronizedList(new ArrayList());
    }

    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("onSplitFinished event: {}", finishedSplitIds);
        }
        if (this.coordinatorClient != null) {
            for (Map.Entry<String, PulsarPartitionSplitState> entry : finishedSplitIds.entrySet()) {
                PulsarPartitionSplitState state = entry.getValue();
                TxnID uncommittedTransactionId = state.getUncommittedTransactionId();
                if (uncommittedTransactionId == null) continue;
                this.transactionsOfFinishedSplits.add(uncommittedTransactionId);
            }
        }
    }

    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
        LOG.debug("Trigger the new transaction for downstream readers.");
        List<PulsarPartitionSplit> splits = ((PulsarUnorderedFetcherManager)this.splitFetcherManager).snapshotState(checkpointId);
        if (this.coordinatorClient != null) {
            List txnIDs = this.transactionsToCommit.computeIfAbsent(checkpointId, id -> new ArrayList());
            for (PulsarPartitionSplit split : splits) {
                TxnID uncommittedTransactionId = split.getUncommittedTransactionId();
                if (uncommittedTransactionId == null) continue;
                txnIDs.add(uncommittedTransactionId);
            }
        }
        return splits;
    }

    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        LOG.debug("Committing transactions for checkpoint {}", (Object)checkpointId);
        if (this.coordinatorClient != null) {
            for (Map.Entry<Long, List<TxnID>> entry : this.transactionsToCommit.entrySet()) {
                Long currentCheckpointId = entry.getKey();
                if (currentCheckpointId > checkpointId) continue;
                List<TxnID> transactions = entry.getValue();
                for (TxnID transaction : transactions) {
                    this.coordinatorClient.commit(transaction);
                    this.transactionsOfFinishedSplits.remove(transaction);
                }
                this.transactionsToCommit.remove(currentCheckpointId);
            }
        }
    }
}

