package com.ververica.cdc.connectors.base.source.reader;

import com.ververica.cdc.common.annotation.Experimental;
import com.ververica.cdc.connectors.base.config.SourceConfig;
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsAckEvent;
import com.ververica.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsReportEvent;
import com.ververica.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsRequestEvent;
import com.ververica.cdc.connectors.base.source.meta.events.StreamSplitMetaEvent;
import com.ververica.cdc.connectors.base.source.meta.events.StreamSplitMetaRequestEvent;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplit;
import com.ververica.cdc.connectors.base.source.meta.split.SnapshotSplitState;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitSerializer;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitState;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplitState;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Experimental
/* loaded from: input_file:com/ververica/cdc/connectors/base/source/reader/IncrementalSourceReader.class */
public class IncrementalSourceReader<T, C extends SourceConfig> extends SingleThreadMultiplexSourceReaderBase<SourceRecords, T, SourceSplitBase, SourceSplitState> {
    private static final Logger LOG = LoggerFactory.getLogger(IncrementalSourceReader.class);
    private final Map<String, SnapshotSplit> finishedUnackedSplits;
    private final Map<String, StreamSplit> uncompletedStreamSplits;
    private final int subtaskId;
    private final SourceSplitSerializer sourceSplitSerializer;
    private final C sourceConfig;
    private final DataSourceDialect<C> dialect;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public IncrementalSourceReader(FutureCompletingBlockingQueue<RecordsWithSplitIds<SourceRecords>> futureCompletingBlockingQueue, Supplier<IncrementalSourceSplitReader<C>> supplier, RecordEmitter<SourceRecords, T, SourceSplitState> recordEmitter, Configuration configuration, SourceReaderContext sourceReaderContext, C c, SourceSplitSerializer sourceSplitSerializer, DataSourceDialect<C> dataSourceDialect) {
        super(futureCompletingBlockingQueue, new SingleThreadFetcherManager(futureCompletingBlockingQueue, supplier::get), recordEmitter, configuration, sourceReaderContext);
        supplier.getClass();
        this.sourceConfig = c;
        this.finishedUnackedSplits = new HashMap();
        this.uncompletedStreamSplits = new HashMap();
        this.subtaskId = sourceReaderContext.getIndexOfSubtask();
        this.sourceSplitSerializer = (SourceSplitSerializer) Preconditions.checkNotNull(sourceSplitSerializer);
        this.dialect = dataSourceDialect;
    }

