/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.pulsar.common.config.PulsarClientFactory;
import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto;
import org.apache.flink.connector.pulsar.common.schema.BytesSchema;
import org.apache.flink.connector.pulsar.common.schema.PulsarSchema;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.PulsarPartitionSplitReader;
import org.apache.flink.connector.pulsar.source.reader.PulsarRecordEmitter;
import org.apache.flink.connector.pulsar.source.reader.PulsarSourceFetcherManager;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchemaInitializationContext;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarSchemaWrapper;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Source reader that consumes Pulsar messages as raw bytes and tracks the cursors (latest
 * consumed message ids) which have to be acknowledged back to Pulsar.
 *
 * <p>Cursors are acknowledged in two ways:
 *
 * <ul>
 *   <li>On checkpoint: {@link #snapshotState(long)} records the cursors per checkpoint id and
 *       {@link #notifyCheckpointComplete(long)} acknowledges them.
 *   <li>Periodically: when auto acknowledgement is enabled in the {@link SourceConfiguration},
 *       {@link #start()} schedules a task on a single-threaded scheduler which acknowledges the
 *       current cursors.
 * </ul>
 *
 * <p>A failure in either path is stored and rethrown by the next {@link #pollNext(ReaderOutput)}.
 *
 * @param <OUT> the type of the records emitted by this reader.
 */
@Internal
public class PulsarSourceReader<OUT>
        extends SourceReaderBase<
                Message<byte[]>, OUT, PulsarPartitionSplit, PulsarPartitionSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarSourceReader.class);

    private final SourceConfiguration sourceConfiguration;
    private final PulsarClient pulsarClient;

    /** Cursors recorded by {@link #snapshotState(long)}, keyed by checkpoint id. */
    @VisibleForTesting
    final SortedMap<Long, Map<TopicPartition, MessageId>> cursorsToCommit;

    /** Latest consumed ids of finished splits which have not been acknowledged yet. */
    private final ConcurrentMap<TopicPartition, MessageId> cursorsOfFinishedSplits;

    /** First failure seen while acknowledging cursors; surfaced by {@link #pollNext}. */
    private final AtomicReference<Throwable> cursorCommitThrowable;

    /** Scheduler for the periodic acknowledgement; stays {@code null} when it is disabled. */
    private ScheduledExecutorService cursorScheduler;

    private PulsarSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue,
            PulsarSourceFetcherManager fetcherManager,
            PulsarDeserializationSchema<OUT> deserializationSchema,
            SourceConfiguration sourceConfiguration,
            PulsarClient pulsarClient,
            SourceReaderContext context) {
        super(
                elementsQueue,
                fetcherManager,
                new PulsarRecordEmitter<>(deserializationSchema),
                sourceConfiguration,
                context);
        this.sourceConfiguration = sourceConfiguration;
        this.pulsarClient = pulsarClient;
        this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>());
        this.cursorsOfFinishedSplits = new ConcurrentHashMap<>();
        this.cursorCommitThrowable = new AtomicReference<>();
    }

    /**
     * Starts the reader and, if auto acknowledgement is enabled, schedules the periodic cursor
     * acknowledgement. The first run is delayed by the configured max fetch time and repeated at
     * the configured auto commit cursor interval (both in milliseconds).
     */
    @Override
    public void start() {
        super.start();
        if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) {
            cursorScheduler = Executors.newSingleThreadScheduledExecutor();
            cursorScheduler.scheduleAtFixedRate(
                    this::cumulativeAcknowledgmentMessage,
                    sourceConfiguration.getMaxFetchTime().toMillis(),
                    sourceConfiguration.getAutoCommitCursorInterval(),
                    TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Polls the next record, failing first if an earlier cursor acknowledgement failed.
     *
     * @throws FlinkRuntimeException if a cursor acknowledgement failure has been recorded.
     */
    @Override
    public InputStatus pollNext(ReaderOutput<OUT> output) throws Exception {
        Throwable cause = cursorCommitThrowable.get();
        if (cause != null) {
            throw new FlinkRuntimeException("An error occurred in acknowledge message.", cause);
        }
        return super.pollNext(output);
    }

    /**
     * Closes the fetchers of the finished splits and remembers their latest consumed message ids
     * so that they are still acknowledged later on.
     */
    @Override
    protected void onSplitFinished(Map<String, PulsarPartitionSplitState> finishedSplitIds) {
        PulsarSourceFetcherManager fetcherManager =
                (PulsarSourceFetcherManager) splitFetcherManager;
        for (String splitId : finishedSplitIds.keySet()) {
            fetcherManager.closeFetcher(splitId);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("onSplitFinished event: {}", finishedSplitIds);
        }

        for (PulsarPartitionSplitState state : finishedSplitIds.values()) {
            MessageId latestConsumedId = state.getLatestConsumedId();
            // A split without a consumed id has nothing to acknowledge.
            if (latestConsumedId != null) {
                cursorsOfFinishedSplits.put(state.getPartition(), latestConsumedId);
            }
        }
    }

    /** Wraps the given split into its mutable state. */
    @Override
    protected PulsarPartitionSplitState initializedState(PulsarPartitionSplit split) {
        return new PulsarPartitionSplitState(split);
    }

    /** Converts the split state back into a split; the split id is not used. */
    @Override
    protected PulsarPartitionSplit toSplitType(
            String splitId, PulsarPartitionSplitState splitState) {
        return splitState.toPulsarPartitionSplit();
    }

    /** Delegates pausing and resuming of splits to the split fetcher manager. */
    @Override
    public void pauseOrResumeSplits(
            Collection<String> splitsToPause, Collection<String> splitsToResume) {
        splitFetcherManager.pauseOrResumeSplits(splitsToPause, splitsToResume);
    }

    /**
     * Snapshots the splits and records their cursors under the given checkpoint id, together with
     * the pending cursors of the finished splits.
     *
     * @param checkpointId the id of the checkpoint being taken.
     * @return the splits returned by the base class snapshot.
     */
    @Override
    public List<PulsarPartitionSplit> snapshotState(long checkpointId) {
        List<PulsarPartitionSplit> splits = super.snapshotState(checkpointId);

        Map<TopicPartition, MessageId> cursors =
                cursorsToCommit.computeIfAbsent(checkpointId, id -> new HashMap<>());
        for (PulsarPartitionSplit split : splits) {
            MessageId latestConsumedId = split.getLatestConsumedId();
            if (latestConsumedId != null) {
                cursors.put(split.getPartition(), latestConsumedId);
            }
        }
        cursors.putAll(cursorsOfFinishedSplits);

        return splits;
    }

    /**
     * Acknowledges the cursors recorded for the completed checkpoint and drops the records of
     * this and all earlier checkpoints. A failure is logged and stored for {@link #pollNext}.
     *
     * <p>If nothing was recorded for the checkpoint id, the notification is ignored.
     *
     * @param checkpointId the id of the completed checkpoint.
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        LOG.debug("Committing cursors for checkpoint {}", checkpointId);
        Map<TopicPartition, MessageId> cursors = cursorsToCommit.get(checkpointId);
        if (cursors == null) {
            // Without this guard the null map would end up as a recorded commit failure and
            // fail the reader on the next poll although there is simply nothing to commit.
            LOG.debug("No cursors recorded for checkpoint {}, skip committing.", checkpointId);
            return;
        }

        try {
            ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
            LOG.debug("Successfully acknowledge cursors for checkpoint {}", checkpointId);

            // Clean up everything up to and including this checkpoint.
            cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
            cursorsToCommit.headMap(checkpointId + 1L).clear();
        } catch (Exception e) {
            LOG.error("Failed to acknowledge cursors for checkpoint {}", checkpointId, e);
            cursorCommitThrowable.compareAndSet(null, e);
        }
    }

    /**
     * Stops the periodic acknowledgement, closes the reader and shuts down the Pulsar client.
     *
     * <p>The client is shut down even if closing the reader fails. The first failure is thrown,
     * a later one is attached to it as a suppressed exception.
     */
    @Override
    public void close() throws Exception {
        if (cursorScheduler != null) {
            cursorScheduler.shutdown();
        }

        Exception failure = null;
        try {
            super.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            pulsarClient.shutdown();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Periodic task which acknowledges the current cursors of all splits, including the pending
     * ones of finished splits.
     *
     * <p>The whole body is guarded: an exception escaping a task scheduled with {@code
     * scheduleAtFixedRate} suppresses all subsequent executions, so every failure is logged and
     * stored for {@link #pollNext} instead.
     */
    private void cumulativeAcknowledgmentMessage() {
        try {
            Map<TopicPartition, MessageId> cursors = new HashMap<>(cursorsOfFinishedSplits);

            // The base class snapshot is called directly with a placeholder checkpoint id, so
            // this does not add an entry to cursorsToCommit.
            // NOTE(review): this runs on the scheduler thread; confirm that calling
            // SourceReaderBase#snapshotState from there is safe.
            List<PulsarPartitionSplit> splits = super.snapshotState(1L);
            for (PulsarPartitionSplit split : splits) {
                MessageId latestConsumedId = split.getLatestConsumedId();
                if (latestConsumedId != null) {
                    cursors.put(split.getPartition(), latestConsumedId);
                }
            }

            ((PulsarSourceFetcherManager) splitFetcherManager).acknowledgeMessages(cursors);
            cursorsOfFinishedSplits.keySet().removeAll(cursors.keySet());
        } catch (Exception e) {
            LOG.error("Fail in auto cursor commit.", e);
            cursorCommitThrowable.compareAndSet(null, e);
        }
    }

    /**
     * Creates a reader together with the Pulsar client, elements queue and fetcher manager it
     * needs, and opens the deserialization schema.
     *
     * <p>With schema evolution enabled the deserialization schema is cast to {@link
     * PulsarSchemaWrapper} and its Pulsar schema is wrapped into a {@link BytesSchema}; otherwise
     * {@code Schema.BYTES} is used.
     *
     * <p>If anything fails after the client was created, the client is shut down before the
     * failure is rethrown.
     *
     * @param sourceConfiguration the source configuration.
     * @param deserializationSchema the schema used for deserializing the messages.
     * @param pulsarCrypto the crypto settings passed to every split reader.
     * @param readerContext the context of the source reader.
     * @param <OUT> the type of the records emitted by the reader.
     * @return the new reader, which owns the created Pulsar client.
     * @throws Exception if the client or the reader could not be created.
     */
    public static <OUT> PulsarSourceReader<OUT> create(
            SourceConfiguration sourceConfiguration,
            PulsarDeserializationSchema<OUT> deserializationSchema,
            PulsarCrypto pulsarCrypto,
            SourceReaderContext readerContext)
            throws Exception {
        int queueCapacity = sourceConfiguration.getMessageQueueCapacity();
        FutureCompletingBlockingQueue<RecordsWithSplitIds<Message<byte[]>>> elementsQueue =
                new FutureCompletingBlockingQueue<>(queueCapacity);

        PulsarClient pulsarClient = PulsarClientFactory.createClient(sourceConfiguration);
        try {
            PulsarDeserializationSchemaInitializationContext initializationContext =
                    new PulsarDeserializationSchemaInitializationContext(
                            readerContext, pulsarClient);
            deserializationSchema.open(initializationContext, sourceConfiguration);

            Schema<byte[]> schema;
            if (sourceConfiguration.isEnableSchemaEvolution()) {
                PulsarSchema<?> pulsarSchema =
                        ((PulsarSchemaWrapper<?>) deserializationSchema).pulsarSchema();
                schema = new BytesSchema(pulsarSchema);
            } else {
                schema = Schema.BYTES;
            }

            Supplier<SplitReader<Message<byte[]>, PulsarPartitionSplit>> splitReaderSupplier =
                    () ->
                            new PulsarPartitionSplitReader(
                                    pulsarClient,
                                    sourceConfiguration,
                                    schema,
                                    pulsarCrypto,
                                    readerContext.metricGroup());

            PulsarSourceFetcherManager fetcherManager =
                    new PulsarSourceFetcherManager(
                            elementsQueue, splitReaderSupplier, readerContext.getConfiguration());

            return new PulsarSourceReader<>(
                    elementsQueue,
                    fetcherManager,
                    deserializationSchema,
                    sourceConfiguration,
                    pulsarClient,
                    readerContext);
        } catch (Exception e) {
            // No reader took ownership of the client, so release it here.
            try {
                pulsarClient.shutdown();
            } catch (Exception shutdownFailure) {
                e.addSuppressed(shutdownFailure);
            }
            throw e;
        }
    }
}

