/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.sink.writer;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class AsyncSinkWriterTimeoutTest {
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);
    private final List<Long> destination = new ArrayList<Long>();

    AsyncSinkWriterTimeoutTest() {
    }

    @Test
    void writerShouldNotRetryIfRequestIsProcessedBeforeTimeout() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService tpts = context.getTestProcessingTimeService();
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, false);
        tpts.setCurrentTime(0L);
        writer.write("1", null);
        tpts.setCurrentTime(10L);
        writer.deliverMessage();
        tpts.setCurrentTime(120L);
        writer.flush(false);
        Assertions.assertThat(this.destination).containsExactly((Object[])new Long[]{1L});
    }

    @Test
    void writerShouldRetryOnTimeoutIfFailOnErrorIsFalse() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService tpts = context.getTestProcessingTimeService();
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, false);
        tpts.setCurrentTime(0L);
        writer.write("1", null);
        tpts.setCurrentTime(110L);
        writer.deliverMessage();
        writer.flush(false);
        Assertions.assertThat(this.destination).containsExactly((Object[])new Long[]{1L, 1L});
    }

    @Test
    void writerShouldFailOnTimeoutIfFailOnErrorIsTrue() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService tpts = context.getTestProcessingTimeService();
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, true);
        tpts.setCurrentTime(0L);
        writer.write("1", null);
        tpts.setCurrentTime(110L);
        writer.deliverMessage();
        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class).isThrownBy(() -> writer.flush(false)).withCauseInstanceOf(TimeoutException.class).havingCause().withMessageContaining("Request timed out after 100ms with failOnTimeout set to true.");
    }

    @Test
    void writerShouldDiscardRetriedEntriesOnTimeout() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService tpts = context.getTestProcessingTimeService();
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, false);
        writer.setShouldFailRequest(true);
        tpts.setCurrentTime(0L);
        writer.write("1", null);
        tpts.setCurrentTime(110L);
        writer.deliverMessage();
        writer.flush(false);
        Assertions.assertThat(this.destination).containsExactly((Object[])new Long[]{1L});
    }

    @Test
    void writerShouldFailOnFatalError() throws Exception {
        TestSinkInitContextAnyThreadMailbox context = new TestSinkInitContextAnyThreadMailbox();
        TestProcessingTimeService tpts = context.getTestProcessingTimeService();
        TimeoutWriter writer = new TimeoutWriter(context, 1, 10L, 100L, true);
        tpts.setCurrentTime(0L);
        writer.setFatalError((Exception)new FlinkRuntimeException("Fatal error"));
        writer.write("1", null);
        writer.deliverMessage();
        Assertions.assertThatExceptionOfType(FlinkRuntimeException.class).isThrownBy(() -> writer.flush(false)).withMessage("Fatal error");
    }

    private class TimeoutWriter
    extends AsyncSinkWriter<String, Long> {
        private Exception fatalError;
        private final CountDownLatch completionLatch;
        private Future<?> submitFuture;
        private boolean shouldFailRequest;

        public TimeoutWriter(WriterInitContext writerInitContext, int maxBatchSize, long maximumTimeInBufferMs, long requestTimeout, boolean failOnTimeout) {
            super((ElementConverter & Serializable)(element, context) -> Long.parseLong(element), writerInitContext, AsyncSinkWriterConfiguration.builder().setMaxBatchSize(maxBatchSize).setMaxBatchSizeInBytes(Long.MAX_VALUE).setMaxInFlightRequests(Integer.MAX_VALUE).setMaxBufferedRequests(Integer.MAX_VALUE).setMaxTimeInBufferMS(maximumTimeInBufferMs).setMaxRecordSizeInBytes(Long.MAX_VALUE).setRequestTimeoutMS(requestTimeout).setFailOnTimeout(failOnTimeout).build(), Collections.emptyList());
            this.shouldFailRequest = false;
            this.completionLatch = new CountDownLatch(1);
        }

        protected void submitRequestEntries(List<Long> requestEntries, ResultHandler<Long> resultHandler) {
            this.submitFuture = AsyncSinkWriterTimeoutTest.this.executorService.submit(() -> {
                while (this.completionLatch.getCount() > 0L) {
                    try {
                        this.completionLatch.await();
                    }
                    catch (InterruptedException e) {
                        org.junit.jupiter.api.Assertions.fail((String)"Submission thread must not be interrupted.");
                    }
                }
                this.submitRequestEntriesSync(requestEntries, resultHandler);
            });
        }

        private void submitRequestEntriesSync(List<Long> requestEntries, ResultHandler<Long> resultHandler) {
            if (this.fatalError != null) {
                resultHandler.completeExceptionally(this.fatalError);
            } else if (this.shouldFailRequest) {
                this.shouldFailRequest = false;
                resultHandler.retryForEntries(requestEntries);
            } else {
                AsyncSinkWriterTimeoutTest.this.destination.addAll(requestEntries);
                resultHandler.complete();
            }
        }

        protected long getSizeInBytes(Long requestEntry) {
            return 8L;
        }

        public void setFatalError(Exception fatalError) {
            this.fatalError = fatalError;
        }

        public void setShouldFailRequest(boolean shouldFailRequest) {
            this.shouldFailRequest = shouldFailRequest;
        }

        public void deliverMessage() throws InterruptedException, ExecutionException {
            this.completionLatch.countDown();
            this.submitFuture.get();
        }
    }
}

