/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.asyncprocessing.operators;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.declare.ContextVariable;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
import org.apache.flink.runtime.asyncprocessing.declare.NamedCallback;
import org.apache.flink.runtime.asyncprocessing.declare.NamedConsumer;
import org.apache.flink.runtime.asyncprocessing.declare.NamedFunction;
import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedProcessFunction;
import org.apache.flink.runtime.asyncprocessing.operators.AsyncKeyedProcessOperator;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.FunctionWithException;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class AsyncKeyedProcessOperatorTest {
    /**
     * Verifies element processing for both ways of declaring callbacks: individually named
     * callbacks ({@link TestNormalDeclarationFunction}) and a single declaration chain
     * ({@link TestChainDeclarationFunction}).
     *
     * <p>For every record both functions add the record key to a shared counter, increment the
     * counter, add the key once more and emit the counter as a string. Starting from 0, key 5
     * therefore yields 0 + 5 + 1 + 5 = 11 and key 6 yields 11 + 6 + 1 + 6 = 24.
     *
     * @param chained {@code true} to test the chained declaration, {@code false} to test the
     *     individually declared callbacks
     * @throws Exception if the harness fails while opening, processing or closing
     */
    @ParameterizedTest(name = "Chain declaration = {0}")
    @ValueSource(booleans = {false, true})
    public void testNormalProcessor(boolean chained) throws Exception {
        TestDeclarationFunctionBase function =
                chained
                        ? new TestChainDeclarationFunction()
                        : new TestNormalDeclarationFunction();
        AsyncKeyedProcessOperator<Integer, Tuple2<Integer, String>, String> testOperator =
                new AsyncKeyedProcessOperator<>(function);
        ArrayList<StreamRecord<String>> expectedOutput = new ArrayList<>();

        // The harness is typed so that the key-selector lambda sees a Tuple2<Integer, String>;
        // with a raw KeySelector target the parameter would be Object and `e.f0` would not resolve.
        try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness =
                        AsyncKeyedOneInputStreamOperatorTestHarness.create(
                                testOperator, e -> e.f0, TypeInformation.of(Integer.class))) {
            testHarness.open();

            testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));
            expectedOutput.add(new StreamRecord<>("11"));
            Assertions.assertThat(function.getValue()).isEqualTo(11);

            testHarness.processElement(new StreamRecord<>(Tuple2.of(6, "6")));
            expectedOutput.add(new StreamRecord<>("24"));
            Assertions.assertThat(function.getValue()).isEqualTo(24);

            Assertions.assertThat(testHarness.getOutput())
                    .containsExactly(expectedOutput.toArray());
        }
    }

    /**
     * Verifies that a declared {@code onTimer} chain runs when event-time timers fire.
     *
     * <p>{@link TestTimerDeclarationFunction} only registers an event-time timer at the record key
     * while processing an element, so the counter is still 0 after both elements. Advancing the
     * watermark to 5 fires the first timer (0 + 5 + 1 + 5 = 11) and advancing it to 6 fires the
     * second (11 + 6 + 1 + 6 = 24). Each emitted record is expected to carry the timer timestamp
     * and to precede the corresponding watermark in the output.
     *
     * @throws Exception if the harness fails while opening, processing or closing
     */
    @Test
    public void testTimerProcessor() throws Exception {
        TestTimerDeclarationFunction function = new TestTimerDeclarationFunction();
        AsyncKeyedProcessOperator<Integer, Tuple2<Integer, String>, String> testOperator =
                new AsyncKeyedProcessOperator<>(function);
        // Holds both StreamRecords and Watermarks, hence the Object element type.
        ArrayList<Object> expectedOutput = new ArrayList<>();

        // The harness is typed so that the key-selector lambda sees a Tuple2<Integer, String>;
        // with a raw KeySelector target the parameter would be Object and `e.f0` would not resolve.
        try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness =
                        AsyncKeyedOneInputStreamOperatorTestHarness.create(
                                testOperator, e -> e.f0, TypeInformation.of(Integer.class))) {
            testHarness.open();

            testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));
            testHarness.processElement(new StreamRecord<>(Tuple2.of(6, "5")));
            // Nothing but timer registration has happened so far.
            Assertions.assertThat(function.getValue()).isEqualTo(0);

            testHarness.processWatermark(5L);
            expectedOutput.add(new StreamRecord<>("11", 5L));
            expectedOutput.add(new Watermark(5L));
            Assertions.assertThat(function.getValue()).isEqualTo(11);

            testHarness.processWatermark(6L);
            expectedOutput.add(new StreamRecord<>("24", 6L));
            expectedOutput.add(new Watermark(6L));
            Assertions.assertThat(function.getValue()).isEqualTo(24);

            Assertions.assertThat(testHarness.getOutput())
                    .containsExactly(expectedOutput.toArray());
        }
    }

    /**
     * Verifies that the operator also runs a plain {@link KeyedProcessFunction} that declares
     * nothing.
     *
     * <p>{@link TestNotDeclarationFunction} registers an event-time timer at the record key for
     * each element and does its arithmetic in {@code onTimer}. The counter is therefore 0 after
     * both elements, 0 + 5 + 1 + 5 = 11 after the watermark reaches 5 and 11 + 6 + 1 + 6 = 24
     * after it reaches 6. Each emitted record is expected to carry the timer timestamp and to
     * precede the corresponding watermark in the output.
     *
     * @throws Exception if the harness fails while opening, processing or closing
     */
    @Test
    public void testNoDeclaredFunction() throws Exception {
        TestNotDeclarationFunction function = new TestNotDeclarationFunction();
        AsyncKeyedProcessOperator<Integer, Tuple2<Integer, String>, String> testOperator =
                new AsyncKeyedProcessOperator<>(function);
        // Holds both StreamRecords and Watermarks, hence the Object element type.
        ArrayList<Object> expectedOutput = new ArrayList<>();

        // The harness is typed so that the key-selector lambda sees a Tuple2<Integer, String>;
        // with a raw KeySelector target the parameter would be Object and `e.f0` would not resolve.
        try (AsyncKeyedOneInputStreamOperatorTestHarness<Integer, Tuple2<Integer, String>, String>
                testHarness =
                        AsyncKeyedOneInputStreamOperatorTestHarness.create(
                                testOperator, e -> e.f0, TypeInformation.of(Integer.class))) {
            testHarness.open();

            testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));
            testHarness.processElement(new StreamRecord<>(Tuple2.of(6, "5")));
            // Nothing but timer registration has happened so far.
            Assertions.assertThat(function.getValue()).isEqualTo(0);

            testHarness.processWatermark(5L);
            expectedOutput.add(new StreamRecord<>("11", 5L));
            expectedOutput.add(new Watermark(5L));
            Assertions.assertThat(function.getValue()).isEqualTo(11);

            testHarness.processWatermark(6L);
            expectedOutput.add(new StreamRecord<>("24", 6L));
            expectedOutput.add(new Watermark(6L));
            Assertions.assertThat(function.getValue()).isEqualTo(24);

            Assertions.assertThat(testHarness.getOutput())
                    .containsExactly(expectedOutput.toArray());
        }
    }

    /**
     * A plain {@link KeyedProcessFunction} that does not declare any callbacks.
     *
     * <p>Processing an element only registers an event-time timer at the element's key; the
     * counter arithmetic and the output happen when that timer fires.
     */
    private static class TestNotDeclarationFunction
            extends KeyedProcessFunction<Integer, Tuple2<Integer, String>, String> {

        /** Running total updated by {@link #onTimer}; read by the test through {@link #getValue()}. */
        final AtomicInteger value = new AtomicInteger(0);

        private TestNotDeclarationFunction() {}

        /** Returns the current value of the counter. */
        public int getValue() {
            return value.get();
        }

        /**
         * Registers an event-time timer whose timestamp is the key ({@code f0}) of the element.
         *
         * <p>The context parameter carries the same type arguments as the superclass so that this
         * method matches the inherited {@code processElement} signature; a raw
         * {@code KeyedProcessFunction.Context} next to the parameterized {@code Tuple2} and
         * {@code Collector} would not.
         *
         * @param e the incoming element; only {@code f0} is used
         * @param ctx context giving access to the timer service
         * @param out unused here, output is produced in {@link #onTimer}
         * @throws Exception declared to match the signature the decompiled source carried
         */
        public void processElement(
                Tuple2<Integer, String> e,
                KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context ctx,
                Collector<String> out)
                throws Exception {
            ctx.timerService().registerEventTimeTimer(e.f0);
        }

        /**
         * Adds the timer timestamp to the counter, increments it, adds the timestamp again and
         * emits the resulting counter value as a string.
         *
         * @param timestamp timestamp of the firing timer; narrowed to {@code int} for the counter
         * @param ctx unused
         * @param out collector receiving the stringified counter
         * @throws DeclarationException never thrown by this body; kept from the original signature
         */
        public void onTimer(
                long timestamp,
                KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.OnTimerContext ctx,
                Collector<String> out)
                throws DeclarationException {
            value.addAndGet((int) timestamp);
            value.incrementAndGet();
            value.addAndGet((int) timestamp);
            out.collect(String.valueOf(value.get()));
        }
    }

    /**
     * Declaring function whose element path only registers a timer and whose timer path performs
     * the counter arithmetic through a declaration chain.
     */
    private static class TestTimerDeclarationFunction extends TestDeclarationFunctionBase {

        private TestTimerDeclarationFunction() {}

        /**
         * Declares element processing as a one-stage chain that registers an event-time timer at
         * the element's key ({@code f0}).
         *
         * <p>The explicit {@code <Tuple2<Integer, String>>} type witness on {@code declareChain()}
         * is required: the chain's input type cannot be inferred from the method's return type
         * through the subsequent calls, and without it the lambda parameter would be {@code Object}.
         *
         * @param context declaration context used to build the chain
         * @param ctx runtime context giving access to the timer service
         * @param out unused here, output is produced by the timer chain
         * @return the consumer invoked for each element
         * @throws DeclarationException if the declaration is rejected by {@code context}
         */
        public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(
                DeclarationContext context,
                KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context ctx,
                Collector<String> out)
                throws DeclarationException {
            return context.<Tuple2<Integer, String>>declareChain()
                    .thenCompose(
                            e -> {
                                ctx.timerService().registerEventTimeTimer(e.f0);
                                return StateFutureUtils.completedVoidFuture();
                            })
                    .finish();
        }

        /**
         * Declares timer processing as a three-stage chain: add the timer timestamp to the
         * counter, increment the counter (stage named {@code "adder"}), then add the remembered
         * timestamp again and emit the counter as a string (stage named {@code "doubler"}).
         *
         * <p>The timestamp is handed from the first to the last stage through a declared context
         * variable, which is only written while it is still {@code null}.
         *
         * @param context declaration context used to declare the variable and build the chain
         * @param ctx unused
         * @param out collector receiving the stringified counter
         * @return the consumer invoked with the timestamp of each firing timer
         * @throws DeclarationException if a declaration is rejected by {@code context}
         */
        public ThrowingConsumer<Long, Exception> declareOnTimer(
                DeclarationContext context,
                KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.OnTimerContext ctx,
                Collector<String> out)
                throws DeclarationException {
            ContextVariable<Integer> inputValue = context.declareVariable(null);
            return context.<Long>declareChain()
                    .thenCompose(
                            e -> {
                                if (inputValue.get() == null) {
                                    inputValue.set(e.intValue());
                                }
                                value.addAndGet(e.intValue());
                                return StateFutureUtils.completedVoidFuture();
                            })
                    .thenCompose(v -> StateFutureUtils.completedFuture(value.incrementAndGet()))
                    .withName("adder")
                    .thenAccept(
                            v -> {
                                value.addAndGet(inputValue.get());
                                out.collect(String.valueOf(value.get()));
                            })
                    .withName("doubler")
                    .finish();
        }
    }

    /** Declaring function that expresses its whole element path as one declaration chain. */
    private static class TestChainDeclarationFunction extends TestDeclarationFunctionBase {

        private TestChainDeclarationFunction() {}

        /**
         * Declares element processing as a three-stage chain: add the element key ({@code f0}) to
         * the counter, increment the counter (stage named {@code "adder"}), then add the
         * remembered key again and emit the counter as a string (stage named {@code "doubler"}).
         *
         * <p>The key is handed from the first to the last stage through a declared context
         * variable, which is only written while it is still {@code null}. The explicit
         * {@code <Tuple2<Integer, String>>} type witness on {@code declareChain()} is required:
         * the chain's input type cannot be inferred from the method's return type through the
         * subsequent calls, and without it the lambda parameter would be {@code Object}.
         *
         * @param context declaration context used to declare the variable and build the chain
         * @param ctx unused
         * @param out collector receiving the stringified counter
         * @return the consumer invoked for each element
         * @throws DeclarationException if a declaration is rejected by {@code context}
         */
        public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(
                DeclarationContext context,
                KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context ctx,
                Collector<String> out)
                throws DeclarationException {
            ContextVariable<Integer> inputValue = context.declareVariable(null);
            return context.<Tuple2<Integer, String>>declareChain()
                    .thenCompose(
                            e -> {
                                if (inputValue.get() == null) {
                                    inputValue.set(e.f0);
                                }
                                value.addAndGet(e.f0);
                                return StateFutureUtils.completedVoidFuture();
                            })
                    .thenCompose(v -> StateFutureUtils.completedFuture(value.incrementAndGet()))
                    .withName("adder")
                    .thenAccept(
                            v -> {
                                value.addAndGet(inputValue.get());
                                out.collect(String.valueOf(value.get()));
                            })
                    .withName("doubler")
                    .finish();
        }
    }

    /**
     * Declaring function that declares its callbacks one by one and wires them together itself
     * inside the returned element consumer.
     */
    private static class TestNormalDeclarationFunction extends TestDeclarationFunctionBase {

        private TestNormalDeclarationFunction() {}

        /**
         * Declares a context variable plus two named callbacks and returns a consumer that, per
         * element, adds the key ({@code f0}) to the counter, then runs {@code "adder"} (increment
         * the counter) followed by {@code "doubler"} (add the remembered key again and emit the
         * counter as a string).
         *
         * <p>The key is remembered in the declared context variable, which is only written while
         * it is still {@code null}. Both declared callbacks are asserted to be
         * {@link NamedCallback} instances.
         *
         * @param context declaration context used to declare the variable and the callbacks
         * @param ctx unused
         * @param out collector receiving the stringified counter
         * @return the consumer invoked for each element
         * @throws DeclarationException if a declaration is rejected by {@code context}
         */
        // "rawtypes"/"unchecked": `adder` is deliberately left raw, see the comment at its
        // declaration; everything else in this method is fully parameterized.
        @SuppressWarnings({"rawtypes", "unchecked"})
        public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(
                DeclarationContext context,
                KeyedProcessFunction<Integer, Tuple2<Integer, String>, String>.Context ctx,
                Collector<String> out)
                throws DeclarationException {
            ContextVariable<Integer> inputValue = context.declareVariable(null);

            // Kept raw: spelling out its result type would need the future type returned by
            // StateFutureUtils.completedFuture, which this file does not import.
            // The lambda is block-bodied with an explicit `return` on purpose: `declare` is also
            // used with a consumer-shaped lambda below, and an expression-bodied call like this
            // one would fit both the value-returning and the void shape.
            NamedFunction adder =
                    context.declare(
                            "adder",
                            i -> {
                                return StateFutureUtils.completedFuture(value.incrementAndGet());
                            });
            NamedConsumer<Integer> doubler =
                    context.declare(
                            "doubler",
                            v -> {
                                value.addAndGet(inputValue.get());
                                out.collect(String.valueOf(value.get()));
                            });

            Assertions.assertThat(adder).isInstanceOf(NamedCallback.class);
            Assertions.assertThat(doubler).isInstanceOf(NamedCallback.class);

            return e -> {
                if (inputValue.get() == null) {
                    inputValue.set(e.f0);
                }
                value.addAndGet(e.f0);
                StateFutureUtils.completedVoidFuture().thenCompose(adder).thenAccept(doubler);
            };
        }
    }

    /**
     * Shared base of the declaring test functions in this file. It owns the counter that the
     * subclasses' callbacks update and exposes its current value to the tests.
     */
    private abstract static class TestDeclarationFunctionBase
            extends DeclaringAsyncKeyedProcessFunction<Integer, Tuple2<Integer, String>, String> {

        /** Running total, starting at zero; updated by the callbacks declared in subclasses. */
        final AtomicInteger value = new AtomicInteger();

        private TestDeclarationFunctionBase() {}

        /** Returns the counter's current value. */
        public int getValue() {
            return value.intValue();
        }
    }
}

