/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.gateway.service.result;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.collections.iterators.IteratorChain;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.results.FetchOrientation;
import org.apache.flink.table.gateway.api.results.ResultSet;
import org.apache.flink.table.gateway.service.result.ResultFetcher;
import org.apache.flink.table.gateway.service.utils.IgnoreExceptionHandler;
import org.apache.flink.table.gateway.service.utils.SqlExecutionException;
import org.apache.flink.table.types.DataType;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ExecutorThreadFactory;
import org.assertj.core.api.AssertionsForClassTypes;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class ResultFetcherTest {
    private static ResolvedSchema schema;
    private static List<RowData> data;
    @RegisterExtension
    private static final TestExecutorExtension<ExecutorService> EXECUTOR_EXTENSION;

    ResultFetcherTest() {
    }

    @BeforeAll
    static void setUp() {
        schema = ResolvedSchema.of((Column[])new Column[]{Column.physical((String)"boolean", (DataType)DataTypes.BOOLEAN()), Column.physical((String)"int", (DataType)DataTypes.INT()), Column.physical((String)"bigint", (DataType)DataTypes.BIGINT()), Column.physical((String)"varchar", (DataType)DataTypes.STRING()), Column.physical((String)"decimal(10, 5)", (DataType)DataTypes.DECIMAL((int)10, (int)5)), Column.physical((String)"timestamp", (DataType)((DataType)DataTypes.TIMESTAMP((int)6).bridgedTo(Timestamp.class))), Column.physical((String)"binary", (DataType)DataTypes.BYTES())});
        data = Arrays.asList(GenericRowData.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{null, 1, 2L, "abc", BigDecimal.valueOf(1.23), Timestamp.valueOf("2020-03-01 18:39:14"), new byte[]{50, 51, 52, -123, 54, 93, 115, 126}}), GenericRowData.ofKind((RowKind)RowKind.UPDATE_BEFORE, (Object[])new Object[]{false, null, 0L, "", BigDecimal.valueOf(1L), Timestamp.valueOf("2020-03-01 18:39:14.1"), new byte[]{100, -98, 32, 121, -125}}), GenericRowData.ofKind((RowKind)RowKind.UPDATE_AFTER, (Object[])new Object[]{true, Integer.MAX_VALUE, null, "abcdefg", BigDecimal.valueOf(12345L), Timestamp.valueOf("2020-03-01 18:39:14.12"), new byte[]{-110, -23, 1, 2}}), GenericRowData.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{false, Integer.MIN_VALUE, Long.MAX_VALUE, null, BigDecimal.valueOf(12345.06789), Timestamp.valueOf("2020-03-01 18:39:14.123"), new byte[]{50, 51, 52, -123, 54, 93, 115, 126}}), GenericRowData.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{true, 100, Long.MIN_VALUE, "abcdefg111", null, Timestamp.valueOf("2020-03-01 18:39:14.123456"), new byte[]{110, 23, -1, -2}}), GenericRowData.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{null, -1, -1L, "abcdefghijklmnopqrstuvwxyz", BigDecimal.valueOf(-12345.06789), null, null}), GenericRowData.ofKind((RowKind)RowKind.INSERT, (Object[])new Object[]{null, -1, -1L, "\u8fd9\u662f\u4e00\u6bb5\u4e2d\u6587", BigDecimal.valueOf(-12345.06789), Timestamp.valueOf("2020-03-04 18:39:14"), new byte[]{-3, -2, -1, 0, 1, 2, 3}}), GenericRowData.ofKind((RowKind)RowKind.DELETE, (Object[])new Object[]{null, -1, -1L, "\u3053\u308c\u306f\u65e5\u672c\u8a9e\u3092\u30c6\u30b9\u30c8\u3059\u308b\u305f\u3081\u306e\u6587\u3067\u3059", BigDecimal.valueOf(-12345.06789), Timestamp.valueOf("2020-03-04 18:39:14"), new byte[]{-3, -2, -1, 0, 1, 2, 3}}));
    }

    @Test
    void testFetchResultsMultipleTimesWithLimitedBufferSize() {
        int bufferSize = data.size() / 2;
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
        int fetchSize = data.size();
        this.runFetchMultipleTimes(bufferSize, fetchSize, token -> fetcher.fetchResults(token.longValue(), fetchSize));
    }

    @Test
    void testFetchResultsMultipleTimesWithLimitedFetchSize() {
        int bufferSize = data.size();
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
        int fetchSize = data.size() / 2;
        this.runFetchMultipleTimes(bufferSize, fetchSize, token -> fetcher.fetchResults(token.longValue(), fetchSize));
    }

    @Test
    void testFetchResultsInWithLimitedBufferSizeInOrientation() {
        int bufferSize = data.size() / 2;
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
        int fetchSize = data.size();
        this.runFetchMultipleTimes(bufferSize, fetchSize, token -> fetcher.fetchResults(FetchOrientation.FETCH_NEXT, fetchSize));
    }

    @Test
    void testFetchResultsMultipleTimesWithLimitedFetchSizeInOrientation() {
        int bufferSize = data.size();
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), bufferSize);
        int fetchSize = data.size() / 2;
        this.runFetchMultipleTimes(bufferSize, fetchSize, token -> fetcher.fetchResults(FetchOrientation.FETCH_NEXT, fetchSize));
    }

    @Test
    void testFetchResultInParallel() throws Exception {
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
        CommonTestUtils.waitUtil(() -> fetcher.getResultStore().getBufferedRecordSize() > 0, (Duration)Duration.ofSeconds(10L), (String)"Failed to wait the buffer has data.");
        this.checkFetchResultInParallel(fetcher);
    }

    @Test
    void testFetchResultInOrientationInParallel() throws Exception {
        List<Iterator<RowData>> dataSuppliers = data.stream().map(row -> new TestIterator(() -> {
            try {
                Thread.sleep(1L);
                return row;
            }
            catch (Exception e) {
                throw new SqlExecutionException("Failed to return the row.", (Throwable)e);
            }
        })).collect(Collectors.toList());
        int fetchThreadNum = 100;
        CountDownLatch latch = new CountDownLatch(fetchThreadNum);
        ResultFetcher fetcher = this.buildResultFetcher(dataSuppliers, 1);
        ConcurrentHashMap rows = new ConcurrentHashMap();
        AtomicReference<Boolean> payloadHasData = new AtomicReference<Boolean>(true);
        for (int i = 0; i < fetchThreadNum; ++i) {
            EXECUTOR_EXTENSION.getExecutor().submit(() -> {
                ResultSet resultSet = fetcher.fetchResults(FetchOrientation.FETCH_NEXT, 1);
                if (resultSet.getResultType().equals((Object)ResultSet.ResultType.PAYLOAD) && resultSet.getData().isEmpty()) {
                    payloadHasData.set(false);
                }
                rows.compute(Thread.currentThread().getId(), (k, v) -> {
                    if (v == null) {
                        return resultSet.getData();
                    }
                    v.addAll(resultSet.getData());
                    return v;
                });
                latch.countDown();
            });
        }
        latch.await();
        Assertions.assertEquals((Object)true, (Object)payloadHasData.get());
        Assertions.assertEquals(new HashSet<RowData>(data), rows.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()));
    }

    @Test
    void testFetchResultFromDummyStoreInParallel() throws Exception {
        this.checkFetchResultInParallel(ResultFetcher.fromResults((OperationHandle)OperationHandle.create(), (ResolvedSchema)schema, data));
    }

    @Test
    void testFetchResultAfterClose() throws Exception {
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), data.size() + 1);
        List actual = Collections.emptyList();
        long token = 0L;
        while (actual.size() < 1) {
            ResultSet resultSet = fetcher.fetchResults(token, 1);
            token = (Long)Preconditions.checkNotNull((Object)resultSet.getNextToken());
            actual = resultSet.getData();
        }
        Assertions.assertEquals(data.subList(0, 1), actual);
        fetcher.close();
        long testToken = token;
        AtomicReference<Boolean> meetEnd = new AtomicReference<Boolean>(false);
        EXECUTOR_EXTENSION.getExecutor().submit(() -> {
            ResultSet resultSet;
            long nextToken = testToken;
            while ((resultSet = fetcher.fetchResults(nextToken, Integer.MAX_VALUE)).getResultType() != ResultSet.ResultType.EOS) {
                nextToken = (Long)Preconditions.checkNotNull((Object)resultSet.getNextToken());
            }
            meetEnd.set(true);
        });
        CommonTestUtils.waitUtil(meetEnd::get, (Duration)Duration.ofSeconds(10L), (String)"Should get EOS when fetch results from the closed fetcher.");
    }

    @Test
    void testFetchResultWithToken() {
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), data.size());
        Long nextToken = 0L;
        ArrayList actual = new ArrayList();
        ResultSet resultSetBefore = null;
        while (nextToken != null) {
            if (resultSetBefore != null) {
                Assertions.assertEquals(resultSetBefore, (Object)fetcher.fetchResults(nextToken - 1L, data.size()));
            }
            ResultSet resultSet = fetcher.fetchResults(nextToken.longValue(), data.size());
            ResultSet resultSetWithSameToken = fetcher.fetchResults(nextToken.longValue(), data.size());
            Assertions.assertEquals((Object)resultSet, (Object)resultSetWithSameToken);
            if (resultSet.getResultType() == ResultSet.ResultType.EOS) break;
            resultSetBefore = resultSet;
            actual.addAll((Collection)Preconditions.checkNotNull((Object)resultSet.getData()));
            nextToken = resultSet.getNextToken();
        }
        Assertions.assertEquals(data, actual);
    }

    @Test
    void testFetchFailedResult() {
        String message = "Artificial Exception";
        ResultFetcher fetcher = this.buildResultFetcher(Arrays.asList(TestIterator.createErrorIterator(message), data.iterator()), data.size());
        AssertionsForClassTypes.assertThatThrownBy(() -> {
            Long token = 0L;
            while (token != null) {
                token = fetcher.fetchResults(token.longValue(), Integer.MAX_VALUE).getNextToken();
            }
        }).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)message)});
    }

    @Test
    void testFetchIllegalToken() {
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), data.size());
        AssertionsForClassTypes.assertThatThrownBy(() -> fetcher.fetchResults(2L, Integer.MAX_VALUE)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)"Expecting token to be 0, but found 2")});
    }

    @Test
    void testFetchBeforeWithDifferentSize() throws Exception {
        ResultFetcher fetcher = this.buildResultFetcher(Collections.singletonList(data.iterator()), data.size() / 2);
        CommonTestUtils.waitUtil(() -> fetcher.getResultStore().getBufferedRecordSize() > 1, (Duration)Duration.ofSeconds(10L), (String)"Failed to make cached records num larger than 1.");
        ResultSet firstFetch = fetcher.fetchResults(0L, Integer.MAX_VALUE);
        int firstFetchSize = firstFetch.getData().size();
        AssertionsForClassTypes.assertThatThrownBy(() -> fetcher.fetchResults(0L, firstFetchSize - 1)).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)String.format("As the same token is provided, fetch size must be not less than the previous returned buffer size. Previous returned result size is %s, current max_fetch_size to be %s.", firstFetch.getData().size(), firstFetchSize - 1))});
    }

    private ResultFetcher buildResultFetcher(List<Iterator<RowData>> rows, int bufferSize) {
        OperationHandle operationHandle = OperationHandle.create();
        return new ResultFetcher(operationHandle, schema, CloseableIterator.adapterForIterator((Iterator)new IteratorChain(rows)), null, false, null, ResultKind.SUCCESS_WITH_CONTENT, bufferSize);
    }

    private void runFetchMultipleTimes(int bufferSize, int fetchSize, Function<Long, ResultSet> fetchResults) {
        ResultSet currentResult;
        ArrayList fetchedRows = new ArrayList();
        Long token = 0L;
        do {
            Assertions.assertTrue((((List)Preconditions.checkNotNull((Object)(currentResult = fetchResults.apply(token)).getData())).size() <= Math.min(bufferSize, fetchSize) ? 1 : 0) != 0);
            token = currentResult.getNextToken();
            fetchedRows.addAll(currentResult.getData());
        } while (currentResult.getResultType() != ResultSet.ResultType.EOS);
        Assertions.assertEquals((Object)ResultSet.ResultType.EOS, (Object)((ResultSet)Preconditions.checkNotNull((Object)currentResult)).getResultType());
        Assertions.assertEquals(data, fetchedRows);
    }

    private void checkFetchResultInParallel(ResultFetcher fetcher) throws Exception {
        AtomicReference<Boolean> isEqual = new AtomicReference<Boolean>(true);
        int fetchThreadNum = 100;
        CountDownLatch latch = new CountDownLatch(fetchThreadNum);
        List firstFetch = fetcher.fetchResults(0L, Integer.MAX_VALUE).getData();
        for (int i = 0; i < fetchThreadNum; ++i) {
            EXECUTOR_EXTENSION.getExecutor().submit(() -> {
                ResultSet resultSet = fetcher.fetchResults(0L, Integer.MAX_VALUE);
                if (!firstFetch.equals(resultSet.getData())) {
                    isEqual.set(false);
                }
                latch.countDown();
            });
        }
        latch.await();
        Assertions.assertEquals((Object)true, (Object)isEqual.get());
    }

    static {
        EXECUTOR_EXTENSION = new TestExecutorExtension(() -> Executors.newCachedThreadPool((ThreadFactory)new ExecutorThreadFactory("Result Fetcher Test Pool", (Thread.UncaughtExceptionHandler)IgnoreExceptionHandler.INSTANCE)));
    }

    private static class TestIterator
    implements Iterator<RowData> {
        private final Supplier<RowData> dataSupplier;
        private boolean hasMoreData;

        public static TestIterator createErrorIterator(String msg) {
            return new TestIterator(() -> {
                throw new SqlExecutionException(msg);
            });
        }

        public TestIterator(Supplier<RowData> dataSupplier) {
            this.dataSupplier = dataSupplier;
            this.hasMoreData = true;
        }

        @Override
        public boolean hasNext() {
            return this.hasMoreData;
        }

        @Override
        public RowData next() {
            this.hasMoreData = false;
            return this.dataSupplier.get();
        }
    }
}

