/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle.graph;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommandDispatcher;
import org.apache.flink.runtime.operators.lifecycle.event.TestEventQueue;
import org.apache.flink.runtime.operators.lifecycle.graph.MultiInputTestOperatorFactory;
import org.apache.flink.runtime.operators.lifecycle.graph.OneInputTestStreamOperatorFactory;
import org.apache.flink.runtime.operators.lifecycle.graph.TestDataElement;
import org.apache.flink.runtime.operators.lifecycle.graph.TestEventSource;
import org.apache.flink.runtime.operators.lifecycle.graph.TwoInputTestStreamOperator;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.transformations.AbstractMultipleInputTransformation;
import org.apache.flink.streaming.api.transformations.MultipleInputTransformation;
import org.apache.flink.testutils.junit.SharedObjects;
import org.apache.flink.util.function.ThrowingConsumer;

public class TestJobBuilders {
    /**
     * Builds the minimal test job: a single {@link TestEventSource} (with timestamps and
     * watermarks assigned) forwarding into one one-input test operator whose output goes to a
     * {@link DiscardingSink}.
     */
    public static final TestingGraphBuilder SIMPLE_GRAPH_BUILDER = new TestingGraphBuilder() {

        @Override
        public TestJobWithDescription build(
                SharedObjects shared,
                ThrowingConsumer<Configuration, Exception> confConsumer,
                ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer) throws Exception {
            final TestEventQueue events = TestEventQueue.createShared(shared);
            final TestCommandDispatcher commands = TestCommandDispatcher.createShared(shared);
            final StreamExecutionEnvironment env = TestJobBuilders.prepareEnv(confConsumer, envConsumer);

            // Each ID is handed to the test source/operator and also pinned as its uid hash.
            final String sourceId = "00000000000000000000000000000001";
            final String forwardId = "00000000000000000000000000000005";

            SingleOutputStreamOperator<TestDataElement> source =
                    env.addSource(new TestEventSource(sourceId, events, commands))
                            .setUidHash(sourceId)
                            .assignTimestampsAndWatermarks(TestJobBuilders.createWmAssigner());

            SingleOutputStreamOperator<TestDataElement> forward =
                    source.transform(
                                    "transform-1-forward",
                                    TypeInformation.of(TestDataElement.class),
                                    new OneInputTestStreamOperatorFactory(forwardId, events, commands))
                            .setUidHash(forwardId);

            forward.sinkTo(new DiscardingSink<>());

            // Number of inputs per (non-source) operator ID.
            HashMap<String, Integer> inputCounts = new HashMap<>();
            inputCounts.put(forwardId, 1);

            HashSet<String> operatorIds = new HashSet<>(Collections.singletonList(forwardId));
            HashSet<String> allIds = new HashSet<>(Arrays.asList(sourceId, forwardId));

            return new TestJobWithDescription(
                    env.getStreamGraph().getJobGraph(),
                    Collections.singleton(sourceId),
                    operatorIds,
                    allIds,
                    inputCounts,
                    events,
                    commands);
        }

        @Override
        public String toString() {
            return "simple graph";
        }
    };
    /**
     * Builds a larger test job that combines union, multiple-input, forward, keyed and two-input
     * operators. As wired below:
     *
     * <pre>
     * unitedSourceLeft  --+ (union)
     * unitedSourceRight --+--+
     *                        +--> multipleInput --> mapForward --keyBy--> mapKeyed --+
     * multiSource -----------+                                                       +--> mapTwoInput --> DiscardingSink
     * connectedSource ---------------------------------------------------------------+
     * </pre>
     *
     * <p>Every source and operator gets its own fixed ID, which is passed to the test
     * source/operator and also pinned as its uid hash.
     */
    public static final TestingGraphBuilder COMPLEX_GRAPH_BUILDER = new TestingGraphBuilder() {

        @Override
        public TestJobWithDescription build(
                SharedObjects shared,
                ThrowingConsumer<Configuration, Exception> confConsumer,
                ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer) throws Exception {
            TestEventQueue eventQueue = TestEventQueue.createShared(shared);
            TestCommandDispatcher commandQueue = TestCommandDispatcher.createShared(shared);
            StreamExecutionEnvironment env = TestJobBuilders.prepareEnv(confConsumer, envConsumer);

            String unitedSourceLeft = "00000000000000000000000000000001";
            String unitedSourceRight = "00000000000000000000000000000002";
            String connectedSource = "00000000000000000000000000000003";
            String multiSource = "00000000000000000000000000000004";
            String mapForward = "00000000000000000000000000000005";
            String mapKeyed = "00000000000000000000000000000006";
            String mapTwoInput = "00000000000000000000000000000007";
            String multipleInput = "00000000000000000000000000000008";

            // Two watermarked sources merged into one stream.
            DataStream<TestDataElement> unitedSources =
                    env.addSource(new TestEventSource(unitedSourceLeft, eventQueue, commandQueue))
                            .setUidHash(unitedSourceLeft)
                            .assignTimestampsAndWatermarks(TestJobBuilders.createWmAssigner())
                            .union(
                                    env.addSource(new TestEventSource(unitedSourceRight, eventQueue, commandQueue))
                                            .setUidHash(unitedSourceRight)
                                            .assignTimestampsAndWatermarks(TestJobBuilders.createWmAssigner()));

            // Second input of the multiple-input operator.
            SingleOutputStreamOperator<TestDataElement> sideSource =
                    env.addSource(new TestEventSource(multiSource, eventQueue, commandQueue))
                            .setUidHash(multiSource)
                            .assignTimestampsAndWatermarks(TestJobBuilders.createWmAssigner());

            DataStream<?>[] inputs = new DataStream<?>[] {unitedSources, sideSource};
            MultipleInputTransformation<TestDataElement> multipleInputsTransform =
                    new MultipleInputTransformation<>(
                            "MultipleInputOperator",
                            new MultiInputTestOperatorFactory(inputs.length, eventQueue, multipleInput),
                            TypeInformation.of(TestDataElement.class),
                            env.getParallelism());
            for (DataStream<?> input : inputs) {
                multipleInputsTransform.addInput(input.getTransformation());
            }
            multipleInputsTransform.setChainingStrategy(ChainingStrategy.HEAD_WITH_SOURCES);
            env.addOperator(multipleInputsTransform);

            // The multiple-input operator must carry its OWN uid hash (multipleInput). Reusing
            // multiSource here would give two nodes the same user-defined hash and leave the
            // multipleInput ID, which is registered in the sets/map below, unassigned.
            SingleOutputStreamOperator<TestDataElement> multipleSources =
                    new MultipleConnectedStreams(env)
                            .transform(multipleInputsTransform)
                            .setUidHash(multipleInput);

            SingleOutputStreamOperator<TestDataElement> forwardTransform =
                    multipleSources
                            .startNewChain()
                            .transform(
                                    "transform-1-forward",
                                    TypeInformation.of(TestDataElement.class),
                                    new OneInputTestStreamOperatorFactory(mapForward, eventQueue, commandQueue))
                            .setUidHash(mapForward);

            SingleOutputStreamOperator<TestDataElement> keyedTransform =
                    forwardTransform
                            .startNewChain()
                            .keyBy(e -> e.seq % 1000L)
                            .transform(
                                    "transform-2-keyed",
                                    TypeInformation.of(TestDataElement.class),
                                    new OneInputTestStreamOperatorFactory(mapKeyed, eventQueue, commandQueue))
                            .setUidHash(mapKeyed);

            // Note: the connected source has no timestamp/watermark assigner, unlike the others.
            SingleOutputStreamOperator<TestDataElement> twoInputTransform =
                    keyedTransform
                            .startNewChain()
                            .connect(
                                    env.addSource(new TestEventSource(connectedSource, eventQueue, commandQueue))
                                            .setUidHash(connectedSource))
                            .transform(
                                    "transform-3-two-input",
                                    TypeInformation.of(TestDataElement.class),
                                    new TwoInputTestStreamOperator(mapTwoInput, eventQueue))
                            .setUidHash(mapTwoInput);

            twoInputTransform.sinkTo(new DiscardingSink<>());

            // Number of inputs per (non-source) operator ID.
            HashMap<String, Integer> operatorsNumberOfInputs = new HashMap<>();
            operatorsNumberOfInputs.put(mapForward, 1);
            operatorsNumberOfInputs.put(mapKeyed, 1);
            operatorsNumberOfInputs.put(mapTwoInput, 2);
            operatorsNumberOfInputs.put(multipleInput, 2);

            // NOTE(review): multiSource is not listed in any of the ID sets below - confirm
            // whether that omission is intentional before adding it.
            return new TestJobWithDescription(
                    env.getStreamGraph().getJobGraph(),
                    new HashSet<>(Arrays.asList(unitedSourceLeft, unitedSourceRight, connectedSource)),
                    new HashSet<>(Arrays.asList(mapForward, mapKeyed, mapTwoInput, multipleInput)),
                    new HashSet<>(
                            Arrays.asList(
                                    unitedSourceLeft,
                                    unitedSourceRight,
                                    connectedSource,
                                    mapForward,
                                    mapKeyed,
                                    mapTwoInput,
                                    multipleInput)),
                    operatorsNumberOfInputs,
                    eventQueue,
                    commandQueue);
        }

        @Override
        public String toString() {
            return "complex graph";
        }
    };
    /**
     * Run of zeros matching the leading part of the operator ID literals used by the builders in
     * this class. No reference to it is visible in this (decompiled) file - presumably the
     * compiler inlined it into those literals; confirm against the original source.
     */
    private static final String OP_ID_HASH_PREFIX = "0000000000000000000000000000000";

