/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.core.io.InputStatus;
import org.apache.paimon.flink.sink.cdc.TestCdcEvent;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSource;
import org.apache.paimon.flink.source.AbstractNonCoordinatedSourceReader;
import org.apache.paimon.flink.source.SimpleSourceSplit;
import org.apache.paimon.flink.source.SplitListState;

public class TestCdcSource
extends AbstractNonCoordinatedSource<TestCdcEvent> {
    private static final long serialVersionUID = 1L;
    private final LinkedList<TestCdcEvent> events;

    public TestCdcSource(Collection<TestCdcEvent> events) {
        this.events = new LinkedList<TestCdcEvent>(events);
    }

    public Boundedness getBoundedness() {
        return Boundedness.CONTINUOUS_UNBOUNDED;
    }

    public SourceReader<TestCdcEvent, SimpleSourceSplit> createReader(SourceReaderContext context) {
        return new Reader(context.getIndexOfSubtask(), context.currentParallelism(), new LinkedList<TestCdcEvent>(this.events));
    }

    private static class Reader
    extends AbstractNonCoordinatedSourceReader<TestCdcEvent> {
        private final int subtaskId;
        private final int totalSubtasks;
        private final LinkedList<TestCdcEvent> events;
        private final SplitListState<Integer> remainingEventsCount = new SplitListState("events", x -> Integer.toString(x), Integer::parseInt);
        private final int numRecordsPerCheckpoint;
        private final AtomicInteger recordsThisCheckpoint;

        private Reader(int subtaskId, int totalSubtasks, LinkedList<TestCdcEvent> events) {
            this.subtaskId = subtaskId;
            this.totalSubtasks = totalSubtasks;
            this.events = events;
            this.numRecordsPerCheckpoint = events.size() / ThreadLocalRandom.current().nextInt(10, 20) + 1;
            this.recordsThisCheckpoint = new AtomicInteger(0);
        }

        public InputStatus pollNext(ReaderOutput<TestCdcEvent> readerOutput) throws Exception {
            if (this.events.isEmpty()) {
                return InputStatus.END_OF_INPUT;
            }
            if (this.recordsThisCheckpoint.get() >= this.numRecordsPerCheckpoint) {
                Thread.sleep(10L);
                return InputStatus.MORE_AVAILABLE;
            }
            TestCdcEvent event = this.events.poll();
            if (event.records() != null && Math.abs(event.hashCode()) % this.totalSubtasks != this.subtaskId) {
                return InputStatus.MORE_AVAILABLE;
            }
            readerOutput.collect((Object)event);
            this.recordsThisCheckpoint.incrementAndGet();
            return InputStatus.MORE_AVAILABLE;
        }

        public List<SimpleSourceSplit> snapshotState(long l) {
            this.recordsThisCheckpoint.set(0);
            this.remainingEventsCount.clear();
            this.remainingEventsCount.add((Object)this.events.size());
            return this.remainingEventsCount.snapshotState();
        }

        public void addSplits(List<SimpleSourceSplit> list) {
            this.remainingEventsCount.restoreState(list);
            int count = 0;
            Iterator iterator = this.remainingEventsCount.get().iterator();
            while (iterator.hasNext()) {
                int c = (Integer)iterator.next();
                count += c;
            }
            while (this.events.size() > count) {
                this.events.poll();
            }
        }
    }
}

