/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.EventComparator;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternFlatTimeoutFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.RichPatternFlatSelectFunction;
import org.apache.flink.cep.RichPatternSelectFunction;
import org.apache.flink.cep.SubEvent;
import org.apache.flink.cep.configuration.CEPCacheOptions;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.WithinType;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Either;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class CEPITCase
extends AbstractTestBase {
    @Parameterized.Parameter
    public Configuration envConfiguration;

    @Parameterized.Parameters
    public static Collection<Configuration> prepareSharedBufferCacheConfig() {
        Configuration miniCacheConfig = new Configuration();
        miniCacheConfig.set(CEPCacheOptions.CEP_CACHE_STATISTICS_INTERVAL, (Object)Duration.ofSeconds(1L));
        miniCacheConfig.set(CEPCacheOptions.CEP_SHARED_BUFFER_ENTRY_CACHE_SLOTS, (Object)1);
        miniCacheConfig.set(CEPCacheOptions.CEP_SHARED_BUFFER_EVENT_CACHE_SLOTS, (Object)1);
        Configuration bigCacheConfig = new Configuration();
        miniCacheConfig.set(CEPCacheOptions.CEP_CACHE_STATISTICS_INTERVAL, (Object)Duration.ofSeconds(1L));
        return Arrays.asList(miniCacheConfig, bigCacheConfig);
    }

    @Test
    public void testSimplePatternCEP() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        DataStreamSource input = env.fromElements((Object[])new Event[]{new Event(1, "barfoo", 1.0), new Event(2, "start", 2.0), new Event(3, "foobar", 3.0), new SubEvent(4, "foo", 4.0, 1.0), new Event(5, "middle", 5.0), new SubEvent(6, "middle", 6.0, 2.0), new SubEvent(7, "bar", 3.0, 3.0), new Event(42, "42", 42.0), new Event(8, "end", 1.0)});
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").subtype(SubEvent.class).where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end")));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).inProcessingTime().flatSelect((PatternFlatSelectFunction & Serializable)(p, o) -> {
            StringBuilder builder = new StringBuilder();
            builder.append(((Event)((List)p.get("start")).get(0)).getId()).append(",").append(((Event)((List)p.get("middle")).get(0)).getId()).append(",").append(((Event)((List)p.get("end")).get(0)).getId());
            o.collect((Object)builder.toString());
        }, Types.STRING);
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        Assert.assertEquals(Arrays.asList("2,6,8"), resultList);
    }

    @Test
    public void testSimpleKeyedPatternCEP() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        env.setParallelism(2);
        KeyedStream input = env.fromElements((Object[])new Event[]{new Event(1, "barfoo", 1.0), new Event(2, "start", 2.0), new Event(3, "start", 2.1), new Event(3, "foobar", 3.0), new SubEvent(4, "foo", 4.0, 1.0), new SubEvent(3, "middle", 3.2, 1.0), new Event(42, "start", 3.1), new SubEvent(42, "middle", 3.3, 1.2), new Event(5, "middle", 5.0), new SubEvent(2, "middle", 6.0, 2.0), new SubEvent(7, "bar", 3.0, 3.0), new Event(42, "42", 42.0), new Event(3, "end", 2.0), new Event(2, "end", 1.0), new Event(42, "end", 42.0)}).keyBy((KeySelector)new KeySelector<Event, Integer>(){

            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        });
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").subtype(SubEvent.class).where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end")));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).inProcessingTime().select((PatternSelectFunction & Serializable)p -> {
            StringBuilder builder = new StringBuilder();
            builder.append(((Event)((List)p.get("start")).get(0)).getId()).append(",").append(((Event)((List)p.get("middle")).get(0)).getId()).append(",").append(((Event)((List)p.get("end")).get(0)).getId());
            return builder.toString();
        });
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        resultList.sort(String::compareTo);
        Assert.assertEquals(Arrays.asList("2,2,2", "3,3,3", "42,42,42"), resultList);
    }

    @Test
    public void testSimplePatternEventTime() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        SingleOutputStreamOperator input = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)new Event(1, "start", 1.0), (Object)5L), Tuple2.of((Object)new Event(2, "middle", 2.0), (Object)1L), Tuple2.of((Object)new Event(3, "end", 3.0), (Object)3L), Tuple2.of((Object)new Event(4, "end", 4.0), (Object)10L), Tuple2.of((Object)new Event(5, "middle", 5.0), (Object)7L), Tuple2.of((Object)new Event(5, "middle", 5.0), (Object)100L)}).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>(){

            public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
                return (Long)element.f1;
            }

            public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
                return new Watermark((Long)lastElement.f1 - 5L);
            }
        }).map((MapFunction)new MapFunction<Tuple2<Event, Long>, Event>(){

            public Event map(Tuple2<Event, Long> value) throws Exception {
                return (Event)value.f0;
            }
        });
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end")));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).select((PatternSelectFunction)new PatternSelectFunction<Event, String>(){

            public String select(Map<String, List<Event>> pattern) {
                StringBuilder builder = new StringBuilder();
                builder.append(pattern.get("start").get(0).getId()).append(",").append(pattern.get("middle").get(0).getId()).append(",").append(pattern.get("end").get(0).getId());
                return builder.toString();
            }
        });
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        resultList.sort(String::compareTo);
        Assert.assertEquals(Arrays.asList("1,5,4"), resultList);
    }

    @Test
    public void testSimpleKeyedPatternEventTime() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        env.setParallelism(2);
        KeyedStream input = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)new Event(1, "start", 1.0), (Object)5L), Tuple2.of((Object)new Event(1, "middle", 2.0), (Object)1L), Tuple2.of((Object)new Event(2, "middle", 2.0), (Object)4L), Tuple2.of((Object)new Event(2, "start", 2.0), (Object)3L), Tuple2.of((Object)new Event(1, "end", 3.0), (Object)3L), Tuple2.of((Object)new Event(3, "start", 4.1), (Object)5L), Tuple2.of((Object)new Event(1, "end", 4.0), (Object)10L), Tuple2.of((Object)new Event(2, "end", 2.0), (Object)8L), Tuple2.of((Object)new Event(1, "middle", 5.0), (Object)7L), Tuple2.of((Object)new Event(3, "middle", 6.0), (Object)9L), Tuple2.of((Object)new Event(3, "end", 7.0), (Object)7L)}).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>(){

            public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
                return (Long)element.f1;
            }

            public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
                return new Watermark((Long)lastElement.f1 - 5L);
            }
        }).map((MapFunction)new MapFunction<Tuple2<Event, Long>, Event>(){

            public Event map(Tuple2<Event, Long> value) throws Exception {
                return (Event)value.f0;
            }
        }).keyBy((KeySelector)new KeySelector<Event, Integer>(){

            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        });
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end")));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).select((PatternSelectFunction)new PatternSelectFunction<Event, String>(){

            public String select(Map<String, List<Event>> pattern) {
                StringBuilder builder = new StringBuilder();
                builder.append(pattern.get("start").get(0).getId()).append(",").append(pattern.get("middle").get(0).getId()).append(",").append(pattern.get("end").get(0).getId());
                return builder.toString();
            }
        });
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        resultList.sort(String::compareTo);
        Assert.assertEquals(Arrays.asList("1,1,1", "2,2,2"), resultList);
    }

    @Test
    public void testSimplePatternWithSingleState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        DataStreamSource input = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)0, (Object)1), new Tuple2((Object)0, (Object)2)});
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)rec -> (Integer)rec.f1 == 1));
        PatternStream pStream = CEP.pattern((DataStream)input, (Pattern)pattern).inProcessingTime();
        SingleOutputStreamOperator result = pStream.select((PatternSelectFunction)new PatternSelectFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>(){

            public Tuple2<Integer, Integer> select(Map<String, List<Tuple2<Integer, Integer>>> pattern) throws Exception {
                return pattern.get("start").get(0);
            }
        });
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        Assert.assertEquals(Arrays.asList(new Tuple2((Object)0, (Object)1)), resultList);
    }

    @Test
    public void testProcessingTimeWithinBetweenFirstAndLast() throws Exception {
        this.testProcessingTimeWithWindow(WithinType.FIRST_AND_LAST);
    }

    @Test
    public void testProcessingTimeWithinPreviousAndCurrent() throws Exception {
        this.testProcessingTimeWithWindow(WithinType.PREVIOUS_AND_CURRENT);
    }

    private void testProcessingTimeWithWindow(WithinType withinType) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        env.setParallelism(1);
        DataStreamSource input = env.fromElements((Object[])new Integer[]{1, 2});
        Pattern pattern = Pattern.begin((String)"start").followedByAny("end").within(Time.days((long)1L), withinType);
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).inProcessingTime().select((PatternSelectFunction)new PatternSelectFunction<Integer, Integer>(){

            public Integer select(Map<String, List<Integer>> pattern) throws Exception {
                return pattern.get("start").get(0) + pattern.get("end").get(0);
            }
        });
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        Assert.assertEquals(Arrays.asList(3), resultList);
    }

    @Test
    public void testTimeoutHandlingWithinFirstAndLast() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        env.setParallelism(1);
        SingleOutputStreamOperator input = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)new Event(1, "start", 1.0), (Object)1L), Tuple2.of((Object)new Event(1, "middle", 2.0), (Object)5L), Tuple2.of((Object)new Event(1, "start", 2.0), (Object)4L), Tuple2.of((Object)new Event(1, "end", 2.0), (Object)6L)}).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>(){

            public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
                return (Long)element.f1;
            }

            public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
                return new Watermark((Long)lastElement.f1 - 5L);
            }
        }).map((MapFunction)new MapFunction<Tuple2<Event, Long>, Event>(){

            public Event map(Tuple2<Event, Long> value) throws Exception {
                return (Event)value.f0;
            }
        });
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end"))).within(Time.milliseconds((long)3L));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).select((PatternTimeoutFunction)new PatternTimeoutFunction<Event, String>(){

            public String timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
                return pattern.get("start").get(0).getPrice() + "";
            }
        }, (PatternSelectFunction)new PatternSelectFunction<Event, String>(){

            public String select(Map<String, List<Event>> pattern) {
                StringBuilder builder = new StringBuilder();
                builder.append(pattern.get("start").get(0).getPrice()).append(",").append(pattern.get("middle").get(0).getPrice()).append(",").append(pattern.get("end").get(0).getPrice());
                return builder.toString();
            }
        });
        ArrayList<Either> resultList = new ArrayList<Either>();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        resultList.sort(Comparator.comparing(Object::toString));
        List<Either> expected = Arrays.asList(Either.Left.of((Object)"1.0"), Either.Left.of((Object)"2.0"), Either.Left.of((Object)"2.0"), Either.Right.of((Object)"2.0,2.0,2.0"));
        Assert.assertEquals(expected, resultList);
    }

    @Test
    public void testTimeoutHandlingWithinPreviousAndCurrent() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        env.setParallelism(1);
        SingleOutputStreamOperator input = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)new Event(1, "start", 1.0), (Object)1L), Tuple2.of((Object)new Event(1, "middle", 2.0), (Object)5L), Tuple2.of((Object)new Event(1, "start", 2.0), (Object)4L), Tuple2.of((Object)new Event(1, "end", 2.0), (Object)6L)}).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>(){

            public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
                return (Long)element.f1;
            }

            public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
                return new Watermark((Long)lastElement.f1 - 5L);
            }
        }).map((MapFunction)new MapFunction<Tuple2<Event, Long>, Event>(){

            public Event map(Tuple2<Event, Long> value) throws Exception {
                return (Event)value.f0;
            }
        });
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end"))).within(Time.milliseconds((long)3L), WithinType.PREVIOUS_AND_CURRENT);
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).select((PatternTimeoutFunction)new PatternTimeoutFunction<Event, String>(){

            public String timeout(Map<String, List<Event>> pattern, long timeoutTimestamp) throws Exception {
                return pattern.get("start").get(0).getPrice() + "";
            }
        }, (PatternSelectFunction)new PatternSelectFunction<Event, String>(){

            public String select(Map<String, List<Event>> pattern) {
                StringBuilder builder = new StringBuilder();
                builder.append(pattern.get("start").get(0).getPrice()).append(",").append(pattern.get("middle").get(0).getPrice()).append(",").append(pattern.get("end").get(0).getPrice());
                return builder.toString();
            }
        });
        ArrayList<Either> resultList = new ArrayList<Either>();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        resultList.sort(Comparator.comparing(Object::toString));
        List<Either> expected = Arrays.asList(Either.Left.of((Object)"1.0"), Either.Left.of((Object)"2.0"), Either.Right.of((Object)"1.0,2.0,2.0"), Either.Right.of((Object)"2.0,2.0,2.0"));
        Assert.assertEquals(expected, resultList);
    }

    @Test
    public void testSimpleOrFilterPatternCEP() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        DataStreamSource input = env.fromElements((Object[])new Event[]{new Event(1, "start", 1.0), new Event(2, "middle", 2.0), new Event(3, "end", 3.0), new Event(4, "start", 4.0), new Event(5, "middle", 5.0), new Event(6, "end", 6.0)});
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getPrice() == 2.0)).or((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getPrice() == 5.0)).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end")));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).inProcessingTime().select((PatternSelectFunction)new PatternSelectFunction<Event, String>(){

            public String select(Map<String, List<Event>> pattern) {
                StringBuilder builder = new StringBuilder();
                builder.append(pattern.get("start").get(0).getId()).append(",").append(pattern.get("middle").get(0).getId()).append(",").append(pattern.get("end").get(0).getId());
                return builder.toString();
            }
        });
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        List<String> expected = Arrays.asList("1,5,6", "1,2,3", "4,5,6", "1,2,6");
        expected.sort(String::compareTo);
        resultList.sort(String::compareTo);
        Assert.assertEquals(expected, resultList);
    }

    @Test
    public void testSimplePatternEventTimeWithComparator() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        SingleOutputStreamOperator input = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)new Event(1, "start", 1.0), (Object)5L), Tuple2.of((Object)new Event(2, "middle", 2.0), (Object)1L), Tuple2.of((Object)new Event(3, "end", 3.0), (Object)3L), Tuple2.of((Object)new Event(4, "end", 4.0), (Object)10L), Tuple2.of((Object)new Event(5, "middle", 6.0), (Object)7L), Tuple2.of((Object)new Event(6, "middle", 5.0), (Object)7L), Tuple2.of((Object)new Event(7, "middle", 5.0), (Object)100L)}).assignTimestampsAndWatermarks((AssignerWithPunctuatedWatermarks)new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>(){

            public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
                return (Long)element.f1;
            }

            public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
                return new Watermark((Long)lastElement.f1 - 5L);
            }
        }).map((MapFunction)new MapFunction<Tuple2<Event, Long>, Event>(){

            public Event map(Tuple2<Event, Long> value) throws Exception {
                return (Event)value.f0;
            }
        });
        CustomEventComparator comparator = new CustomEventComparator();
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).followedByAny("middle").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end")));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern, (EventComparator)comparator).select((PatternSelectFunction)new PatternSelectFunction<Event, String>(){

            public String select(Map<String, List<Event>> pattern) {
                StringBuilder builder = new StringBuilder();
                builder.append(pattern.get("start").get(0).getId()).append(",").append(pattern.get("middle").get(0).getId()).append(",").append(pattern.get("end").get(0).getId());
                return builder.toString();
            }
        });
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        List<String> expected = Arrays.asList("1,6,4", "1,5,4");
        expected.sort(String::compareTo);
        resultList.sort(String::compareTo);
        Assert.assertEquals(expected, resultList);
    }

    @Test
    public void testSimpleAfterMatchSkip() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        DataStreamSource input = env.fromElements((Object[])new Tuple2[]{new Tuple2((Object)1, (Object)"a"), new Tuple2((Object)2, (Object)"a"), new Tuple2((Object)3, (Object)"a"), new Tuple2((Object)4, (Object)"a")});
        Pattern pattern = Pattern.begin((String)"start", (AfterMatchSkipStrategy)AfterMatchSkipStrategy.skipPastLastEvent()).where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)rec -> ((String)rec.f1).equals("a"))).times(2);
        PatternStream pStream = CEP.pattern((DataStream)input, (Pattern)pattern).inProcessingTime();
        SingleOutputStreamOperator result = pStream.select((PatternSelectFunction)new PatternSelectFunction<Tuple2<Integer, String>, Tuple2<Integer, String>>(){

            public Tuple2<Integer, String> select(Map<String, List<Tuple2<Integer, String>>> pattern) throws Exception {
                return pattern.get("start").get(0);
            }
        });
        ArrayList<Tuple2> resultList = new ArrayList<Tuple2>();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        resultList.sort(Comparator.comparing(tuple2 -> tuple2.toString()));
        List<Tuple2> expected = Arrays.asList(Tuple2.of((Object)1, (Object)"a"), Tuple2.of((Object)3, (Object)"a"));
        Assert.assertEquals(expected, resultList);
    }

    @Test
    public void testRichPatternFlatSelectFunction() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        DataStreamSource input = env.fromElements((Object[])new Event[]{new Event(1, "barfoo", 1.0), new Event(2, "start", 2.0), new Event(3, "foobar", 3.0), new SubEvent(4, "foo", 4.0, 1.0), new Event(5, "middle", 5.0), new SubEvent(6, "middle", 6.0, 2.0), new SubEvent(7, "bar", 3.0, 3.0), new Event(42, "42", 42.0), new Event(8, "end", 1.0)});
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)new RichIterativeCondition<Event>(){

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                return value.getName().equals("start");
            }
        }).followedByAny("middle").subtype(SubEvent.class).where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end")));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).inProcessingTime().flatSelect((PatternFlatSelectFunction)new RichPatternFlatSelectFunction<Event, String>(){

            public void open(Configuration config) {
                try {
                    this.getRuntimeContext().getMapState(new MapStateDescriptor("test", (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)LongSerializer.INSTANCE));
                    throw new RuntimeException("Expected getMapState to fail with unsupported operation exception.");
                }
                catch (UnsupportedOperationException unsupportedOperationException) {
                    this.getRuntimeContext().getUserCodeClassLoader();
                    return;
                }
            }

            public void flatSelect(Map<String, List<Event>> p, Collector<String> o) throws Exception {
                StringBuilder builder = new StringBuilder();
                builder.append(p.get("start").get(0).getId()).append(",").append(p.get("middle").get(0).getId()).append(",").append(p.get("end").get(0).getId());
                o.collect((Object)builder.toString());
            }
        }, Types.STRING);
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        Assert.assertEquals(Arrays.asList("2,6,8"), resultList);
    }

    @Test
    public void testRichPatternSelectFunction() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        env.setParallelism(2);
        KeyedStream input = env.fromElements((Object[])new Event[]{new Event(1, "barfoo", 1.0), new Event(2, "start", 2.0), new Event(3, "start", 2.1), new Event(3, "foobar", 3.0), new SubEvent(4, "foo", 4.0, 1.0), new SubEvent(3, "middle", 3.2, 1.0), new Event(42, "start", 3.1), new SubEvent(42, "middle", 3.3, 1.2), new Event(5, "middle", 5.0), new SubEvent(2, "middle", 6.0, 2.0), new SubEvent(7, "bar", 3.0, 3.0), new Event(42, "42", 42.0), new Event(3, "end", 2.0), new Event(2, "end", 1.0), new Event(42, "end", 42.0)}).keyBy((KeySelector)new KeySelector<Event, Integer>(){

            public Integer getKey(Event value) throws Exception {
                return value.getId();
            }
        });
        Pattern pattern = Pattern.begin((String)"start").where((IterativeCondition)new RichIterativeCondition<Event>(){

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                return value.getName().equals("start");
            }
        }).followedByAny("middle").subtype(SubEvent.class).where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("middle"))).followedByAny("end").where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("end")));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).inProcessingTime().select((PatternSelectFunction)new RichPatternSelectFunction<Event, String>(){

            public void open(Configuration config) {
                try {
                    this.getRuntimeContext().getMapState(new MapStateDescriptor("test", (TypeSerializer)LongSerializer.INSTANCE, (TypeSerializer)LongSerializer.INSTANCE));
                    throw new RuntimeException("Expected getMapState to fail with unsupported operation exception.");
                }
                catch (UnsupportedOperationException unsupportedOperationException) {
                    this.getRuntimeContext().getUserCodeClassLoader();
                    return;
                }
            }

            public String select(Map<String, List<Event>> p) throws Exception {
                StringBuilder builder = new StringBuilder();
                builder.append(p.get("start").get(0).getId()).append(",").append(p.get("middle").get(0).getId()).append(",").append(p.get("end").get(0).getId());
                return builder.toString();
            }
        });
        ArrayList resultList = new ArrayList();
        DataStreamUtils.collect((DataStream)result).forEachRemaining(resultList::add);
        resultList.sort(String::compareTo);
        Assert.assertEquals(Arrays.asList("2,2,2", "3,3,3", "42,42,42"), resultList);
    }

    @Test
    public void testFlatSelectSerializationWithAnonymousClass() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        DataStreamSource elements = env.fromElements((Object[])new Integer[]{1, 2, 3});
        OutputTag<Integer> outputTag = new OutputTag<Integer>("AAA"){};
        CEP.pattern((DataStream)elements, (Pattern)Pattern.begin((String)"A")).inProcessingTime().flatSelect((OutputTag)outputTag, (PatternFlatTimeoutFunction)new PatternFlatTimeoutFunction<Integer, Integer>(){

            public void timeout(Map<String, List<Integer>> pattern, long timeoutTimestamp, Collector<Integer> out) throws Exception {
            }
        }, (PatternFlatSelectFunction)new PatternFlatSelectFunction<Integer, Object>(){

            public void flatSelect(Map<String, List<Integer>> pattern, Collector<Object> out) throws Exception {
            }
        });
        env.execute();
    }

    @Test
    public void testPartialMatchTimeoutOutputCompletedMatch() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)this.envConfiguration);
        SingleOutputStreamOperator input = env.fromElements((Object[])new Tuple2[]{Tuple2.of((Object)new Event(1, "start", 1.0), (Object)0L), Tuple2.of((Object)new Event(2, "start", 2.0), (Object)1L), Tuple2.of((Object)new Event(3, "start", 3.0), (Object)2L), Tuple2.of((Object)new Event(4, "start", 4.0), (Object)3L), Tuple2.of((Object)new Event(5, "end", 5.0), (Object)4L)}).assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness((Duration)Duration.ofMillis(5L)).withTimestampAssigner(TimestampAssignerSupplier.of((SerializableTimestampAssigner & Serializable)(element, recordTimestamp) -> (Long)element.f1))).map((MapFunction & Serializable)value -> (Event)value.f0);
        Pattern pattern = Pattern.begin((String)"start", (AfterMatchSkipStrategy)AfterMatchSkipStrategy.skipPastLastEvent()).where((IterativeCondition)SimpleCondition.of((FilterFunction & Serializable)value -> value.getName().equals("start"))).oneOrMore().consecutive().greedy().followedBy("middle").where((IterativeCondition)new IterativeCondition<Event>(){

            public boolean filter(Event value, IterativeCondition.Context<Event> ctx) throws Exception {
                int count = 0;
                for (Event ignored : ctx.getEventsForPattern("start")) {
                    ++count;
                }
                if (count > 2) {
                    return value.getName().equals("middle");
                }
                return value.getName().equals("end");
            }
        }).within(Time.milliseconds((long)100L));
        SingleOutputStreamOperator result = CEP.pattern((DataStream)input, (Pattern)pattern).select((PatternSelectFunction & Serializable)pattern1 -> ((Event)((List)pattern1.get("start")).get(0)).getId() + "," + ((Event)((List)pattern1.get("middle")).get(0)).getId());
        ArrayList resultList = new ArrayList();
        try (CloseableIterator iterator = result.executeAndCollect();){
            iterator.forEachRemaining(resultList::add);
        }
        resultList.sort(String::compareTo);
        Assert.assertEquals(Arrays.asList("3,5"), resultList);
    }

    private static class CustomEventComparator
    implements EventComparator<Event> {
        private CustomEventComparator() {
        }

        public int compare(Event o1, Event o2) {
            return Double.compare(o1.getPrice(), o2.getPrice());
        }
    }
}

