/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.fetcher;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher;
import org.apache.flink.connector.base.source.reader.mocks.MockSourceReader;
import org.apache.flink.connector.base.source.reader.mocks.MockSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.flink.connector.testutils.source.reader.TestingReaderOutput;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests pausing and resuming of individual splits through a {@link MockSourceReader}.
 *
 * <p>The split fetchers are never left to run on their own: the test harness steps every fetcher
 * explicitly via {@code runOnce()} and then polls the source reader, so the order in which records
 * show up in the reader output is controlled by the test.
 */
public class SplitFetcherPauseResumeSplitReaderTest {
    /** Timeout, in seconds, handed to every stepping run of the harness. */
    private static final int TIMEOUT_SECONDS = 10;

    /**
     * Pauses and later resumes split {@code "0"} and checks which records are emitted in between.
     *
     * <p>Two splits with five records each are used; split {@code k} holds the records
     * {@code i * 2 + k}, i.e. split 0 holds the even and split 1 the odd numbers in {@code [0, 9]}.
     *
     * @param individualReader whether every split is added on its own (two split readers are
     *     expected to be created) or both splits are added together (one split reader expected)
     */
    @ParameterizedTest(name="Individual reader per split: {0}")
    @ValueSource(booleans={false, true})
    public void testPauseResumeSplitReaders(boolean individualReader) throws Exception {
        AtomicInteger numSplitReaders = new AtomicInteger();
        MockSplitReader.Builder readerBuilder = SteppingSourceReaderTestHarness.createSplitReaderBuilder();
        // The supplier counts how many split readers the fetcher manager asks for.
        SteppingSourceReaderTestHarness testHarness = new SteppingSourceReaderTestHarness(() -> {
            numSplitReaders.getAndIncrement();
            return readerBuilder.build();
        }, new Configuration());
        if (individualReader) {
            testHarness.addPrefilledSplitsIndividualReader(2, 5);
            Assertions.assertThat(numSplitReaders.get()).isEqualTo(2);
        } else {
            testHarness.addPrefilledSplitsSingleReader(2, 5);
            Assertions.assertThat(numSplitReaders.get()).isEqualTo(1);
        }
        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();

        // Both splits active: the first record of each split is emitted.
        testHarness.runUntilRecordsEmitted(output, TIMEOUT_SECONDS, 2);
        assertEmittedRecords(output, 0, 1);

        // Split "0" paused: only records of split "1" (odd numbers) are added.
        testHarness.pauseOrResumeSplits(Collections.singleton("0"), Collections.emptyList());
        testHarness.runUntilRecordsEmitted(output, TIMEOUT_SECONDS, 5);
        assertEmittedRecords(output, 0, 1, 3, 5, 7);

        // Split "0" resumed: everything that is left gets emitted.
        testHarness.pauseOrResumeSplits(Collections.emptyList(), Collections.singleton("0"));
        testHarness.runUntilAllRecordsEmitted(output, TIMEOUT_SECONDS);
        assertEmittedRecords(output, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
    }

    /**
     * Checks the behavior when a split reader does not support pausing.
     *
     * <p>The first split reader handed out by the supplier is a
     * {@link MockSplitReaderUnsupportedPause}, every later one is a regular {@link MockSplitReader}.
     * Pausing split {@code "1"} is expected to work. The subsequent request to pause split
     * {@code "0"} is expected to be tolerated when unaligned source splits are allowed, and to
     * surface as a {@link RuntimeException} caused by an {@link UnsupportedOperationException}
     * otherwise.
     *
     * @param allowUnalignedSourceSplits value written to the
     *     {@code pipeline.watermark-alignment.allow-unaligned-source-splits} option
     */
    @ParameterizedTest(name="Allow unaligned source splits: {0}")
    @ValueSource(booleans={true, false})
    public void testPauseResumeUnsupported(boolean allowUnalignedSourceSplits) throws Exception {
        AtomicInteger numSplitReaders = new AtomicInteger();
        Configuration configuration = new Configuration();
        configuration.setBoolean("pipeline.watermark-alignment.allow-unaligned-source-splits", allowUnalignedSourceSplits);
        MockSplitReader.Builder readerBuilder = SteppingSourceReaderTestHarness.createSplitReaderBuilder();
        SteppingSourceReaderTestHarness testHarness = new SteppingSourceReaderTestHarness(() -> {
            // Only the very first reader rejects pause/resume requests.
            if (numSplitReaders.getAndIncrement() == 0) {
                return MockSplitReaderUnsupportedPause.cloneBuilder(readerBuilder).build();
            }
            return readerBuilder.build();
        }, configuration);
        testHarness.addPrefilledSplitsIndividualReader(2, 5);
        Assertions.assertThat(numSplitReaders.get()).isEqualTo(2);
        TestingReaderOutput<Integer> output = new TestingReaderOutput<>();

        testHarness.runUntilRecordsEmitted(output, TIMEOUT_SECONDS, 2);
        assertEmittedRecords(output, 0, 1);

        // Split "1" paused: only records of split "0" (even numbers) are added.
        testHarness.pauseOrResumeSplits(Collections.singleton("1"), Collections.emptyList());
        testHarness.runUntilRecordsEmitted(output, TIMEOUT_SECONDS, 5);
        assertEmittedRecords(output, 0, 1, 2, 4, 6);

        // Now ask to pause split "0" and resume split "1".
        testHarness.pauseOrResumeSplits(Collections.singleton("0"), Collections.singleton("1"));
        if (allowUnalignedSourceSplits) {
            testHarness.runUntilAllRecordsEmitted(output, TIMEOUT_SECONDS);
            assertEmittedRecords(output, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
        } else {
            Assertions.assertThatThrownBy(() -> testHarness.runUntilAllRecordsEmitted(output, TIMEOUT_SECONDS))
                    .isInstanceOf(RuntimeException.class)
                    .hasCauseInstanceOf(UnsupportedOperationException.class);
        }
    }

    /**
     * Asserts that the distinct records emitted so far are exactly {@code expected}, in any order.
     *
     * @param output reader output whose emitted records are inspected
     * @param expected the records that must have been emitted
     */
    private static void assertEmittedRecords(TestingReaderOutput<Integer> output, Integer... expected) {
        HashSet<Integer> emitted = new HashSet<>(output.getEmittedRecords());
        Assertions.assertThat(emitted).containsExactlyInAnyOrder(expected);
    }

    /**
     * Wires a {@link MockSourceReader} to a {@link MockSteppingSplitFetcherManager} sharing one
     * element queue, and offers helpers to add splits and to step fetching and polling.
     */
    private static class SteppingSourceReaderTestHarness {
        private final MockSteppingSplitFetcherManager<int[], MockSourceSplit> fetcherManager;
        private final MockSourceReader sourceReader;

        /**
         * @param splitReaderSupplier supplier passed on to the fetcher manager
         * @param configuration configuration passed to both the fetcher manager and the reader
         */
        public SteppingSourceReaderTestHarness(Supplier<SplitReader<int[], MockSourceSplit>> splitReaderSupplier, Configuration configuration) {
            FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> queue = new FutureCompletingBlockingQueue<>(10);
            this.fetcherManager = new MockSteppingSplitFetcherManager<>(queue, splitReaderSupplier, configuration);
            this.sourceReader = new MockSourceReader(queue, this.fetcherManager, configuration, new TestingReaderContext());
        }

        /**
         * Creates {@code numSplits} splits with ids {@code 0..numSplits-1}; split {@code k} is
         * filled with the records {@code i * numSplits + k} for {@code i} in
         * {@code [0, numRecords)}, so the splits interleave over one contiguous value range.
         */
        private static List<MockSourceSplit> createPrefilledSplits(int numSplits, int numRecords) {
            List<MockSourceSplit> splits = new ArrayList<>(numSplits);
            for (int splitId = 0; splitId < numSplits; ++splitId) {
                MockSourceSplit split = new MockSourceSplit(splitId, 0, numRecords);
                for (int i = 0; i < numRecords; ++i) {
                    split.addRecord(i * numSplits + splitId);
                }
                splits.add(split);
            }
            return splits;
        }

        /** Adds all prefilled splits in a single {@code addSplits} call, then signals no more splits. */
        public void addPrefilledSplitsSingleReader(int numSplits, int numRecords) {
            this.sourceReader.addSplits(createPrefilledSplits(numSplits, numRecords));
            this.sourceReader.notifyNoMoreSplits();
        }

        /** Adds every prefilled split in its own {@code addSplits} call, then signals no more splits. */
        public void addPrefilledSplitsIndividualReader(int numSplits, int numRecords) {
            for (MockSourceSplit split : createPrefilledSplits(numSplits, numRecords)) {
                this.sourceReader.addSplits(Collections.singletonList(split));
            }
            this.sourceReader.notifyNoMoreSplits();
        }

        /**
         * Returns a builder configured with one record per split per fetch, non-blocking fetches
         * and a separated finished record.
         */
        public static MockSplitReader.Builder createSplitReaderBuilder() {
            return MockSplitReader.newBuilder().setNumRecordsPerSplitPerFetch(1).setBlockingFetch(false).setSeparatedFinishedRecord(true);
        }

        /**
         * Repeatedly steps all fetchers once and drains the source reader until the reader reports
         * end of input or at least {@code numRecords} records have been emitted in total.
         *
         * @param readerOutput output the reader emits into
         * @param timeoutSeconds timeout handed to {@code CommonTestUtils.waitUtil}
         * @param numRecords total number of emitted records to wait for; a negative value means
         *     "wait for end of input"
         * @return the number of completed fetch rounds
         * @throws Exception the exception raised while stepping or polling, if any, or whatever
         *     {@code waitUtil} itself throws
         */
        public int runUntilRecordsEmitted(TestingReaderOutput<Integer> readerOutput, int timeoutSeconds, int numRecords) throws Exception {
            AtomicReference<Exception> failure = new AtomicReference<>();
            AtomicInteger numFetches = new AtomicInteger();
            // NOTE: the message is formatted before the first fetch round, so the record count it
            // reports is the count at call time, not the count reached when the wait gives up.
            String timeoutMessage = String.format("%d %s records fetched within timeout", readerOutput.getEmittedRecords().size(), numRecords < 0 ? "but not all" : "out of " + numRecords);
            CommonTestUtils.waitUtil(() -> {
                try {
                    this.fetcherManager.runEachOnce();
                    numFetches.getAndIncrement();
                    // Drain everything the reader currently has available.
                    InputStatus status;
                    do {
                        status = this.sourceReader.pollNext(readerOutput);
                    } while (status == InputStatus.MORE_AVAILABLE);
                    if (status == InputStatus.END_OF_INPUT) {
                        return true;
                    }
                    if (numRecords < 0) {
                        return false;
                    }
                    return readerOutput.getEmittedRecords().size() >= numRecords;
                }
                catch (Exception e) {
                    // Stop waiting and rethrow outside of the condition below.
                    failure.set(e);
                    return true;
                }
            }, Duration.ofSeconds(timeoutSeconds), timeoutMessage);
            if (failure.get() != null) {
                throw failure.get();
            }
            return numFetches.get();
        }

        /** Same as {@link #runUntilRecordsEmitted} with a negative record count, i.e. runs until end of input. */
        public int runUntilAllRecordsEmitted(TestingReaderOutput<Integer> readerOutput, int timeoutSeconds) throws Exception {
            return this.runUntilRecordsEmitted(readerOutput, timeoutSeconds, -1);
        }

        /** Forwards the pause/resume request to the source reader. */
        public void pauseOrResumeSplits(Collection<String> splitsToPause, Collection<String> splitsToResume) {
            this.sourceReader.pauseOrResumeSplits(splitsToPause, splitsToResume);
        }
    }

    /** A {@link MockSplitReader} that rejects every pause/resume request. */
    private static class MockSplitReaderUnsupportedPause
    extends MockSplitReader {
        public MockSplitReaderUnsupportedPause(int numRecordsPerSplitPerFetch, boolean separatedFinishedRecord, boolean blockingFetch) {
            super(numRecordsPerSplitPerFetch, separatedFinishedRecord, blockingFetch);
        }

        /** Always throws {@link UnsupportedOperationException}. */
        @Override
        public void pauseOrResumeSplits(Collection<MockSourceSplit> splitsToPause, Collection<MockSourceSplit> splitsToResume) {
            throw new UnsupportedOperationException();
        }

        /** Returns a builder copying {@code other} that builds {@link MockSplitReaderUnsupportedPause} instances. */
        public static Builder cloneBuilder(MockSplitReader.Builder other) {
            return new Builder(other);
        }

        /** Builder producing {@link MockSplitReaderUnsupportedPause} instead of plain mock readers. */
        public static class Builder
        extends MockSplitReader.Builder {
            public Builder(MockSplitReader.Builder other) {
                super(other);
            }

            @Override
            public MockSplitReader build() {
                return new MockSplitReaderUnsupportedPause(this.numRecordsPerSplitPerFetch, this.separatedFinishedRecord, this.blockingFetch);
            }
        }
    }

    /**
     * Fetcher manager whose fetchers are only advanced when {@link #runEachOnce()} is called.
     *
     * @param <E> element type produced by the split readers
     * @param <SplitT> split type
     */
    private static class MockSteppingSplitFetcherManager<E, SplitT extends SourceSplit>
    extends SingleThreadFetcherManager<E, SplitT> {
        public MockSteppingSplitFetcherManager(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration) {
            super(elementsQueue, splitReaderSupplier, configuration);
        }

        /**
         * Creates a new fetcher for every call and hands it the splits; nothing else is done with
         * the fetcher here, it is advanced later through {@link #runEachOnce()}.
         */
        @Override
        public void addSplits(List<SplitT> splitsToAdd) {
            SplitFetcher<E, SplitT> fetcher = this.createSplitFetcher();
            fetcher.addSplits(splitsToAdd);
        }

        /** Invokes {@code runOnce()} on every fetcher currently registered. */
        public void runEachOnce() {
            for (SplitFetcher<E, SplitT> fetcher : this.fetchers.values()) {
                fetcher.runOnce();
            }
        }
    }
}

