/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.testutils.source.reader;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public abstract class SourceReaderTestBase<SplitT extends SourceSplit>
extends TestLogger {
    protected static final int NUM_SPLITS = 10;
    protected static final int NUM_RECORDS_PER_SPLIT = 10;
    protected static final int TOTAL_NUM_RECORDS = 100;
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    @After
    public void ensureNoDangling() {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (!t.getName().equals("SourceFetcher")) continue;
            System.out.println("Dangling thread.");
        }
    }

    @Test
    public void testRead() throws Exception {
        try (SourceReader<Integer, SplitT> reader = this.createReader();){
            reader.addSplits(this.getSplits(10, 10, Boundedness.BOUNDED));
            ValidatingSourceOutput output = new ValidatingSourceOutput();
            while (output.count < 100) {
                reader.pollNext((ReaderOutput)output);
            }
            output.validate();
        }
    }

    @Test
    public void testAddSplitToExistingFetcher() throws Exception {
        Thread.sleep(10L);
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        List<SplitT> splits = Collections.singletonList(this.getSplit(0, 10, Boundedness.BOUNDED));
        try (SourceReader<Integer, SplitT> reader = this.consumeRecords(splits, output, 5);){
            ArrayList<SplitT> newSplits = new ArrayList<SplitT>();
            for (int i = 1; i < 10; ++i) {
                newSplits.add(this.getSplit(i, 10, Boundedness.BOUNDED));
            }
            reader.addSplits(newSplits);
            while (output.count() < 100) {
                reader.pollNext((ReaderOutput)output);
            }
            output.validate();
        }
    }

    @Test(timeout=30000L)
    public void testPollingFromEmptyQueue() throws Exception {
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        List<SplitT> splits = Collections.singletonList(this.getSplit(0, 10, Boundedness.BOUNDED));
        try (SourceReader<Integer, SplitT> reader = this.consumeRecords(splits, output, 10);){
            Assert.assertEquals((String)"The status should be ", (Object)InputStatus.NOTHING_AVAILABLE, (Object)reader.pollNext((ReaderOutput)output));
        }
    }

    @Test(timeout=30000L)
    public void testAvailableOnEmptyQueue() throws Exception {
        try (SourceReader<Integer, SplitT> reader = this.createReader();){
            CompletableFuture future = reader.isAvailable();
            Assert.assertFalse((String)"There should be no records ready for poll.", (boolean)future.isDone());
            reader.addSplits(Collections.singletonList(this.getSplit(0, 10, Boundedness.BOUNDED)));
            future.get();
        }
    }

    @Test(timeout=30000L)
    public void testSnapshot() throws Exception {
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        List<SplitT> splits = this.getSplits(10, 10, Boundedness.CONTINUOUS_UNBOUNDED);
        try (SourceReader<Integer, SplitT> reader = this.consumeRecords(splits, output, 100);){
            List state = reader.snapshotState(1L);
            Assert.assertEquals((String)"The snapshot should only have 10 splits. ", (long)10L, (long)state.size());
            for (int i = 0; i < 10; ++i) {
                Assert.assertEquals((String)"The first four splits should have been fully consumed.", (long)10L, (long)this.getNextRecordIndex((SourceSplit)state.get(i)));
            }
        }
    }

    protected abstract SourceReader<Integer, SplitT> createReader();

    protected abstract List<SplitT> getSplits(int var1, int var2, Boundedness var3);

    protected abstract SplitT getSplit(int var1, int var2, Boundedness var3);

    protected abstract long getNextRecordIndex(SplitT var1);

    private SourceReader<Integer, SplitT> consumeRecords(List<SplitT> splits, ValidatingSourceOutput output, int n) throws Exception {
        SourceReader<Integer, SplitT> reader = this.createReader();
        reader.addSplits(splits);
        while (output.count() < n) {
            reader.pollNext((ReaderOutput)output);
        }
        return reader;
    }

    public static class ValidatingSourceOutput
    implements ReaderOutput<Integer> {
        private Set<Integer> consumedValues = new HashSet<Integer>();
        private int max = Integer.MIN_VALUE;
        private int min = Integer.MAX_VALUE;
        private int count = 0;

        public void collect(Integer element) {
            this.max = Math.max(element, this.max);
            this.min = Math.min(element, this.min);
            ++this.count;
            this.consumedValues.add(element);
        }

        public void collect(Integer element, long timestamp) {
            this.collect(element);
        }

        public void validate() {
            Assert.assertEquals((String)String.format("Should be %d distinct elements in total", 100), (long)100L, (long)this.consumedValues.size());
            Assert.assertEquals((String)String.format("Should be %d elements in total", 100), (long)100L, (long)this.count);
            Assert.assertEquals((String)"The min value should be 0", (long)0L, (long)this.min);
            Assert.assertEquals((String)"The max value should be 99", (long)99L, (long)this.max);
        }

        public int count() {
            return this.count;
        }

        public void emitWatermark(Watermark watermark) {
        }

        public void markIdle() {
        }

        public SourceOutput<Integer> createOutputForSplit(String splitId) {
            return this;
        }

        public void releaseOutputForSplit(String splitId) {
        }
    }
}

