/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader.fetcher;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordsBySplits;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.mocks.TestingRecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.mocks.TestingSourceSplit;
import org.apache.flink.connector.base.source.reader.mocks.TestingSplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.test.util.TestUtils;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

class SplitFetcherManagerTest {
    /** Creates the test instance; no per-instance state is set up here. */
    SplitFetcherManagerTest() {}

    /**
     * Runs the exception-propagation scenario with no fetch results queued ahead of the error, so
     * the reader fails on its first fetch.
     */
    @Test
    void testExceptionPropagationFirstFetch() throws Exception {
        // An empty varargs call passes a zero-length array of fetches.
        testExceptionPropagation();
    }

    /**
     * Runs the exception-propagation scenario with two fetch results queued ahead of the error, so
     * the reader fails only after both have been handed out.
     */
    @Test
    void testExceptionPropagationSuccessiveFetch() throws Exception {
        RecordsWithSplitIds<Integer> firstBatch =
                new TestingRecordsWithSplitIds<Integer>("testSplit", 1, 2, 3, 4);
        RecordsWithSplitIds<Integer> secondBatch =
                new TestingRecordsWithSplitIds<Integer>("testSplit", 5, 6, 7, 8);
        testExceptionPropagation(firstBatch, secondBatch);
    }

    /**
     * Closes a fetcher manager whose reader was configured through {@code setCloseWithException()}
     * and checks that {@code checkErrors()} then throws with the expected root-cause message.
     */
    @Test
    void testCloseFetcherWithException() throws Exception {
        TestingSplitReader failingOnClose = new TestingSplitReader(new RecordsWithSplitIds[0]);
        failingOnClose.setCloseWithException();

        Configuration defaults = new Configuration();
        SplitFetcherManager manager = createFetcher("test-split", failingOnClose, defaults);
        manager.close(30000L);

        Assertions.assertThatThrownBy(manager::checkErrors)
                .hasRootCauseMessage("Artificial exception on closing the split reader.");
    }

    /**
     * Verifies that {@code close} completes after the only fetcher has already been removed via
     * {@code maybeShutdownFinishedFetchers()}.
     *
     * <p>The element queue capacity is set to 1 for this scenario. The reader hands out a single
     * {@link RecordsBySplits} batch and would throw "Should not happen" on any further fetch that
     * is released.
     */
    @Test
    @Timeout(value=30000L, unit=TimeUnit.MILLISECONDS)
    void testCloseCleansUpPreviouslyClosedFetcher() throws Exception {
        String splitId = "testSplit";
        Configuration config = new Configuration();
        // The value is passed as an int literal: an Object-typed argument does not satisfy
        // set(ConfigOption<T>, T) when the option is typed (presumably ConfigOption<Integer>).
        config.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 1);

        AwaitingReader<Integer, TestingSourceSplit> reader =
                new AwaitingReader<>(
                        new IOException("Should not happen"),
                        new RecordsBySplits<>(
                                Collections.emptyMap(), Collections.singleton(splitId)));
        SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
                createFetcher(splitId, reader, config);

        // Block until the element queue reports availability.
        fetcherManager.getQueue().getAvailabilityFuture().get();

        // Keep asking the manager to shut down finished fetchers until none is registered.
        TestUtils.waitUntil(
                () -> {
                    fetcherManager.maybeShutdownFinishedFetchers();
                    return fetcherManager.fetchers.isEmpty();
                },
                "The idle fetcher should have been removed.");

