/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cep.operator;

import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.cep.Event;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.nfa.NFA;
import org.apache.flink.cep.nfa.compiler.NFACompiler;
import org.apache.flink.cep.operator.CepOperator;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.utils.CepOperatorTestUtilities;
import org.apache.flink.cep.utils.EventBuilder;
import org.apache.flink.cep.utils.OutputAsserter;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

public class CepProcessFunctionContextTest
extends TestLogger {
    /** Value for the {@code isProcessingTime} flag of {@code createCepOperator} that selects processing time. */
    private static final boolean PROCESSING_TIME = true;
    /** Value for the {@code isProcessingTime} flag of {@code createCepOperator} that selects event time. */
    private static final boolean EVENT_TIME = false;
    // NOTE(review): not referenced in the visible code; presumably a placeholder label for
    // records without a timestamp — confirm whether it is still needed.
    private static final String NO_TIMESTAMP = "(NO_TIMESTAMP)";

    /**
     * Checks the timestamp exposed by the process-function context in event-time mode.
     *
     * <p>Feeds "A" (timestamp 5) and then "B" (timestamp 3), advances the watermark to 6 and
     * expects "3:B" before "5:A", followed by the forwarded watermark.
     */
    @Test
    public void testTimestampPassingInEventTime() throws Exception {
        CepOperator<Event, Integer, String> operator =
                createCepOperator(extractTimestampAndNames(1), new NFAForwardingFactory(), EVENT_TIME);

        try (OneInputStreamOperatorTestHarness<Event, String> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            testHarness.open();

            // Elements arrive out of timestamp order on purpose.
            testHarness.processElement(
                    EventBuilder.event().withName("A").withTimestamp(5L).asStreamRecord());
            testHarness.processElement(
                    EventBuilder.event().withName("B").withTimestamp(3L).asStreamRecord());
            testHarness.processWatermark(6L);

            OutputAsserter.assertOutput(testHarness.getOutput())
                    .nextElementEquals("3:B")
                    .nextElementEquals("5:A")
                    .watermarkEquals(6L)
                    .hasNoMoreElements();
        }
    }

    /**
     * Checks the timestamp exposed by the process-function context in processing-time mode.
     *
     * <p>"A" is processed at processing time 1 and "B" at processing time 2; their record
     * timestamps (5 and 3) differ from those values. The expected output is "1:A" then "2:B".
     */
    @Test
    public void testTimestampPassingInProcessingTime() throws Exception {
        CepOperator<Event, Integer, String> operator =
                createCepOperator(
                        extractTimestampAndNames(1), new NFAForwardingFactory(), PROCESSING_TIME);

        try (OneInputStreamOperatorTestHarness<Event, String> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            testHarness.open();

            testHarness.setProcessingTime(1L);
            testHarness.processElement(
                    EventBuilder.event().withName("A").withTimestamp(5L).asStreamRecord());

            testHarness.setProcessingTime(2L);
            testHarness.processElement(
                    EventBuilder.event().withName("B").withTimestamp(3L).asStreamRecord());

            testHarness.setProcessingTime(3L);

            OutputAsserter.assertOutput(testHarness.getOutput())
                    .nextElementEquals("1:A")
                    .nextElementEquals("2:B")
                    .hasNoMoreElements();
        }
    }

    /**
     * Checks {@code currentProcessingTime()} of the context in processing-time mode.
     *
     * <p>"A" is processed at processing time 15 and "B" at 35; the expected output is "15:A"
     * followed by "35:B".
     */
    @Test
    public void testCurrentProcessingTimeInProcessingTime() throws Exception {
        CepOperator<Event, Integer, String> operator =
                createCepOperator(
                        extractCurrentProcessingTimeAndNames(1),
                        new NFAForwardingFactory(),
                        PROCESSING_TIME);

        try (OneInputStreamOperatorTestHarness<Event, String> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            testHarness.open();

            testHarness.setProcessingTime(15L);
            testHarness.processElement(EventBuilder.event().withName("A").asStreamRecord());

            testHarness.setProcessingTime(35L);
            testHarness.processElement(EventBuilder.event().withName("B").asStreamRecord());

            OutputAsserter.assertOutput(testHarness.getOutput())
                    .nextElementEquals("15:A")
                    .nextElementEquals("35:B")
                    .hasNoMoreElements();
        }
    }

    /**
     * Checks {@code currentProcessingTime()} of the context in event-time mode.
     *
     * <p>"A" (timestamp 5) is handed to the operator at processing time 10, and the watermark
     * 6 is processed at processing time 100. The expected output is "100:A" followed by the
     * watermark.
     */
    @Test
    public void testCurrentProcessingTimeInEventTime() throws Exception {
        CepOperator<Event, Integer, String> operator =
                createCepOperator(
                        extractCurrentProcessingTimeAndNames(1),
                        new NFAForwardingFactory(),
                        EVENT_TIME);

        try (OneInputStreamOperatorTestHarness<Event, String> testHarness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            testHarness.open();

            testHarness.setProcessingTime(10L);
            testHarness.processElement(
                    EventBuilder.event().withName("A").withTimestamp(5L).asStreamRecord());

            testHarness.setProcessingTime(100L);
            testHarness.processWatermark(6L);

            OutputAsserter.assertOutput(testHarness.getOutput())
                    .nextElementEquals("100:A")
                    .watermarkEquals(6L)
                    .hasNoMoreElements();
        }
    }

    /**
     * Checks the context timestamp for matches and timed-out partial matches in event time.
     *
     * <p>Feeds "A" (timestamp 5), "C" (timestamp 20) and "B" (timestamp 3), then advances the
     * watermark to 22. The main output is expected to hold "5:B:A" followed by the watermark,
     * and the side output registered under {@code timedOut} is expected to hold "15:A".
     */
    @Test
    public void testTimestampPassingForTimedOutInEventTime() throws Exception {
        OutputTag<String> timedOut = new OutputTag<String>("timedOut") {};
        CepOperator<Event, Integer, String> operator =
                createCepOperator(
                        extractTimestampAndNames(2, timedOut), new NFATimingOutFactory(), EVENT_TIME);

        try (OneInputStreamOperatorTestHarness<Event, String> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            harness.open();

            harness.processElement(
                    EventBuilder.event().withName("A").withTimestamp(5L).asStreamRecord());
            harness.processElement(
                    EventBuilder.event().withName("C").withTimestamp(20L).asStreamRecord());
            harness.processElement(
                    EventBuilder.event().withName("B").withTimestamp(3L).asStreamRecord());
            harness.processWatermark(22L);

            OutputAsserter.assertOutput(harness.getOutput())
                    .nextElementEquals("5:B:A")
                    .watermarkEquals(22L)
                    .hasNoMoreElements();

            // Pass the typed tag directly; a raw OutputTag cast would make this an unchecked call.
            OutputAsserter.assertOutput(harness.getSideOutput(timedOut))
                    .nextElementEquals("15:A")
                    .hasNoMoreElements();
        }
    }

    /**
     * Checks the context timestamp for matches and timed-out partial matches in processing
     * time.
     *
     * <p>"A", "C" and "B" are processed at processing times 3, 5 and 20 respectively. The main
     * output is expected to hold "5:A:C", and the side output registered under
     * {@code timedOut} is expected to hold "15:C".
     */
    @Test
    public void testTimestampPassingForTimedOutInProcessingTime() throws Exception {
        OutputTag<String> timedOut = new OutputTag<String>("timedOut") {};
        CepOperator<Event, Integer, String> operator =
                createCepOperator(
                        extractTimestampAndNames(2, timedOut),
                        new NFATimingOutFactory(),
                        PROCESSING_TIME);

        try (OneInputStreamOperatorTestHarness<Event, String> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            harness.open();

            harness.setProcessingTime(3L);
            harness.processElement(
                    EventBuilder.event().withName("A").withTimestamp(3L).asStreamRecord());

            harness.setProcessingTime(5L);
            harness.processElement(
                    EventBuilder.event().withName("C").withTimestamp(5L).asStreamRecord());

            harness.setProcessingTime(20L);
            harness.processElement(
                    EventBuilder.event().withName("B").withTimestamp(20L).asStreamRecord());

            OutputAsserter.assertOutput(harness.getOutput())
                    .nextElementEquals("5:A:C")
                    .hasNoMoreElements();

            // Pass the typed tag directly; a raw OutputTag cast would make this an unchecked call.
            OutputAsserter.assertOutput(harness.getSideOutput(timedOut))
                    .nextElementEquals("15:C")
                    .hasNoMoreElements();
        }
    }

    /**
     * Checks {@code currentProcessingTime()} for matches and timed-out partial matches in
     * event time.
     *
     * <p>Feeds "A" (timestamp 5), "B" (timestamp 20) and "C" (timestamp 3), sets the processing
     * time to 100 and advances the watermark to 22. The main output is expected to hold
     * "100:C:A" followed by the watermark, and the side output is expected to hold "100:A".
     */
    @Test
    public void testCurrentProcessingTimeForTimedOutInEventTime() throws Exception {
        OutputTag<String> sideOutputTag = new OutputTag<String>("timedOut") {};
        CepOperator<Event, Integer, String> operator =
                createCepOperator(
                        extractCurrentProcessingTimeAndNames(2, sideOutputTag),
                        new NFATimingOutFactory(),
                        EVENT_TIME);

        try (OneInputStreamOperatorTestHarness<Event, String> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            harness.open();

            harness.processElement(
                    EventBuilder.event().withName("A").withTimestamp(5L).asStreamRecord());
            harness.processElement(
                    EventBuilder.event().withName("B").withTimestamp(20L).asStreamRecord());
            harness.processElement(
                    EventBuilder.event().withName("C").withTimestamp(3L).asStreamRecord());

            harness.setProcessingTime(100L);
            harness.processWatermark(22L);

            OutputAsserter.assertOutput(harness.getOutput())
                    .nextElementEquals("100:C:A")
                    .watermarkEquals(22L)
                    .hasNoMoreElements();

            // Pass the typed tag directly; a raw OutputTag cast would make this an unchecked call.
            OutputAsserter.assertOutput(harness.getSideOutput(sideOutputTag))
                    .nextElementEquals("100:A")
                    .hasNoMoreElements();
        }
    }

    /**
     * Checks {@code currentProcessingTime()} for matches and timed-out partial matches in
     * processing time.
     *
     * <p>"A", "B" and "C" are processed at processing times 3, 5 and 20 respectively. The main
     * output is expected to hold "5:A:B", and the side output is expected to hold "20:B".
     */
    @Test
    public void testCurrentProcessingTimeForTimedOutInProcessingTime() throws Exception {
        OutputTag<String> sideOutputTag = new OutputTag<String>("timedOut") {};
        CepOperator<Event, Integer, String> operator =
                createCepOperator(
                        extractCurrentProcessingTimeAndNames(2, sideOutputTag),
                        new NFATimingOutFactory(),
                        PROCESSING_TIME);

        try (OneInputStreamOperatorTestHarness<Event, String> harness =
                CepOperatorTestUtilities.getCepTestHarness(operator)) {
            harness.open();

            harness.setProcessingTime(3L);
            harness.processElement(EventBuilder.event().withName("A").asStreamRecord());

            harness.setProcessingTime(5L);
            harness.processElement(EventBuilder.event().withName("B").asStreamRecord());

            harness.setProcessingTime(20L);
            harness.processElement(EventBuilder.event().withName("C").asStreamRecord());

            OutputAsserter.assertOutput(harness.getOutput())
                    .nextElementEquals("5:A:B")
                    .hasNoMoreElements();

            // Pass the typed tag directly; a raw OutputTag cast would make this an unchecked call.
            OutputAsserter.assertOutput(harness.getSideOutput(sideOutputTag))
                    .nextElementEquals("20:B")
                    .hasNoMoreElements();
        }
    }

    /**
     * Creates the {@link CepOperator} under test.
     *
     * @param processFunction function invoked by the operator; determines the output type
     * @param nfaFactory factory supplying the NFA the operator runs
     * @param isProcessingTime {@code true} for processing time, {@code false} for event time
     * @param <T> output type of the operator
     * @return a new operator built from the given arguments
     */
    private <T> CepOperator<Event, Integer, T> createCepOperator(
            PatternProcessFunction<Event, T> processFunction,
            NFACompiler.NFAFactory<Event> nfaFactory,
            boolean isProcessingTime) throws Exception {
        // Diamond instead of a raw constructor call, so the result is not an unchecked conversion.
        // NOTE(review): the three null arguments are optional collaborators of the CepOperator
        // constructor (presumably comparator, after-match skip strategy and late-data output
        // tag) — confirm against the CepOperator constructor signature.
        return new CepOperator<>(
                Event.createTypeSerializer(),
                isProcessingTime,
                nfaFactory,
                null,
                null,
                processFunction,
                null);
    }

    /**
     * Creates a function that emits the context timestamp followed by the names of the matched
     * events.
     *
     * @param stateNumber number of pattern states whose first event name is appended
     */
    private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber) {
        Function<PatternProcessFunction.Context, String> timestampAccessor =
                ctx -> String.valueOf(ctx.timestamp());
        return new AccessContextWithNames(stateNumber, timestampAccessor);
    }

    /**
     * Creates a function that emits the context timestamp followed by the names of the matched
     * events, and that also reports timed-out partial matches to the given side output.
     *
     * @param stateNumber number of pattern states whose first event name is appended
     * @param timedOutTag side-output tag that receives timed-out partial matches
     */
    private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber, OutputTag<String> timedOutTag) {
        Function<PatternProcessFunction.Context, String> timestampAccessor =
                ctx -> String.valueOf(ctx.timestamp());
        return new AccessContextWithNamesWithTimedOut(stateNumber, timedOutTag, timestampAccessor);
    }

    /**
     * Creates a function that emits the context's current processing time followed by the
     * names of the matched events.
     *
     * @param stateNumber number of pattern states whose first event name is appended
     */
    private static PatternProcessFunction<Event, String> extractCurrentProcessingTimeAndNames(int stateNumber) {
        Function<PatternProcessFunction.Context, String> processingTimeAccessor =
                ctx -> String.valueOf(ctx.currentProcessingTime());
        return new AccessContextWithNames(stateNumber, processingTimeAccessor);
    }

    /**
     * Creates a function that emits the context's current processing time followed by the
     * names of the matched events, and that also reports timed-out partial matches to the
     * given side output.
     *
     * @param stateNumber number of pattern states whose first event name is appended
     * @param timedOutTag side-output tag that receives timed-out partial matches
     */
    private static PatternProcessFunction<Event, String> extractCurrentProcessingTimeAndNames(int stateNumber, OutputTag<String> timedOutTag) {
        Function<PatternProcessFunction.Context, String> processingTimeAccessor =
                ctx -> String.valueOf(ctx.currentProcessingTime());
        return new AccessContextWithNamesWithTimedOut(
                stateNumber, timedOutTag, processingTimeAccessor);
    }

    private static class NFATimingOutFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;

        private NFATimingOutFactory() {
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"1").next("2").within(Time.milliseconds((long)10L));
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)true).createNFA();
        }
    }

    private static class NFAForwardingFactory
    implements NFACompiler.NFAFactory<Event> {
        private static final long serialVersionUID = 1173020762472766713L;

        private NFAForwardingFactory() {
        }

        public NFA<Event> createNFA() {
            Pattern pattern = Pattern.begin((String)"1");
            return NFACompiler.compileFactory((Pattern)pattern, (boolean)false).createNFA();
        }
    }

    static final class AccessContextWithNamesWithTimedOut
    extends AccessContextWithNames
    implements TimedOutPartialMatchHandler<Event> {
        private OutputTag<String> outputTag;

        AccessContextWithNamesWithTimedOut(int stateCount, OutputTag<String> outputTag, Function<PatternProcessFunction.Context, String> contextAccessor) {
            super(stateCount, contextAccessor);
            this.outputTag = outputTag;
        }

        public void processTimedOutMatch(Map<String, List<Event>> match, PatternProcessFunction.Context ctx) throws Exception {
            ctx.output(this.outputTag, (Object)this.extractResult(match, ctx));
        }
    }

    static class AccessContextWithNames
    extends PatternProcessFunction<Event, String> {
        private final int stateCount;
        private final Function<PatternProcessFunction.Context, String> contextAccessor;

        AccessContextWithNames(int stateCount, Function<PatternProcessFunction.Context, String> contextAccessor) {
            this.stateCount = stateCount;
            this.contextAccessor = contextAccessor;
        }

        public void processMatch(Map<String, List<Event>> match, PatternProcessFunction.Context ctx, Collector<String> out) throws Exception {
            out.collect((Object)this.extractResult(match, ctx));
        }

        String extractResult(Map<String, List<Event>> match, PatternProcessFunction.Context ctx) {
            StringBuilder stringBuilder = new StringBuilder(this.contextAccessor.apply(ctx));
            for (int i = 1; i <= this.stateCount; ++i) {
                List<Event> events = match.get("" + i);
                if (events == null) continue;
                stringBuilder.append(":").append(events.get(0).getName());
            }
            return stringBuilder.toString();
        }
    }
}