    /** Not instantiable: this class only exposes static builders and helpers. */
    private TestJobBuilders() {
    }

    /**
     * Creates the execution environment shared by the graph builders.
     *
     * <p>The configuration starts with the failover strategy set to {@code "full"} and is then
     * passed to {@code confConsumer} before the environment is created from it. The environment
     * is given parallelism 4, no restarts, exactly-once checkpointing every 200 ms and a 50 ms
     * auto-watermark interval; {@code envConsumer} runs last, so it can override these defaults.
     *
     * @param confConsumer callback that may adjust the configuration before the environment exists
     * @param envConsumer callback that may adjust the fully prepared environment
     * @return the prepared environment
     * @throws Exception if either callback throws
     */
    private static StreamExecutionEnvironment prepareEnv(ThrowingConsumer<Configuration, Exception> confConsumer, ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer) throws Exception {
        final Configuration conf = new Configuration();
        conf.set(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY, "full");
        confConsumer.accept(conf);

        final StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        environment.setParallelism(4);
        environment.setRestartStrategy(RestartStrategies.noRestart());
        environment.enableCheckpointing(200L);
        environment.getCheckpointConfig().setCheckpointingConsistencyMode(CheckpointingMode.EXACTLY_ONCE);
        environment.getConfig().setAutoWatermarkInterval(50L);

        envConsumer.accept(environment);
        return environment;
    }

