/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SingleThreadMultiplexSourceReaderBase;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.mocks.PassThroughRecordEmitter;
import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.SourceReaderTestBase;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.AddSplitEvent;
import org.apache.flink.streaming.api.operators.SourceOperator;
import org.apache.flink.streaming.api.operators.source.TestingSourceOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceReaderBaseTest
extends SourceReaderTestBase<MockSourceSplit> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBaseTest.class);

    @Test
    void testExceptionInSplitReader() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> {
            String errMsg = "Testing Exception";
            FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
            MockSourceReader reader = new MockSourceReader((FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>)elementsQueue, () -> new SplitReader<int[], MockSourceSplit>(){

                public RecordsWithSplitIds<int[]> fetch() {
                    throw new RuntimeException("Testing Exception");
                }

                public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {
                }

                public void wakeUp() {
                }

                public void close() {
                }
            }, this.getConfig(), (SourceReaderContext)new TestingReaderContext());
            Throwable throwable = null;
            try {
                try {
                    SourceReaderTestBase.ValidatingSourceOutput output = new SourceReaderTestBase.ValidatingSourceOutput((SourceReaderTestBase)this);
                    reader.addSplits(Collections.singletonList(this.getSplit(0, 10, Boundedness.CONTINUOUS_UNBOUNDED)));
                    reader.notifyNoMoreSplits();
                    while (true) {
                        InputStatus inputStatus = reader.pollNext((ReaderOutput)output);
                        Assertions.assertThat((Comparable)inputStatus).isNotEqualTo((Object)InputStatus.END_OF_INPUT);
                        Thread.sleep(1L);
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
            }
            catch (Throwable throwable3) {
                if (reader != null) {
                    if (throwable != null) {
                        try {
                            reader.close();
                        }
                        catch (Throwable throwable4) {
                            throwable.addSuppressed(throwable4);
                        }
                    } else {
                        reader.close();
                    }
                }
                throw throwable3;
            }
        }).isInstanceOf(RuntimeException.class)).hasMessage("One or more fetchers have encountered exception");
    }

    @Test
    void testRecordsWithSplitsNotRecycledWhenRecordsLeft() throws Exception {
        TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<String>("test-split", "value1", "value2");
        SourceReader<String, ?> reader = SourceReaderBaseTest.createReaderAndAwaitAvailable("test-split", records);
        reader.pollNext((ReaderOutput)new TestingReaderOutput());
        Assertions.assertThat((boolean)records.isRecycled()).isFalse();
    }

    @Test
    void testRecordsWithSplitsRecycledWhenEmpty() throws Exception {
        TestingRecordsWithSplitIds<String> records = new TestingRecordsWithSplitIds<String>("test-split", "value1", "value2");
        SourceReader<String, ?> reader = SourceReaderBaseTest.createReaderAndAwaitAvailable("test-split", records);
        reader.pollNext((ReaderOutput)new TestingReaderOutput());
        reader.pollNext((ReaderOutput)new TestingReaderOutput());
        reader.pollNext((ReaderOutput)new TestingReaderOutput());
        Assertions.assertThat((boolean)records.isRecycled()).isTrue();
    }

    @Test
    void testMultipleSplitsWithDifferentFinishingMoments() throws Exception {
        InputStatus status;
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        MockSplitReader mockSplitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setSeparatedFinishedRecord(false).setBlockingFetch(false).build();
        MockSourceReader reader = new MockSourceReader((FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>)elementsQueue, () -> mockSplitReader, this.getConfig(), (SourceReaderContext)new TestingReaderContext());
        reader.start();
        List<MockSourceSplit> splits = Arrays.asList(this.getSplit(0, 10, Boundedness.BOUNDED), this.getSplit(1, 12, Boundedness.BOUNDED));
        reader.addSplits(splits);
        reader.notifyNoMoreSplits();
        while ((status = reader.pollNext((ReaderOutput)new TestingReaderOutput())) != InputStatus.END_OF_INPUT) {
            if (status != InputStatus.NOTHING_AVAILABLE) continue;
            reader.isAvailable().get();
        }
    }

    @Test
    void testMultipleSplitsWithSeparatedFinishedRecord() throws Exception {
        InputStatus status;
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        MockSplitReader mockSplitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setSeparatedFinishedRecord(true).setBlockingFetch(false).build();
        MockSourceReader reader = new MockSourceReader((FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>)elementsQueue, () -> mockSplitReader, this.getConfig(), (SourceReaderContext)new TestingReaderContext());
        reader.start();
        List<MockSourceSplit> splits = Arrays.asList(this.getSplit(0, 10, Boundedness.BOUNDED), this.getSplit(1, 10, Boundedness.BOUNDED));
        reader.addSplits(splits);
        reader.notifyNoMoreSplits();
        while ((status = reader.pollNext((ReaderOutput)new TestingReaderOutput())) != InputStatus.END_OF_INPUT) {
            if (status != InputStatus.NOTHING_AVAILABLE) continue;
            reader.isAvailable().get();
        }
    }

    @Test
    void testPollNextReturnMoreAvailableWhenAllSplitFetcherCloseWithLeftoverElementInQueue() throws Exception {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        MockSplitReader mockSplitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(1).setBlockingFetch(true).build();
        BlockingShutdownSplitFetcherManager<int[], MockSourceSplit> splitFetcherManager = new BlockingShutdownSplitFetcherManager<int[], MockSourceSplit>(elementsQueue, () -> mockSplitReader, this.getConfig());
        MockSourceReader sourceReader = new MockSourceReader((FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>)elementsQueue, splitFetcherManager, this.getConfig(), (SourceReaderContext)new TestingReaderContext());
        MockSourceSplit split = new MockSourceSplit(0, 0, 1);
        sourceReader.addSplits(Collections.singletonList(split));
        sourceReader.notifyNoMoreSplits();
        splitFetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> split.addRecord(1));
        Assertions.assertThat((Comparable)sourceReader.pollNext((ReaderOutput)new TestingReaderOutput())).isEqualTo((Object)InputStatus.MORE_AVAILABLE);
    }

    @ParameterizedTest(name="Emit record before split addition: {0}")
    @ValueSource(booleans={true, false})
    void testPerSplitWatermark(boolean emitRecordBeforeSplitAddition) throws Exception {
        MockSplitReader mockSplitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(3).setBlockingFetch(true).build();
        MockSourceReader reader = new MockSourceReader((FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>)new FutureCompletingBlockingQueue(), () -> mockSplitReader, new Configuration(), (SourceReaderContext)new TestingReaderContext());
        SourceOperator sourceOperator = TestingSourceOperator.createTestOperator((SourceReader)reader, (WatermarkStrategy)WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)context -> new OnEventWatermarkGenerator()), (boolean)true);
        MockSourceSplit splitA = new MockSourceSplit(0, 0, 3);
        splitA.addRecord(100);
        splitA.addRecord(200);
        splitA.addRecord(300);
        MockSourceSplit splitB = new MockSourceSplit(1, 0, 3);
        splitB.addRecord(150);
        splitB.addRecord(250);
        splitB.addRecord(350);
        WatermarkCollectingDataOutput output = new WatermarkCollectingDataOutput();
        if (emitRecordBeforeSplitAddition) {
            sourceOperator.emitNext((PushingAsyncDataInput.DataOutput)output);
        }
        AddSplitEvent addSplitsEvent = new AddSplitEvent(Arrays.asList(splitA, splitB), (SimpleVersionedSerializer)new MockSourceSplitSerializer());
        sourceOperator.handleOperatorEvent((OperatorEvent)addSplitsEvent);
        CommonTestUtils.waitUtil(() -> {
            try {
                sourceOperator.emitNext((PushingAsyncDataInput.DataOutput)output);
            }
            catch (Exception e) {
                LOG.warn("Exception caught at emitting records", (Throwable)e);
                return false;
            }
            return output.numRecords == 3;
        }, (Duration)Duration.ofSeconds(10L), (String)String.format("%d out of 3 records are received within timeout", output.numRecords));
        Assertions.assertThat(output.watermarks).isEmpty();
        CommonTestUtils.waitUtil(() -> {
            try {
                sourceOperator.emitNext((PushingAsyncDataInput.DataOutput)output);
            }
            catch (Exception e) {
                LOG.warn("Exception caught at emitting records", (Throwable)e);
                return false;
            }
            return output.numRecords == 6;
        }, (Duration)Duration.ofSeconds(10L), (String)String.format("%d out of 6 records are received within timeout", output.numRecords));
        Assertions.assertThat(output.watermarks).hasSize(3);
        Assertions.assertThat(output.watermarks).containsExactly((Object[])new Long[]{150L, 250L, 300L});
    }

    protected MockSourceReader createReader() {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        MockSplitReader mockSplitReader = MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(2).setBlockingFetch(true).build();
        return new MockSourceReader((FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>>)elementsQueue, () -> mockSplitReader, this.getConfig(), (SourceReaderContext)new TestingReaderContext());
    }

    protected List<MockSourceSplit> getSplits(int numSplits, int numRecordsPerSplit, Boundedness boundedness) {
        ArrayList<MockSourceSplit> mockSplits = new ArrayList<MockSourceSplit>();
        for (int i = 0; i < numSplits; ++i) {
            mockSplits.add(this.getSplit(i, numRecordsPerSplit, boundedness));
        }
        return mockSplits;
    }

    protected MockSourceSplit getSplit(int splitId, int numRecords, Boundedness boundedness) {
        MockSourceSplit mockSplit = boundedness == Boundedness.BOUNDED ? new MockSourceSplit(splitId, 0, numRecords) : new MockSourceSplit(splitId);
        for (int j = 0; j < numRecords; ++j) {
            mockSplit.addRecord(splitId * 10 + j);
        }
        return mockSplit;
    }

    protected long getNextRecordIndex(MockSourceSplit split) {
        return split.index();
    }

    private Configuration getConfig() {
        Configuration config = new Configuration();
        config.setInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);
        config.setLong(SourceReaderOptions.SOURCE_READER_CLOSE_TIMEOUT, 30000L);
        return config;
    }

    private static <E> SourceReader<E, ?> createReaderAndAwaitAvailable(String splitId, RecordsWithSplitIds<E> records) throws Exception {
        FutureCompletingBlockingQueue elementsQueue = new FutureCompletingBlockingQueue();
        SingleThreadMultiplexSourceReaderBase reader = new SingleThreadMultiplexSourceReaderBase<E, E, TestingSourceSplit, TestingSourceSplit>(elementsQueue, () -> new TestingSplitReader(records), new PassThroughRecordEmitter(), new Configuration(), (SourceReaderContext)new TestingReaderContext()){

            public void notifyCheckpointComplete(long checkpointId) {
            }

            protected void onSplitFinished(Map<String, TestingSourceSplit> finishedSplitIds) {
            }

            protected TestingSourceSplit initializedState(TestingSourceSplit split) {
                return split;
            }

            protected TestingSourceSplit toSplitType(String splitId, TestingSourceSplit splitState) {
                return splitState;
            }
        };
        reader.start();
        List<TestingSourceSplit> splits = Collections.singletonList(new TestingSourceSplit(splitId));
        reader.addSplits(splits);
        reader.isAvailable().get();
        return reader;
    }

    private static class WatermarkCollectingDataOutput
    implements PushingAsyncDataInput.DataOutput<Integer> {
        int numRecords = 0;
        final List<Long> watermarks = new ArrayList<Long>();

        private WatermarkCollectingDataOutput() {
        }

        public void emitRecord(StreamRecord<Integer> streamRecord) {
            ++this.numRecords;
        }

        public void emitWatermark(Watermark watermark) throws Exception {
            this.watermarks.add(watermark.getTimestamp());
        }

        public void emitWatermarkStatus(WatermarkStatus watermarkStatus) {
        }

        public void emitLatencyMarker(LatencyMarker latencyMarker) {
        }
    }

    private static class OnEventWatermarkGenerator
    implements WatermarkGenerator<Integer> {
        private OnEventWatermarkGenerator() {
        }

        public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
            output.emitWatermark(new org.apache.flink.api.common.eventtime.Watermark((long)event.intValue()));
        }

        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }

    private static class BlockingShutdownSplitFetcherManager<E, SplitT extends SourceSplit>
    extends SingleThreadFetcherManager<E, SplitT> {
        private final CompletableFuture<Void> inShutdownSplitFetcherFuture = new CompletableFuture();

        public BlockingShutdownSplitFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration) {
            super(elementsQueue, splitReaderSupplier, configuration);
        }

        public boolean maybeShutdownFinishedFetchers() {
            this.shutdownAllSplitFetcher();
            return true;
        }

        public CompletableFuture<Void> getInShutdownSplitFetcherFuture() {
            return this.inShutdownSplitFetcherFuture;
        }

        private void shutdownAllSplitFetcher() {
            this.inShutdownSplitFetcherFuture.complete(null);
            while (!super.maybeShutdownFinishedFetchers()) {
                try {
                    Thread.sleep(1L);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

