/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kinesis.source.reader;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
import org.apache.flink.connector.kinesis.source.split.StartingPosition;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import org.apache.flink.kinesis.shaded.software.amazon.awssdk.services.kinesis.model.Record;

/**
 * {@link SplitReader} that reads Kinesis shard splits by polling: every call to
 * {@link #fetch()} takes one assigned split from the head of a FIFO queue and issues a
 * single {@code getRecords} call for it through the supplied {@link StreamProxy}.
 *
 * <p>Splits that still have data to read are appended to the tail of the queue again, so
 * consecutive {@code fetch()} calls rotate through all assigned splits.
 */
@Internal
public class PollingKinesisShardSplitReader
implements SplitReader<Record, KinesisShardSplit> {
    /**
     * Shared "nothing to report" batch: no records, no split id, not complete. Returned
     * whenever a fetch produced no records and no split finished.
     */
    private static final RecordsWithSplitIds<Record> INCOMPLETE_SHARD_EMPTY_RECORDS = new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, false);
    /** Proxy used to issue {@code getRecords} calls; closed in {@link #close()}. */
    private final StreamProxy kinesis;
    /** Splits still to be read, polled from the head and re-queued at the tail. */
    private final Deque<KinesisShardSplitState> assignedSplits = new ArrayDeque<>();

    /**
     * Creates a reader that fetches records through the given proxy.
     *
     * @param kinesisProxy proxy used for all {@code getRecords} calls; this reader closes
     *     it in {@link #close()}
     */
    public PollingKinesisShardSplitReader(StreamProxy kinesisProxy) {
        this.kinesis = kinesisProxy;
    }

    /**
     * Reads the next batch of records from the split at the head of the queue.
     *
     * <p>A response whose {@code nextShardIterator()} is {@code null} is treated as the
     * split being complete. A complete split is reported through
     * {@link RecordsWithSplitIds#finishedSplits()} of the returned batch and is NOT put
     * back into the queue; an incomplete split is re-queued at the tail.
     *
     * @return the shared empty batch when no split is assigned or the polled split had no
     *     records and is not complete; otherwise a batch carrying the fetched records
     *     (possibly none) and the completion flag for the polled split
     * @throws IOException declared by the {@link SplitReader} contract
     */
    public RecordsWithSplitIds<Record> fetch() throws IOException {
        KinesisShardSplitState splitState = this.assignedSplits.poll();
        if (splitState == null) {
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }

        GetRecordsResponse getRecordsResponse = this.kinesis.getRecords(splitState.getStreamArn(), splitState.getShardId(), splitState.getNextStartingPosition());
        boolean isComplete = getRecordsResponse.nextShardIterator() == null;

        if (this.hasNoRecords(getRecordsResponse)) {
            if (isComplete) {
                // Nothing left to read: report the split as finished and drop it.
                return new KinesisRecordsWithSplitIds(Collections.emptyIterator(), splitState.getSplitId(), true);
            }
            // No data right now but the split is still open: try again later.
            this.assignedSplits.add(splitState);
            return INCOMPLETE_SHARD_EMPTY_RECORDS;
        }

        // Advance the read position to the sequence number of the last record received.
        int lastIndex = getRecordsResponse.records().size() - 1;
        splitState.setNextStartingPosition(StartingPosition.continueFromSequenceNumber(getRecordsResponse.records().get(lastIndex).sequenceNumber()));

        // Only re-queue splits that still have more to read. The batch below already
        // reports a complete split as finished once its records are drained; re-queuing
        // it as well would make a later fetch() poll the finished shard again and,
        // presumably, report the same split as finished a second time.
        if (!isComplete) {
            this.assignedSplits.add(splitState);
        }
        return new KinesisRecordsWithSplitIds(getRecordsResponse.records().iterator(), splitState.getSplitId(), isComplete);
    }

    /**
     * Tells whether the response carries no records at all.
     *
     * @param getRecordsResponse response to inspect
     * @return {@code true} if {@code hasRecords()} is false or the record list is empty
     */
    private boolean hasNoRecords(GetRecordsResponse getRecordsResponse) {
        return !getRecordsResponse.hasRecords() || getRecordsResponse.records().isEmpty();
    }

    /**
     * Appends every split of the change to the tail of the queue, wrapping each one in a
     * fresh {@link KinesisShardSplitState}.
     *
     * @param splitsChanges change whose {@code splits()} are added to this reader
     */
    public void handleSplitsChanges(SplitsChange<KinesisShardSplit> splitsChanges) {
        for (KinesisShardSplit split : splitsChanges.splits()) {
            this.assignedSplits.add(new KinesisShardSplitState(split));
        }
    }

    /** Intentionally a no-op in this reader. */
    public void wakeUp() {
    }

    /**
     * Closes the underlying {@link StreamProxy}.
     *
     * @throws Exception if closing the proxy fails
     */
    public void close() throws Exception {
        this.kinesis.close();
    }

    /**
     * Batch of records belonging to at most one split, together with a flag telling
     * whether that split is complete.
     */
    private static class KinesisRecordsWithSplitIds
    implements RecordsWithSplitIds<Record> {
        private final Iterator<Record> recordsIterator;
        /** Id of the split the records belong to; {@code null} for the shared empty batch. */
        private final String splitId;
        private final boolean isComplete;

        /**
         * @param recordsIterator records of this batch, consumed by {@link #nextRecordFromSplit()}
         * @param splitId id of the owning split, or {@code null} when there is none
         * @param isComplete whether the owning split has no more data after this batch
         */
        public KinesisRecordsWithSplitIds(Iterator<Record> recordsIterator, String splitId, boolean isComplete) {
            this.recordsIterator = recordsIterator;
            this.splitId = splitId;
            this.isComplete = isComplete;
        }

        /**
         * @return the split id while unread records remain, otherwise {@code null}
         */
        @Nullable
        public String nextSplit() {
            return this.recordsIterator.hasNext() ? this.splitId : null;
        }

        /**
         * @return the next unread record, or {@code null} once the batch is exhausted
         */
        @Nullable
        public Record nextRecordFromSplit() {
            return this.recordsIterator.hasNext() ? this.recordsIterator.next() : null;
        }

        /**
         * Reports the split as finished only when it has an id, all records of this batch
         * have been consumed, and the batch was marked complete.
         *
         * @return a singleton set with the split id, or an empty set
         */
        public Set<String> finishedSplits() {
            if (this.splitId == null || this.recordsIterator.hasNext() || !this.isComplete) {
                return Collections.emptySet();
            }
            return Collections.singleton(this.splitId);
        }
    }
}