    /**
     * Creates the watermark strategy used by the test sources.
     *
     * <p>The element's {@code seq} field is used as its timestamp. The generator remembers the
     * timestamp of the most recently seen event (starting from {@code Long.MIN_VALUE}) and emits
     * it as the watermark on every periodic emit.
     *
     * @return a new watermark strategy for {@link TestDataElement} streams
     */
    private static WatermarkStrategy<TestDataElement> createWmAssigner() {
        WatermarkGeneratorSupplier<TestDataElement> generatorSupplier =
                ctx -> new WatermarkGenerator<TestDataElement>() {
                    private Watermark latest = new Watermark(Long.MIN_VALUE);

                    @Override
                    public void onEvent(TestDataElement event, long eventTimestamp, WatermarkOutput output) {
                        // Only record the timestamp; emission happens periodically.
                        latest = new Watermark(eventTimestamp);
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {
                        output.emitWatermark(latest);
                    }
                };
        SerializableTimestampAssigner<TestDataElement> timestampAssigner =
                (element, recordTimestamp) -> element.seq;

        return WatermarkStrategy.forGenerator(generatorSupplier).withTimestampAssigner(timestampAssigner);
    }

    /** Factory that assembles a test job together with its description. */
    @FunctionalInterface
    public interface TestingGraphBuilder {
        /**
         * Builds the test job.
         *
         * @param shared registry used to create the shared event queue and command dispatcher
         * @param confConsumer callback given the {@link Configuration} to customize it
         * @param envConsumer callback given the {@link StreamExecutionEnvironment} to customize it
         * @return the built job and its description
         * @throws Exception if building the job or one of the callbacks fails
         */
        TestJobWithDescription build(SharedObjects shared, ThrowingConsumer<Configuration, Exception> confConsumer, ThrowingConsumer<StreamExecutionEnvironment, Exception> envConsumer) throws Exception;
    }
}