    public void start() {
        if (getNumberOfCurrentlyAssignedSplits() == 0) {
            this.context.sendSplitRequest();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SourceSplitState initializedState(SourceSplitBase sourceSplitBase) {
        return sourceSplitBase.isSnapshotSplit() ? new SnapshotSplitState(sourceSplitBase.asSnapshotSplit()) : new StreamSplitState(sourceSplitBase.asStreamSplit());
    }

    public List<SourceSplitBase> snapshotState(long j) {
        List<SourceSplitBase> snapshotState = super.snapshotState(j);
        snapshotState.addAll(this.finishedUnackedSplits.values());
        snapshotState.addAll(this.uncompletedStreamSplits.values());
        return snapshotState;
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.dialect.notifyCheckpointComplete(j);
    }

    protected void onSplitFinished(Map<String, SourceSplitState> map) {
        Iterator<SourceSplitState> it = map.values().iterator();
        while (it.hasNext()) {
            SourceSplitBase sourceSplit = it.next().toSourceSplit();
            Preconditions.checkState(sourceSplit.isSnapshotSplit(), String.format("Only snapshot split could finish, but the actual split is stream split %s", sourceSplit));
            this.finishedUnackedSplits.put(sourceSplit.splitId(), sourceSplit.asSnapshotSplit());
        }
        reportFinishedSnapshotSplitsIfNeed();
        this.context.sendSplitRequest();
    }

    public void addSplits(List<SourceSplitBase> list) {
        ArrayList arrayList = new ArrayList();
        for (SourceSplitBase sourceSplitBase : list) {
            if (sourceSplitBase.isSnapshotSplit()) {
                SnapshotSplit asSnapshotSplit = sourceSplitBase.asSnapshotSplit();
                if (asSnapshotSplit.isSnapshotReadFinished()) {
                    this.finishedUnackedSplits.put(asSnapshotSplit.splitId(), asSnapshotSplit);
                } else {
                    arrayList.add(sourceSplitBase);
                }
            } else if (sourceSplitBase.asStreamSplit().isCompletedSplit()) {
                this.uncompletedStreamSplits.remove(sourceSplitBase.splitId());
                arrayList.add(discoverTableSchemasForStreamSplit(sourceSplitBase.asStreamSplit()));
            } else {
                this.uncompletedStreamSplits.put(sourceSplitBase.splitId(), sourceSplitBase.asStreamSplit());
                requestStreamSplitMetaIfNeeded(sourceSplitBase.asStreamSplit());
            }
        }
        reportFinishedSnapshotSplitsIfNeed();
        super.addSplits(arrayList);
    }

    private StreamSplit discoverTableSchemasForStreamSplit(StreamSplit streamSplit) {
        String splitId = streamSplit.splitId();
        if (!streamSplit.getTableSchemas().isEmpty()) {
            LOG.warn("The stream split {} has table schemas yet, skip the table schema discovery", streamSplit);
            return streamSplit;
        }
        try {
            Map<TableId, TableChanges.TableChange> discoverDataCollectionSchemas = this.dialect.discoverDataCollectionSchemas(this.sourceConfig);
            LOG.info("The table schema discovery for stream split {} success", splitId);
            return StreamSplit.fillTableSchemas(streamSplit, discoverDataCollectionSchemas);
        } catch (Exception e) {
            LOG.error("Failed to obtains table schemas due to {}", e.getMessage());
            throw new FlinkRuntimeException(e);
        }
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof FinishedSnapshotSplitsAckEvent) {
            FinishedSnapshotSplitsAckEvent finishedSnapshotSplitsAckEvent = (FinishedSnapshotSplitsAckEvent) sourceEvent;
            LOG.debug("The subtask {} receives ack event for {} from enumerator.", Integer.valueOf(this.subtaskId), finishedSnapshotSplitsAckEvent.getFinishedSplits());
            Iterator<String> it = finishedSnapshotSplitsAckEvent.getFinishedSplits().iterator();
            while (it.hasNext()) {
                this.finishedUnackedSplits.remove(it.next());
            }
            return;
        }
        if (sourceEvent instanceof FinishedSnapshotSplitsRequestEvent) {
            LOG.debug("The subtask {} receives request to report finished snapshot splits.", Integer.valueOf(this.subtaskId));
            reportFinishedSnapshotSplitsIfNeed();
        } else if (!(sourceEvent instanceof StreamSplitMetaEvent)) {
            super.handleSourceEvents(sourceEvent);
        } else {
            LOG.debug("The subtask {} receives stream meta with group id {}.", Integer.valueOf(this.subtaskId), Integer.valueOf(((StreamSplitMetaEvent) sourceEvent).getMetaGroupId()));
            fillMetaDataForStreamSplit((StreamSplitMetaEvent) sourceEvent);
        }
    }

    private void fillMetaDataForStreamSplit(StreamSplitMetaEvent streamSplitMetaEvent) {
        StreamSplit streamSplit = this.uncompletedStreamSplits.get(streamSplitMetaEvent.getSplitId());
        if (streamSplit == null) {
            LOG.warn("Received metadata event for split {}, but the uncompleted split map does not contain it", streamSplitMetaEvent.getSplitId());
            return;
        }
        int metaGroupId = streamSplitMetaEvent.getMetaGroupId();
        int nextMetaGroupId = getNextMetaGroupId(streamSplit.getFinishedSnapshotSplitInfos().size(), this.sourceConfig.getSplitMetaGroupSize());
        if (metaGroupId == nextMetaGroupId) {
            Stream<byte[]> stream = streamSplitMetaEvent.getMetaGroup().stream();
            SourceSplitSerializer sourceSplitSerializer = this.sourceSplitSerializer;
            sourceSplitSerializer.getClass();
            List list = (List) stream.map(sourceSplitSerializer::deserialize).collect(Collectors.toList());
            this.uncompletedStreamSplits.put(streamSplit.splitId(), StreamSplit.appendFinishedSplitInfos(streamSplit, list));
            LOG.info("Fill metadata of group {} to stream split", Integer.valueOf(list.size()));
        } else {
            LOG.warn("Received out of oder metadata event for split {}, the received meta group id is {}, but expected is {}, ignore it", new Object[]{streamSplitMetaEvent.getSplitId(), Integer.valueOf(metaGroupId), Integer.valueOf(nextMetaGroupId)});
        }
        requestStreamSplitMetaIfNeeded(streamSplit);
    }

    private void requestStreamSplitMetaIfNeeded(StreamSplit streamSplit) {
        String splitId = streamSplit.splitId();
        if (streamSplit.isCompletedSplit()) {
            LOG.info("The meta of stream split {} has been collected success", splitId);
            addSplits(Collections.singletonList(streamSplit));
        } else {
            this.context.sendSourceEventToCoordinator(new StreamSplitMetaRequestEvent(splitId, getNextMetaGroupId(streamSplit.getFinishedSnapshotSplitInfos().size(), this.sourceConfig.getSplitMetaGroupSize())));
        }
    }

    private void reportFinishedSnapshotSplitsIfNeed() {
        if (this.finishedUnackedSplits.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap();
        for (SnapshotSplit snapshotSplit : this.finishedUnackedSplits.values()) {
            hashMap.put(snapshotSplit.splitId(), snapshotSplit.getHighWatermark());
        }
        this.context.sendSourceEventToCoordinator(new FinishedSnapshotSplitsReportEvent(hashMap));
        LOG.debug("The subtask {} reports offsets of finished snapshot splits {}.", Integer.valueOf(this.subtaskId), hashMap);
    }

    public static int getNextMetaGroupId(int i, int i2) {
        Preconditions.checkState(i2 > 0);
        return i % i2 == 0 ? i / i2 : (i / i2) + 1;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SourceSplitBase toSplitType(String str, SourceSplitState sourceSplitState) {
        return sourceSplitState.toSourceSplit();
    }
}
