/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.asyncprocessing;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.asyncprocessing.AsyncExecutionController;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.SnapshotType;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendTestUtils;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.streaming.api.operators.AbstractInput;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.AbstractStreamOperatorV2;
import org.apache.flink.streaming.api.operators.Input;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerServiceAsyncImpl;
import org.apache.flink.streaming.api.operators.MultipleInputStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.runtime.io.RecordProcessorUtils;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperatorTest;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.AbstractAsyncStateStreamOperatorV2;
import org.apache.flink.streaming.runtime.operators.asyncprocessing.ElementOrder;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

class AbstractAsyncStateStreamOperatorV2Test {
    AbstractAsyncStateStreamOperatorV2Test() {
    }

    protected KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> createTestHarness(int maxParalelism, int numSubtasks, int subtaskIndex, ElementOrder elementOrder) throws Exception {
        KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = new KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String>((StreamOperatorFactory<String>)new TestOperatorFactory(elementOrder), (KeySelector<Tuple2<Integer, String>, Integer>)new AbstractAsyncStateStreamOperatorTest.TestKeySelector(), (TypeInformation<Integer>)BasicTypeInfo.INT_TYPE_INFO, maxParalelism, numSubtasks, subtaskIndex);
        testHarness.setStateBackend(StateBackendTestUtils.buildAsyncStateBackend((StateBackend)new HashMapStateBackend()));
        return testHarness;
    }

