/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.operator;

import java.io.Serializable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.cep.utils.CepOperatorTestUtilities;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.junit.Assert;
import org.junit.Test;

public class CEPRescalingTest {
    /**
     * Checks that CEP operator state taken from a single subtask can be repartitioned onto two
     * subtasks and that each restored subtask completes the pattern that was in flight for its key.
     *
     * <p>Two keys are used: key 7 (key group 1, owned by subtask 0 at parallelism 2) and key 10
     * (key group 9, owned by subtask 1 at parallelism 2). The key-group assertions at the top pin
     * that mapping so the rest of the test feeds each restored harness the right events.
     *
     * @throws Exception if a harness or the key selector fails
     */
    @Test
    public void testCEPFunctionScalingUp() throws Exception {
        int maxParallelism = 10;
        // Same key extraction the harness itself uses (see getTestHarness).
        KeySelector<Event, Integer> keySelector = new TestKeySelector();

        // Events for key 7: expected in key group 1, i.e. subtask 0 after scaling to 2.
        Event startEvent1 = new Event(7, "start", 1.0);
        SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0, 10.0);
        Event endEvent1 = new Event(7, "end", 1.0);
        int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism);
        Assert.assertEquals(1, keygroup);
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup));

        // Events for key 10: expected in key group 9, i.e. subtask 1 after scaling to 2.
        Event startEvent2 = new Event(10, "start", 1.0);
        SubEvent middleEvent2 = new SubEvent(10, "foo", 1.0, 10.0);
        Event endEvent2 = new Event(10, "end", 1.0);
        keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent2), maxParallelism);
        Assert.assertEquals(9, keygroup);
        Assert.assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup));

        // Declared outside the try so the finally block can release whatever was created.
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness = null;
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness1 = null;
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness2 = null;
        try {
            // Phase 1: a single subtask sees the beginning of both patterns, then is snapshotted.
            harness = this.getTestHarness(maxParallelism, 1, 0);
            harness.open();
            harness.processElement(new StreamRecord<>(startEvent1, 1L));
            harness.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2L));
            harness.processElement(new StreamRecord<>(startEvent2, 3L));
            harness.processElement(new StreamRecord<>(middleEvent2, 4L));
            OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
            // The finally block closes this harness again; closeSilently tolerates a failure there.
            harness.close();

            // Phase 2: split the single snapshot into the state for subtask 0 and subtask 1.
            OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 1, 2, 0);
            OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 1, 2, 1);

            // Subtask 0 finishes the pattern for key 7.
            harness1 = this.getTestHarness(maxParallelism, 2, 0);
            harness1.setup();
            harness1.initializeState(initState1);
            harness1.open();
            harness1.processWatermark(new Watermark(2L));
            harness1.processElement(new StreamRecord<>(middleEvent1, 3L));
            harness1.processElement(new StreamRecord<>(endEvent1, 5L));
            harness1.processWatermark(new Watermark(Long.MAX_VALUE));
            // Three outputs are expected; the first two are checked in detail below.
            Assert.assertEquals(3, harness1.getOutput().size());
            this.verifyWatermark(harness1.getOutput().poll(), 2L);
            this.verifyPattern(harness1.getOutput().poll(), startEvent1, middleEvent1, endEvent1);

            // Subtask 1 finishes the pattern for key 10; the key-42 start event has no follow-up.
            harness2 = this.getTestHarness(maxParallelism, 2, 1);
            harness2.setup();
            harness2.initializeState(initState2);
            harness2.open();
            harness2.processWatermark(new Watermark(2L));
            harness2.processElement(new StreamRecord<>(endEvent2, 5L));
            harness2.processElement(new StreamRecord<>(new Event(42, "start", 1.0), 4L));
            harness2.processWatermark(new Watermark(Long.MAX_VALUE));
            Assert.assertEquals(3, harness2.getOutput().size());
            this.verifyWatermark(harness2.getOutput().poll(), 2L);
            this.verifyPattern(harness2.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
        }
        finally {
            CEPRescalingTest.closeSilently(harness);
            CEPRescalingTest.closeSilently(harness1);
            CEPRescalingTest.closeSilently(harness2);
        }
    }

    /**
     * Closes the given harness on a best-effort basis.
     *
     * <p>A {@code null} harness is skipped, and anything thrown by {@code close()} is discarded
     * so that cleanup in a {@code finally} block never masks the test's own failure.
     *
     * @param harness the harness to close, or {@code null} if it was never created
     */
    private static void closeSilently(OneInputStreamOperatorTestHarness<?, ?> harness) {
        if (harness == null) {
            return;
        }
        try {
            harness.close();
        }
        catch (Throwable ignored) {
            // Intentionally ignored: this is cleanup only.
        }
    }

    /**
     * Checks that CEP operator state taken from three subtasks can be merged and repartitioned
     * onto two subtasks, and that the restored subtasks emit the expected matches.
     *
     * <p>Four keys are used. The key-group assertions at the top pin where each key lives:
     * <ul>
     *   <li>key 7: key group 1, subtask 0 at parallelism 3 and subtask 0 at parallelism 2</li>
     *   <li>key 45: key group 6, subtask 1 at parallelism 3 and subtask 1 at parallelism 2</li>
     *   <li>key 90: key group 2, subtask 0 at parallelism 3 and subtask 0 at parallelism 2</li>
     *   <li>key 10: key group 9, subtask 2 at parallelism 3 and subtask 1 at parallelism 2</li>
     * </ul>
     *
     * @throws Exception if a harness or the key selector fails
     */
    @Test
    public void testCEPFunctionScalingDown() throws Exception {
        int maxParallelism = 10;
        // Same key extraction the harness itself uses (see getTestHarness).
        KeySelector<Event, Integer> keySelector = new TestKeySelector();

        // Key 7.
        Event startEvent1 = new Event(7, "start", 1.0);
        SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0, 10.0);
        Event endEvent1 = new Event(7, "end", 1.0);
        int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism);
        Assert.assertEquals(1, keygroup);
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keygroup));
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup));

        // Key 45.
        Event startEvent2 = new Event(45, "start", 1.0);
        SubEvent middleEvent2 = new SubEvent(45, "foo", 1.0, 10.0);
        Event endEvent2 = new Event(45, "end", 1.0);
        keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent2), maxParallelism);
        Assert.assertEquals(6, keygroup);
        Assert.assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keygroup));
        Assert.assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup));

        // Key 90.
        Event startEvent3 = new Event(90, "start", 1.0);
        SubEvent middleEvent3 = new SubEvent(90, "foo", 1.0, 10.0);
        Event endEvent3 = new Event(90, "end", 1.0);
        keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent3), maxParallelism);
        Assert.assertEquals(2, keygroup);
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keygroup));
        Assert.assertEquals(0, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup));

        // Key 10.
        Event startEvent4 = new Event(10, "start", 1.0);
        SubEvent middleEvent4 = new SubEvent(10, "foo", 1.0, 10.0);
        Event endEvent4 = new Event(10, "end", 1.0);
        keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent4), maxParallelism);
        Assert.assertEquals(9, keygroup);
        Assert.assertEquals(2, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 3, keygroup));
        Assert.assertEquals(1, KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, 2, keygroup));

        // All harnesses start as null and are created inside the try, so a failure while
        // creating or opening a later one still releases the ones that were already opened.
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness1 = null;
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness2 = null;
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness3 = null;
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness4 = null;
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness5 = null;
        try {
            // Phase 1: three subtasks at parallelism 3.
            harness1 = this.getTestHarness(maxParallelism, 3, 0);
            harness1.open();
            harness2 = this.getTestHarness(maxParallelism, 3, 1);
            harness2.open();
            harness3 = this.getTestHarness(maxParallelism, 3, 2);
            harness3.open();

            harness1.processWatermark(Long.MIN_VALUE);
            harness2.processWatermark(Long.MIN_VALUE);
            harness3.processWatermark(Long.MIN_VALUE);

            // Subtask 0: a full sequence for key 7, then fresh starts for keys 90 and 7.
            harness1.processElement(new StreamRecord<>(startEvent1, 1L));
            harness1.processElement(new StreamRecord<>(new Event(7, "foobar", 1.0), 2L));
            harness1.processElement(new StreamRecord<>(middleEvent1, 3L));
            harness1.processElement(new StreamRecord<>(endEvent1, 5L));
            harness1.processElement(new StreamRecord<>(startEvent3, 10L));
            harness1.processElement(new StreamRecord<>(startEvent1, 10L));

            // Subtask 1: start and middle for key 45 (the end arrives after the restore).
            harness2.processElement(new StreamRecord<>(startEvent2, 7L));
            harness2.processElement(new StreamRecord<>(middleEvent2, 8L));

            // Subtask 2: a full sequence for key 10.
            harness3.processElement(new StreamRecord<>(startEvent4, 15L));
            harness3.processElement(new StreamRecord<>(middleEvent4, 16L));
            harness3.processElement(new StreamRecord<>(endEvent4, 17L));

            // So far each subtask has emitted only the initial watermark.
            Assert.assertEquals(1, harness1.getOutput().size());
            this.verifyWatermark(harness1.getOutput().poll(), Long.MIN_VALUE);
            Assert.assertEquals(1, harness2.getOutput().size());
            this.verifyWatermark(harness2.getOutput().poll(), Long.MIN_VALUE);
            Assert.assertEquals(1, harness3.getOutput().size());
            this.verifyWatermark(harness3.getOutput().poll(), Long.MIN_VALUE);

            // Phase 2: merge the three snapshots and repartition them for two subtasks.
            // The snapshot order (2, 1, 3) is kept exactly as in the original test.
            OperatorSubtaskState snapshot = AbstractStreamOperatorTestHarness.repackageState(new OperatorSubtaskState[]{harness2.snapshot(0L, 0L), harness1.snapshot(0L, 0L), harness3.snapshot(0L, 0L)});
            OperatorSubtaskState initState1 = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 3, 2, 0);
            OperatorSubtaskState initState2 = AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, maxParallelism, 3, 2, 1);

            harness4 = this.getTestHarness(maxParallelism, 2, 0);
            harness4.setup();
            harness4.initializeState(initState1);
            harness4.open();

            harness5 = this.getTestHarness(maxParallelism, 2, 1);
            harness5.setup();
            harness5.initializeState(initState2);
            harness5.open();

            // New subtask 1 completes the key-45 pattern that was started before the restore.
            harness5.processElement(new StreamRecord<>(endEvent2, 11L));
            harness5.processWatermark(new Watermark(12L));
            this.verifyPattern(harness5.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
            this.verifyWatermark(harness5.getOutput().poll(), 12L);

            // New subtask 0 emits the key-7 match whose events were all buffered before the restore.
            harness4.processWatermark(new Watermark(12L));
            Assert.assertEquals(2, harness4.getOutput().size());
            this.verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
            this.verifyWatermark(harness4.getOutput().poll(), 12L);

            // Complete the second round of patterns for keys 90 and 7 on new subtask 0.
            harness4.processElement(new StreamRecord<>(middleEvent3, 15L));
            harness4.processElement(new StreamRecord<>(endEvent3, 16L));
            harness4.processElement(new StreamRecord<>(middleEvent1, 15L));
            harness4.processElement(new StreamRecord<>(endEvent1, 16L));
            harness4.processWatermark(new Watermark(Long.MAX_VALUE));
            harness5.processWatermark(new Watermark(Long.MAX_VALUE));

            Assert.assertEquals(3, harness4.getOutput().size());
            // The two matches may arrive in either order, so peek at the first one and
            // pick the expected order from the id of its "start" event.
            StreamRecord<?> resultRecord = (StreamRecord<?>) harness4.getOutput().peek();
            Assert.assertTrue(resultRecord.getValue() instanceof Map);
            @SuppressWarnings("unchecked")
            Map<String, List<Event>> patternMap = (Map<String, List<Event>>) resultRecord.getValue();
            if (patternMap.get("start").get(0).getId() == 7) {
                this.verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
                this.verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
            } else {
                this.verifyPattern(harness4.getOutput().poll(), startEvent3, middleEvent3, endEvent3);
                this.verifyPattern(harness4.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
            }

            // New subtask 1 emits the key-10 match that was fully buffered before the restore.
            Assert.assertEquals(2, harness5.getOutput().size());
            this.verifyPattern(harness5.getOutput().poll(), startEvent4, middleEvent4, endEvent4);
        }
        finally {
            CEPRescalingTest.closeSilently(harness1);
            CEPRescalingTest.closeSilently(harness2);
            CEPRescalingTest.closeSilently(harness3);
            CEPRescalingTest.closeSilently(harness4);
            CEPRescalingTest.closeSilently(harness5);
        }
    }

    /**
     * Asserts that an operator output element is a watermark with the given timestamp.
     *
     * @param outputObject element taken from a harness output queue
     * @param timestamp expected watermark timestamp
     */
    private void verifyWatermark(Object outputObject, long timestamp) {
        Assert.assertTrue(outputObject instanceof Watermark);
        Watermark emitted = (Watermark) outputObject;
        Assert.assertEquals(timestamp, emitted.getTimestamp());
    }

    /**
     * Asserts that an operator output element is a stream record whose value is a match map,
     * and that the first event stored under "start", "middle" and "end" equals the given event.
     *
     * @param outputObject element taken from a harness output queue
     * @param start expected first event under key "start"
     * @param middle expected first event under key "middle"
     * @param end expected first event under key "end"
     */
    private void verifyPattern(Object outputObject, Event start, SubEvent middle, Event end) {
        Assert.assertTrue(outputObject instanceof StreamRecord);
        Object payload = ((StreamRecord<?>) outputObject).getValue();
        Assert.assertTrue(payload instanceof Map);

        // Unchecked by necessity: the output queue only exposes Object.
        @SuppressWarnings("unchecked")
        Map<String, List<Event>> matched = (Map<String, List<Event>>) payload;
        Assert.assertEquals(start, matched.get("start").get(0));
        Assert.assertEquals(middle, matched.get("middle").get(0));
        Assert.assertEquals(end, matched.get("end").get(0));
    }

    /**
     * Builds a keyed test harness around a CEP operator created from {@link NFAFactory},
     * keyed by event id, with a RocksDB state backend wrapping a memory state backend.
     *
     * <p>The harness is returned configured but neither set up nor opened; callers do that.
     *
     * @param maxParallelism maximum parallelism passed to the harness
     * @param taskParallelism number of parallel subtasks the harness simulates
     * @param subtaskIdx index of the subtask this harness represents
     * @return the configured harness
     * @throws Exception if the harness cannot be constructed
     */
    private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(int maxParallelism, int taskParallelism, int subtaskIdx) throws Exception {
        KeySelector<Event, Integer> byEventId = new TestKeySelector();
        KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> testHarness =
                new KeyedOneInputStreamOperatorTestHarness<>(
                        CepOperatorTestUtilities.getKeyedCepOperator(false, new NFAFactory()),
                        byEventId,
                        BasicTypeInfo.INT_TYPE_INFO,
                        maxParallelism,
                        taskParallelism,
                        subtaskIdx);

        // Declared as AbstractStateBackend so the same RocksDBStateBackend constructor is chosen.
        AbstractStateBackend checkpointBackend = new MemoryStateBackend();
        testHarness.setStateBackend(new RocksDBStateBackend(checkpointBackend));
        return testHarness;
    }

    private static class TestKeySelector
    implements KeySelector<Event, Integer> {
        private static final long serialVersionUID = -4873366487571254798L;

        private TestKeySelector() {
        }

        public Integer getKey(Event value) throws Exception {
            return value.getId();
        }
    }

    private static class NFAFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;
        private final boolean handleTimeout;

        private NFAFactory() {
            this(false);
        }

        private NFAFactory(boolean handleTimeout) {
            this.handleTimeout = handleTimeout;
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedBy("middle").subtype(SubEvent.class).where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getVolume() > 5.0)).followedBy("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end"))).within(Time.milliseconds((long)10L));
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)this.handleTimeout).createNFA();
        }
    }
}

