/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.asyncprocessing.operators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.state.v2.ValueState;
import org.apache.flink.api.common.state.v2.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.declare.ContextVariable;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
import org.apache.flink.runtime.asyncprocessing.functions.DeclaringAsyncKeyedCoProcessFunction;
import org.apache.flink.runtime.asyncprocessing.operators.co.AsyncKeyedCoProcessOperator;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.apache.flink.streaming.util.asyncprocessing.AsyncKeyedTwoInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.function.ThrowingConsumer;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class AsyncKeyedCoProcessOperatorTest {
    // Package-private no-arg constructor; it performs no initialization.
    AsyncKeyedCoProcessOperatorTest() {}

    /**
     * Runs one element through each input of a {@link TestChainDeclarationFunction} and checks both
     * the emitted records and the function's running counter.
     *
     * <p>Input 1 receives 5: the chain adds 5, increments by one, then adds the remembered input
     * again, so the counter is 11 and "11" is emitted. Input 2 receives "6": the value is forwarded
     * unchanged and added to the counter, giving 17.
     */
    @Test
    void testDeclareProcessor() throws Exception {
        TestChainDeclarationFunction function = new TestChainDeclarationFunction();
        AsyncKeyedCoProcessOperator operator = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)function);
        // try-with-resources: the harness is closed even when an assertion below fails. Previously
        // this test never closed the harness at all, unlike the other tests in this class.
        try (AsyncKeyedTwoInputStreamOperatorTestHarness testHarness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)operator, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO)) {
            ArrayList<StreamRecord> expectedOutput = new ArrayList<StreamRecord>();
            testHarness.open();

            testHarness.processElement1(new StreamRecord((Object)5));
            expectedOutput.add(new StreamRecord((Object)"11"));
            Assertions.assertThat(function.value.get()).isEqualTo(11);

            testHarness.processElement2(new StreamRecord((Object)"6"));
            expectedOutput.add(new StreamRecord((Object)"6"));
            Assertions.assertThat(function.value.get()).isEqualTo(17);

            Assertions.assertThat((Collection)testHarness.getOutput()).containsExactly(expectedOutput.toArray());
        }
    }

    /**
     * Verifies the watermark and element timestamp that a {@link WatermarkQueryingProcessFunction}
     * reads from its context while elements arrive between watermark advances.
     */
    @Test
    void testTimestampAndWatermarkQuerying() throws Exception {
        // Expected stream: a watermark, then the element tagged with that watermark and its own timestamp.
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new Watermark(17L));
        expected.add(new StreamRecord("5WM:17 TS:12", 12L));
        expected.add(new Watermark(42L));
        expected.add(new StreamRecord("6WM:42 TS:13", 13L));

        AsyncKeyedCoProcessOperator op = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new WatermarkQueryingProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness harness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)op, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        // Advance both inputs to 17, then feed input 1.
        harness.processWatermark1(new Watermark(17L));
        harness.processWatermark2(new Watermark(17L));
        harness.processElement1(new StreamRecord(5, 12L));

        // Advance both inputs to 42, then feed input 2.
        harness.processWatermark1(new Watermark(42L));
        harness.processWatermark2(new Watermark(42L));
        harness.processElement2(new StreamRecord("6", 13L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, (Queue)harness.getOutput());
        harness.close();
    }

    /**
     * Verifies the processing time and (absent) element timestamp that a
     * {@link ProcessingTimeQueryingProcessFunction} reads from its context.
     */
    @Test
    void testTimestampAndProcessingTimeQuerying() throws Exception {
        // The records carry no timestamp, so the function is expected to print "TS:null".
        ConcurrentLinkedQueue<StreamRecord> expected = new ConcurrentLinkedQueue<StreamRecord>();
        expected.add(new StreamRecord("5PT:17 TS:null"));
        expected.add(new StreamRecord("6PT:42 TS:null"));

        AsyncKeyedCoProcessOperator op = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new ProcessingTimeQueryingProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness harness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)op, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        harness.setProcessingTime(17L);
        harness.processElement1(new StreamRecord(5));

        harness.setProcessingTime(42L);
        harness.processElement2(new StreamRecord("6"));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, (Queue)harness.getOutput());
        harness.close();
    }

    /**
     * Verifies that event-time timers registered by an {@link EventTimeTriggeringProcessFunction}
     * fire as the watermark advances, with the timer output preceding the matching watermark.
     */
    @Test
    void testEventTimeTimers() throws Exception {
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        // Direct outputs of the two elements.
        expected.add(new StreamRecord("INPUT1:17", 42L));
        expected.add(new StreamRecord("INPUT2:18", 42L));
        // Timer registered at 5 by input 1 (key "17"), followed by watermark 5.
        expected.add(new StreamRecord("17:1777", 5L));
        expected.add(new Watermark(5L));
        // Timer registered at 6 by input 2 (key "18"), followed by watermark 6.
        expected.add(new StreamRecord("18:1777", 6L));
        expected.add(new Watermark(6L));

        AsyncKeyedCoProcessOperator op = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new EventTimeTriggeringProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness harness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)op, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        harness.processElement1(new StreamRecord(17, 42L));
        harness.processElement2(new StreamRecord("18", 42L));

        harness.processWatermark1(new Watermark(5L));
        harness.processWatermark2(new Watermark(5L));

        harness.processWatermark1(new Watermark(6L));
        harness.processWatermark2(new Watermark(6L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, (Queue)harness.getOutput());
        harness.close();
    }

    /**
     * Verifies that processing-time timers registered by a
     * {@link ProcessingTimeTriggeringProcessFunction} fire when the harness clock is advanced.
     */
    @Test
    void testProcessingTimeTimers() throws Exception {
        ConcurrentLinkedQueue<StreamRecord> expected = new ConcurrentLinkedQueue<StreamRecord>();
        expected.add(new StreamRecord("INPUT1:17"));
        expected.add(new StreamRecord("INPUT2:18"));
        // One "1777" per timer: the timer at 5 (input 1) and the timer at 6 (input 2).
        expected.add(new StreamRecord("1777"));
        expected.add(new StreamRecord("1777"));

        AsyncKeyedCoProcessOperator op = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new ProcessingTimeTriggeringProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness harness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)op, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        harness.processElement1(new StreamRecord(17));
        harness.processElement2(new StreamRecord("18"));

        harness.setProcessingTime(5L);
        harness.setProcessingTime(6L);

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, (Queue)harness.getOutput());
        harness.close();
    }

    /**
     * Verifies the interplay of keyed state and event-time timers in an
     * {@link EventTimeTriggeringStatefulProcessFunction}.
     *
     * <p>The second element with key 13 finds existing state, so it clears the state and deletes the
     * timer at watermark + 4; no output is expected for that key.
     */
    @Test
    void testEventTimeTimerWithState() throws Exception {
        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new Watermark(1L));
        expected.add(new StreamRecord("INPUT1:17", 0L));
        expected.add(new StreamRecord("INPUT1:13", 0L));
        expected.add(new Watermark(2L));
        expected.add(new StreamRecord("INPUT2:42", 1L));
        expected.add(new StreamRecord("STATE:17", 6L));
        expected.add(new Watermark(6L));
        expected.add(new StreamRecord("STATE:42", 7L));
        expected.add(new Watermark(7L));

        AsyncKeyedCoProcessOperator op = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new EventTimeTriggeringStatefulProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness harness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)op, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        // Watermark 1: keys 17 and 13 are seen for the first time (timers at 1 + 5 = 6).
        harness.processWatermark1(new Watermark(1L));
        harness.processWatermark2(new Watermark(1L));
        harness.processElement1(new StreamRecord(17, 0L));
        harness.processElement1(new StreamRecord(13, 0L));

        // Watermark 2: key 13 is seen again (state cleared, timer at 2 + 4 = 6 deleted);
        // key 42 is new (timer at 2 + 5 = 7).
        harness.processWatermark1(new Watermark(2L));
        harness.processWatermark2(new Watermark(2L));
        harness.processElement1(new StreamRecord(13, 1L));
        harness.processElement2(new StreamRecord("42", 1L));

        // Advance far enough to fire the remaining timers.
        harness.processWatermark1(new Watermark(6L));
        harness.processWatermark2(new Watermark(6L));
        harness.processWatermark1(new Watermark(7L));
        harness.processWatermark2(new Watermark(7L));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, (Queue)harness.getOutput());
        harness.close();
    }

    /**
     * Verifies the interplay of keyed state and processing-time timers in a
     * {@link ProcessingTimeTriggeringStatefulProcessFunction}.
     *
     * <p>The second element with key 13 finds existing state, so it clears the state and deletes the
     * timer at current time + 4; no "STATE:13" record is expected.
     */
    @Test
    void testProcessingTimeTimerWithState() throws Exception {
        ConcurrentLinkedQueue<StreamRecord> expected = new ConcurrentLinkedQueue<StreamRecord>();
        expected.add(new StreamRecord("INPUT1:17"));
        expected.add(new StreamRecord("INPUT1:13"));
        expected.add(new StreamRecord("INPUT2:42"));
        expected.add(new StreamRecord("STATE:17"));
        expected.add(new StreamRecord("STATE:42"));

        AsyncKeyedCoProcessOperator op = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new ProcessingTimeTriggeringStatefulProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness harness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)op, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        // Time 1: keys 17 and 13 are new (timers at 1 + 5 = 6).
        harness.setProcessingTime(1L);
        harness.processElement1(new StreamRecord(17));
        harness.processElement1(new StreamRecord(13));

        // Time 2: key 13 again (timer at 2 + 4 = 6 deleted); key 42 is new (timer at 2 + 5 = 7).
        harness.setProcessingTime(2L);
        harness.processElement1(new StreamRecord(13));
        harness.processElement2(new StreamRecord("42"));

        // Fire the remaining timers.
        harness.setProcessingTime(6L);
        harness.setProcessingTime(7L);

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, (Queue)harness.getOutput());
        harness.close();
    }

    /**
     * Verifies that timers survive a snapshot/restore cycle: timers registered (and not deleted) by
     * a {@link BothTriggeringProcessFunction} before the snapshot must fire in a freshly restored
     * operator.
     */
    @Test
    void testSnapshotAndRestore() throws Exception {
        // Phase 1: register the timers and take a snapshot.
        AsyncKeyedCoProcessOperator originalOperator = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new BothTriggeringProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness originalHarness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)originalOperator, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        originalHarness.setup();
        originalHarness.open();
        originalHarness.processElement1(new StreamRecord(5, 12L));
        originalHarness.processElement2(new StreamRecord("5", 12L));
        OperatorSubtaskState snapshot = originalHarness.snapshot(0L, 0L);
        originalHarness.close();

        // Phase 2: restore into a brand-new operator and harness.
        AsyncKeyedCoProcessOperator restoredOperator = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new BothTriggeringProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness restoredHarness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)restoredOperator, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        restoredHarness.setup();
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        // Only the processing-time timer at 5 and the event-time timer at 6 were left registered.
        restoredHarness.setProcessingTime(5L);
        restoredHarness.processWatermark1(new Watermark(6L));
        restoredHarness.processWatermark2(new Watermark(6L));

        ConcurrentLinkedQueue<Object> expected = new ConcurrentLinkedQueue<Object>();
        expected.add(new StreamRecord("PROC:1777"));
        expected.add(new StreamRecord("EVENT:1777", 6L));
        expected.add(new Watermark(6L));
        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, (Queue)restoredHarness.getOutput());
        restoredHarness.close();
    }

    /**
     * Verifies that the context handed to an {@link AppendCurrentKeyProcessFunction} reports the key
     * of the element currently being processed, for both inputs.
     */
    @Test
    void testGetCurrentKeyFromContext() throws Exception {
        // Each output is "<element>,<key>"; with the selectors used here the key equals the element's text.
        ConcurrentLinkedQueue<StreamRecord> expected = new ConcurrentLinkedQueue<StreamRecord>();
        expected.add(new StreamRecord("5,5"));
        expected.add(new StreamRecord("6,6"));
        expected.add(new StreamRecord("hello,hello"));
        expected.add(new StreamRecord("world,world"));

        AsyncKeyedCoProcessOperator op = new AsyncKeyedCoProcessOperator((KeyedCoProcessFunction)new AppendCurrentKeyProcessFunction());
        AsyncKeyedTwoInputStreamOperatorTestHarness harness = AsyncKeyedTwoInputStreamOperatorTestHarness.create((TwoInputStreamOperator)op, new IntToStringKeySelector(), new IdentityKeySelector(), (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
        harness.setup();
        harness.open();

        harness.processElement1(new StreamRecord(5));
        harness.processElement1(new StreamRecord(6));
        harness.processElement2(new StreamRecord("hello"));
        harness.processElement2(new StreamRecord("world"));

        TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, (Queue)harness.getOutput());
        harness.close();
    }

    private static class AppendCurrentKeyProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private AppendCurrentKeyProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "," + (String)ctx.getCurrentKey()));
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "," + (String)ctx.getCurrentKey()));
        }
    }

    private static class BothTriggeringProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private BothTriggeringProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.timerService().registerProcessingTimeTimer(3L);
            ctx.timerService().registerEventTimeTimer(6L);
            ctx.timerService().deleteProcessingTimeTimer(3L);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            ctx.timerService().registerEventTimeTimer(4L);
            ctx.timerService().registerProcessingTimeTimer(5L);
            ctx.timerService().deleteEventTimeTimer(4L);
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            if (TimeDomain.EVENT_TIME.equals((Object)ctx.timeDomain())) {
                out.collect((Object)"EVENT:1777");
            } else {
                out.collect((Object)"PROC:1777");
            }
        }
    }

    private static class ProcessingTimeTriggeringStatefulProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;
        private final ValueStateDescriptor<String> state = new ValueStateDescriptor("seen-element", (TypeSerializer)StringSerializer.INSTANCE);

        private ProcessingTimeTriggeringStatefulProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            this.handleValue(value, out, ctx.timerService(), 1);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            this.handleValue(value, out, ctx.timerService(), 2);
        }

        private void handleValue(Object value, Collector<String> out, TimerService timerService, int channel) throws IOException {
            ValueState state = this.getRuntimeContext().getState(this.state);
            state.asyncValue().thenAccept(v -> {
                if (v == null) {
                    state.asyncUpdate((Object)String.valueOf(value)).thenAccept(VO -> out.collect((Object)("INPUT" + channel + ":" + String.valueOf(value))));
                    timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + 5L);
                } else {
                    state.asyncClear();
                    timerService.deleteProcessingTimeTimer(timerService.currentProcessingTime() + 4L);
                }
            });
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)TimeDomain.PROCESSING_TIME);
            out.collect((Object)("STATE:" + (String)this.getRuntimeContext().getState(this.state).value()));
        }
    }

    private static class ProcessingTimeQueryingProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private ProcessingTimeQueryingProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()));
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp()));
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }

    private static class ProcessingTimeTriggeringProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private ProcessingTimeTriggeringProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("INPUT1:" + value));
            ctx.timerService().registerProcessingTimeTimer(5L);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("INPUT2:" + value));
            ctx.timerService().registerProcessingTimeTimer(6L);
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)TimeDomain.PROCESSING_TIME);
            out.collect((Object)"1777");
        }
    }

    private static class EventTimeTriggeringStatefulProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;
        private final ValueStateDescriptor<String> state = new ValueStateDescriptor("seen-element", (TypeSerializer)StringSerializer.INSTANCE);

        private EventTimeTriggeringStatefulProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            this.handleValue(value, out, ctx.timerService(), 1);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            this.handleValue(value, out, ctx.timerService(), 2);
        }

        private void handleValue(Object value, Collector<String> out, TimerService timerService, int channel) throws IOException {
            ValueState state = this.getRuntimeContext().getState(this.state);
            state.asyncValue().thenAccept(v -> {
                if (v == null) {
                    state.asyncUpdate((Object)String.valueOf(value)).thenAccept(VO -> out.collect((Object)("INPUT" + channel + ":" + String.valueOf(value))));
                    timerService.registerEventTimeTimer(timerService.currentWatermark() + 5L);
                } else {
                    state.asyncClear();
                    timerService.deleteEventTimeTimer(timerService.currentWatermark() + 4L);
                }
            });
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)TimeDomain.EVENT_TIME);
            this.getRuntimeContext().getState(this.state).asyncValue().thenAccept(v -> out.collect((Object)("STATE:" + v)));
        }
    }

    private static class EventTimeTriggeringProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private EventTimeTriggeringProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("INPUT1:" + value));
            ctx.timerService().registerEventTimeTimer(5L);
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)("INPUT2:" + value));
            ctx.timerService().registerEventTimeTimer(6L);
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
            Assertions.assertThat((Comparable)ctx.timeDomain()).isEqualTo((Object)TimeDomain.EVENT_TIME);
            out.collect((Object)((String)ctx.getCurrentKey() + ":1777"));
        }
    }

    private static class WatermarkQueryingProcessFunction
    extends KeyedCoProcessFunction<String, Integer, String, String> {
        private static final long serialVersionUID = 1L;

        private WatermarkQueryingProcessFunction() {
        }

        public void processElement1(Integer value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void processElement2(String value, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp()));
        }

        public void onTimer(long timestamp, KeyedCoProcessFunction.OnTimerContext ctx, Collector<String> out) throws Exception {
        }
    }

    private static class TestChainDeclarationFunction
    extends DeclaringAsyncKeyedCoProcessFunction<String, Integer, String, String> {
        final AtomicInteger value = new AtomicInteger(0);

        private TestChainDeclarationFunction() {
        }

        public ThrowingConsumer<Integer, Exception> declareProcess1(DeclarationContext context, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws DeclarationException {
            ContextVariable inputValue = context.declareVariable(null);
            return context.declareChain().thenCompose(e -> {
                if (inputValue.get() == null) {
                    inputValue.set(e);
                }
                this.value.addAndGet((int)e);
                return StateFutureUtils.completedVoidFuture();
            }).thenCompose(v -> StateFutureUtils.completedFuture((Object)this.value.incrementAndGet())).withName("adder").thenAccept(v -> {
                this.value.addAndGet((Integer)inputValue.get());
                out.collect((Object)String.valueOf(this.value.get()));
            }).withName("doubler").finish();
        }

        public ThrowingConsumer<String, Exception> declareProcess2(DeclarationContext context, KeyedCoProcessFunction.Context ctx, Collector<String> out) throws DeclarationException {
            return context.declareChain().thenAccept(v -> {
                out.collect(v);
                this.value.addAndGet(Integer.valueOf(v));
            }).withName("pass").finish();
        }
    }

    private static class IdentityKeySelector<T>
    implements KeySelector<T, T> {
        private static final long serialVersionUID = 1L;

        private IdentityKeySelector() {
        }

        public T getKey(T value) throws Exception {
            return value;
        }
    }

    private static class IntToStringKeySelector<T>
    implements KeySelector<Integer, String> {
        private static final long serialVersionUID = 1L;

        private IntToStringKeySelector() {
        }

        public String getKey(Integer value) throws Exception {
            return "" + value;
        }
    }
}