    @Test
    void testCreateAsyncExecutionController() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            Assertions.assertThat(testHarness.getBaseOperator()).isInstanceOf(AbstractAsyncStateStreamOperatorV2.class);
            Assertions.assertThat((Object)((AbstractAsyncStateStreamOperatorV2)testHarness.getBaseOperator()).getAsyncExecutionController()).isNotNull();
            Assertions.assertThat((Object)((AbstractAsyncStateStreamOperatorV2)testHarness.getBaseOperator()).getAsyncExecutionController().getStateExecutor()).isNotNull();
        }
    }

    @Test
    void testRecordProcessorWithFirstStateOrder() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.FIRST_STATE_ORDER);){
            testHarness.open();
            SingleInputTestOperator testOperator = (SingleInputTestOperator)testHarness.getBaseOperator();
            ThrowingConsumer processor = RecordProcessorUtils.getRecordProcessor((Input)testOperator.getInputs().get(0));
            ExecutorService anotherThread = Executors.newSingleThreadExecutor();
            anotherThread.execute(() -> {
                try {
                    processor.accept((Object)new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(1);
            testOperator.proceed();
            anotherThread.shutdown();
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    @Test
    void testRecordProcessorWithRecordOrder() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            SingleInputTestOperator testOperator = (SingleInputTestOperator)testHarness.getBaseOperator();
            ThrowingConsumer processor = RecordProcessorUtils.getRecordProcessor((Input)testOperator.getInputs().get(0));
            ExecutorService anotherThread = Executors.newSingleThreadExecutor();
            anotherThread.execute(() -> {
                try {
                    processor.accept((Object)new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")));
                }
                catch (Exception exception) {
                    // empty catch block
                }
            });
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getProcessed()).isEqualTo(1);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isGreaterThan(1);
            testOperator.proceed();
            anotherThread.shutdown();
            Thread.sleep(1000L);
            Assertions.assertThat((int)testOperator.getCurrentProcessingContext().getReferenceCount()).isEqualTo(0);
        }
    }

    @Test
    void testCheckpointDrain() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            SingleInputTestOperator testOperator = (SingleInputTestOperator)testHarness.getBaseOperator();
            CheckpointStorageLocationReference locationReference = CheckpointStorageLocationReference.getDefault();
            AsyncExecutionController asyncExecutionController = testOperator.getAsyncExecutionController();
            testOperator.setAsyncKeyedContextElement(new StreamRecord((Object)Tuple2.of((Object)5, (Object)"5")), new AbstractAsyncStateStreamOperatorTest.TestKeySelector());
            asyncExecutionController.handleRequest(null, StateRequestType.VALUE_GET, null);
            testOperator.postProcessElement();
            Assertions.assertThat((int)asyncExecutionController.getInFlightRecordNum()).isEqualTo(1);
            testOperator.snapshotState(1L, 1L, new CheckpointOptions((SnapshotType)CheckpointType.CHECKPOINT, locationReference), new JobManagerCheckpointStorage().createCheckpointStorage(new JobID()).resolveCheckpointStorageLocation(1L, locationReference));
            Assertions.assertThat((int)asyncExecutionController.getInFlightRecordNum()).isEqualTo(0);
        }
    }

    @Disabled(value="Support Timer for AsyncKeyedStateBackend")
    @Test
    void testTimerServiceIsAsync() throws Exception {
        try (KeyedOneInputStreamOperatorV2TestHarness<Integer, Tuple2<Integer, String>, String> testHarness = this.createTestHarness(128, 1, 0, ElementOrder.RECORD_ORDER);){
            testHarness.open();
            Assertions.assertThat(testHarness.getBaseOperator()).isInstanceOf(AbstractAsyncStateStreamOperatorV2.class);
            Triggerable triggerable = new Triggerable(){

                public void onEventTime(InternalTimer timer) throws Exception {
                }

                public void onProcessingTime(InternalTimer timer) throws Exception {
                }
            };
            Assertions.assertThat((Object)((AbstractAsyncStateStreamOperatorV2)testHarness.getBaseOperator()).getInternalTimerService("test", (TypeSerializer)VoidNamespaceSerializer.INSTANCE, triggerable)).isInstanceOf(InternalTimerServiceAsyncImpl.class);
        }
    }

    private static class SingleInputTestOperator
    extends AbstractAsyncStateStreamOperatorV2<String>
    implements MultipleInputStreamOperator<String>,
    Triggerable<Integer, VoidNamespace> {
        private static final long serialVersionUID = 1L;
        final AtomicInteger processed = new AtomicInteger(0);
        private final ElementOrder elementOrder;
        final Object objectToWait = new Object();
        final Input input;

        public SingleInputTestOperator(StreamOperatorParameters<String> parameters, ElementOrder elementOrder) {
            super(parameters, 1);
            this.elementOrder = elementOrder;
            this.input = new AbstractInput<Tuple2<Integer, String>, String>((AbstractStreamOperatorV2)this, 1){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                public void processElement(StreamRecord<Tuple2<Integer, String>> element) throws Exception {
                    processed.incrementAndGet();
                    Object object = objectToWait;
                    synchronized (object) {
                        objectToWait.wait();
                    }
                }
            };
        }

        public void open() throws Exception {
            super.open();
        }

        public List<Input> getInputs() {
            return Collections.singletonList(this.input);
        }

        public ElementOrder getElementOrder() {
            return this.elementOrder;
        }

        public void onEventTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
        }

        public void onProcessingTime(InternalTimer<Integer, VoidNamespace> timer) throws Exception {
        }

        public int getProcessed() {
            return this.processed.get();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void proceed() {
            Object object = this.objectToWait;
            synchronized (object) {
                this.objectToWait.notify();
            }
        }
    }

    private static class TestOperatorFactory
    extends AbstractStreamOperatorFactory<String> {
        private final ElementOrder elementOrder;

        TestOperatorFactory(ElementOrder elementOrder) {
            this.elementOrder = elementOrder;
        }

        public <T extends StreamOperator<String>> T createStreamOperator(StreamOperatorParameters<String> parameters) {
            return (T)((Object)new SingleInputTestOperator(parameters, this.elementOrder));
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return SingleInputTestOperator.class;
        }
    }

    private static class KeyedOneInputStreamOperatorV2TestHarness<K, IN, OUT>
    extends KeyedOneInputStreamOperatorTestHarness<K, IN, OUT> {
        public KeyedOneInputStreamOperatorV2TestHarness(StreamOperatorFactory<OUT> operatorFactory, KeySelector<IN, K> keySelector, TypeInformation<K> keyType, int maxParallelism, int numSubtasks, int subtaskIndex) throws Exception {
            super(operatorFactory, keySelector, keyType, maxParallelism, numSubtasks, subtaskIndex);
        }

        public StreamOperator<OUT> getBaseOperator() {
            return this.operator;
        }
    }
}