        // Must return within the @Timeout even though no fetcher is registered any more.
        fetcherManager.close(Long.MAX_VALUE);
    }

    /**
     * Verifies that {@code close} keeps blocking while the fetcher is still running and only
     * returns once the reader has been released.
     *
     * <p>{@code close} is invoked on a helper thread. Any exception it throws is recorded and
     * asserted on the test thread after {@code join()}: an assertion failure raised on the helper
     * thread would merely terminate that thread and never fail the test.
     */
    @Test
    public void testCloseBlockingWaitingForFetcherShutdown() throws Exception {
        String splitId = "testSplit";
        String fetcherThreadPrefix = "Source Data Fetcher for ";
        Duration timeout = Duration.ofSeconds(30L);

        // The no-arg reader blocks in fetch() and in close() until explicitly released.
        AwaitingReader<Integer, TestingSourceSplit> reader = new AwaitingReader<>();
        SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
                createFetcher(splitId, reader, new Configuration());

        // Written by the closing thread, read after join(); join() makes the write visible.
        Exception[] closeFailure = new Exception[1];
        Thread closingThread =
                new Thread(
                        () -> {
                            try {
                                fetcherManager.close(Long.MAX_VALUE);
                            } catch (Exception e) {
                                closeFailure[0] = e;
                            }
                        },
                        "closingThread");
        closingThread.start();

        TestUtils.waitUntil(
                () -> findThread(fetcherThreadPrefix).size() == 2,
                timeout,
                "The element queue draining thread should have started.");
        for (Thread fetcherThread : findThread(fetcherThreadPrefix)) {
            TestUtils.waitUntil(
                    () -> {
                        // Read the state once so both comparisons see the same snapshot.
                        Thread.State state = fetcherThread.getState();
                        return state == Thread.State.WAITING
                                || state == Thread.State.TIMED_WAITING;
                    },
                    timeout,
                    "All the executor threads should be in waiting status.");
        }

        Assertions.assertThat(
                        fetcherManager.getQueue().getAvailabilityFuture().getNumberOfDependents())
                .as("The future should have just one dependent stage")
                .isLessThanOrEqualTo(1);
        // close() has been requested but the fetcher is still registered.
        Assertions.assertThat(fetcherManager.fetchers.size()).isEqualTo(1);

        // Release the reader: let fetch() throw and let close() proceed.
        reader.triggerThrowException();
        reader.triggerClose();
        TestUtils.waitUntil(fetcherManager.fetchers::isEmpty, "The fetcher should be closed now.");

        closingThread.join();
        Assertions.assertThat((Throwable) closeFailure[0])
                .as("Closing the fetcher manager should not fail.")
                .isNull();
    }

    /**
     * Checks that the reader of a fetcher that is no longer alive stays open until every batch
     * in the element queue has been polled and recycled.
     *
     * <p>Sequence: wait for queue availability, shut down finished fetchers until none is alive,
     * expect two batches in the queue, recycle the first (reader must still be open), recycle the
     * second (reader must then get closed).
     */
    @Test
    void testIdleShutdownSplitFetcherWaitsUntilRecordProcessed() throws Exception {
        String splitId = "testSplit";
        Duration oneSecond = Duration.ofSeconds(1L);

        AwaitingReader<Integer, TestingSourceSplit> reader =
                new AwaitingReader<>(
                        new IOException("Should not happen"),
                        new RecordsBySplits<>(
                                Collections.emptyMap(), Collections.singleton(splitId)));
        SplitFetcherManager<Integer, TestingSourceSplit> fetcherManager =
                createFetcher(splitId, reader, new Configuration());
        try {
            FutureCompletingBlockingQueue<RecordsWithSplitIds<Integer>> queue =
                    fetcherManager.getQueue();
            queue.getAvailabilityFuture().get();

            TestUtils.waitUntil(
                    () -> {
                        fetcherManager.maybeShutdownFinishedFetchers();
                        return fetcherManager.getNumAliveFetchers() == 0;
                    },
                    oneSecond,
                    "The fetcher should have already been removed from the alive fetchers.");
            TestUtils.waitUntil(
                    () -> queue.size() == 2,
                    oneSecond,
                    "The element queue should have 2 batches when the fetcher is closed.");

            // First batch processed: the reader must remain open.
            RecordsWithSplitIds<Integer> firstBatch = queue.poll();
            firstBatch.recycle();
            Assertions.assertThat(reader.isClosed)
                    .as("The reader should have not been closed.")
                    .isFalse();

            // Second batch processed: the reader is expected to be closed shortly after.
            RecordsWithSplitIds<Integer> secondBatch = queue.poll();
            secondBatch.recycle();
            TestUtils.waitUntil(
                    () -> reader.isClosed, oneSecond, "The reader should hava been closed.");
        } finally {
            fetcherManager.close(30000L);
        }
    }

    /**
     * Shared scenario: the reader hands out {@code fetchesBeforeError}, then throws an
     * {@link IOException}; the test checks that the very same exception instance surfaces from
     * {@code checkErrors()} two cause-levels deep.
     *
     * @param fetchesBeforeError fetch results the reader returns before it throws
     * @throws Exception if waiting on the reader or the queue fails, or closing the manager fails
     */
    @SafeVarargs
    private final void testExceptionPropagation(RecordsWithSplitIds<Integer> ... fetchesBeforeError) throws Exception {
        IOException testingException = new IOException("test");
        AwaitingReader<Integer, TestingSourceSplit> reader =
                new AwaitingReader<>(testingException, fetchesBeforeError);

        Configuration configuration = new Configuration();
        // The value is passed as an int literal: an Object-typed argument does not satisfy
        // set(ConfigOption<T>, T) when the option is typed (presumably ConfigOption<Integer>).
        configuration.set(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, 10);
        SplitFetcherManager<Integer, TestingSourceSplit> fetcher =
                createFetcher("testSplit", reader, configuration);

        // Wait until the reader has handed out every prepared fetch and is blocked before the error.
        reader.awaitAllRecordsReturned();
        drainQueue(fetcher.getQueue());
        // With the queue drained and no error raised yet, nothing should be available.
        Assertions.assertThat(fetcher.getQueue().getAvailabilityFuture().isDone()).isFalse();

        // Release the error and wait for the queue to signal availability again.
        reader.triggerThrowException();
        fetcher.getQueue().getAvailabilityFuture().get();

        try {
            fetcher.checkErrors();
            // fail() raises an AssertionError, which the catch clause below does not intercept.
            Assertions.fail("expected exception");
        } catch (Exception e) {
            Assertions.assertThat(e.getCause().getCause()).isSameAs(testingException);
        } finally {
            fetcher.close(20000L);
        }
    }

    /**
     * Builds a {@link SingleThreadFetcherManager} whose reader supplier always returns
     * {@code reader}, and assigns it one {@link TestingSourceSplit} with the given id.
     *
     * @param splitId id of the single split added to the manager
     * @param reader split reader instance returned by the supplier
     * @param configuration configuration handed to the manager
     * @return the manager, with the split already added
     */
    private static <E> SplitFetcherManager<E, TestingSourceSplit> createFetcher(String splitId, SplitReader<E, TestingSourceSplit> reader, Configuration configuration) {
        // Parameterized rather than raw, so the return needs no unchecked conversion.
        SingleThreadFetcherManager<E, TestingSourceSplit> fetcher =
                new SingleThreadFetcherManager<>(() -> reader, configuration);
        fetcher.addSplits(Collections.singletonList(new TestingSourceSplit(splitId)));
        return fetcher;
    }

    /**
     * Polls {@code queue} repeatedly, discarding every element, until {@code poll()} returns
     * {@code null}.
     *
     * @param queue the queue to empty
     */
    private static void drainQueue(FutureCompletingBlockingQueue<?> queue) {
        Object polled;
        do {
            polled = queue.poll();
        } while (polled != null);
    }

    /**
     * Collects the threads reported by {@link Thread#getAllStackTraces()} whose name contains
     * {@code keyword}.
     *
     * @param keyword substring to look for in thread names
     * @return a new mutable list of the matching threads; empty if none match
     */
    private static List<Thread> findThread(String keyword) {
        // Start from every reported thread, then drop the ones whose name does not match.
        ArrayList<Thread> matching = new ArrayList<Thread>(Thread.getAllStackTraces().keySet());
        matching.removeIf(candidate -> !candidate.getName().contains(keyword));
        return matching;
    }

    private static final class AwaitingReader<E, SplitT extends SourceSplit>
    implements SplitReader<E, SplitT> {
        private final Queue<RecordsWithSplitIds<E>> fetches;
        private final IOException testError;
        private final OneShotLatch inBlocking = new OneShotLatch();
        private final OneShotLatch throwError = new OneShotLatch();
        private final OneShotLatch closeBlocker = new OneShotLatch();
        private volatile boolean isClosed = false;

        @SafeVarargs
        AwaitingReader(IOException testError, RecordsWithSplitIds<E> ... fetches) {
            this.testError = testError;
            this.fetches = new ArrayDeque<RecordsWithSplitIds<RecordsWithSplitIds<E>>>(Arrays.asList(fetches));
            this.closeBlocker.trigger();
        }

        AwaitingReader() {
            this.testError = new IOException("DummyException");
            this.fetches = new ArrayDeque(Collections.emptyList());
        }

        public RecordsWithSplitIds<E> fetch() throws IOException {
            if (!this.fetches.isEmpty()) {
                return this.fetches.poll();
            }
            this.inBlocking.trigger();
            try {
                this.throwError.await();
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("interrupted");
            }
            throw this.testError;
        }

        public void handleSplitsChanges(SplitsChange<SplitT> splitsChanges) {
        }

        public void wakeUp() {
        }

        public void close() throws Exception {
            this.closeBlocker.await();
            this.isClosed = true;
        }

        public void awaitAllRecordsReturned() throws InterruptedException {
            this.inBlocking.await();
        }

        public void triggerThrowException() {
            this.throwError.trigger();
        }

        public void triggerClose() {
            this.closeBlocker.trigger();
        }
    }
}

