/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OperatorAttributes;
import org.apache.flink.streaming.api.operators.OperatorAttributesBuilder;
import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperatorFactory;
import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation;
import org.apache.flink.util.Collector;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class StreamingJobGraphGeneratorWithOperatorAttributesTest {
    /**
     * Builds {@code source -> keyBy -> transform -> map -> sink}, where {@code transform} reports
     * {@code outputOnlyAfterEndOfStream = true}, and checks the resulting stream nodes and the
     * job vertices produced with operator chaining left enabled.
     */
    @Test
    void testOutputOnlyAfterEndOfStreamEnableChain() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

        SingleOutputStreamOperator<Integer> source = env.fromData(1, 2, 3).name("source");
        source.keyBy(x -> x)
                .transform(
                        "transform",
                        Types.INT,
                        new StreamOperatorWithConfigurableOperatorAttributes<>(
                                x -> x,
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .build()))
                .map(x -> x)
                .sinkTo(new DiscardingSink<>())
                .disableChaining()
                .name("sink");

        // Index the stream nodes by operator name so they can be looked up individually.
        StreamGraph streamGraph = env.getStreamGraph(false);
        Map<String, StreamNode> nodeMap = new HashMap<>();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            nodeMap.put(node.getOperatorName(), node);
        }
        Assertions.assertThat(nodeMap).hasSize(4);

        // Only the configured operator itself is flagged as emitting at end of stream.
        Assertions.assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream())
                .isFalse();
        Assertions.assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue();
        Assertions.assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse();
        Assertions.assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream())
                .isFalse();

        assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("transform"), 1);
        assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);

        // Index the job vertices by name; "transform" and "Map" are expected to share a vertex.
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
        Map<String, JobVertex> vertexMap = new HashMap<>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            vertexMap.put(vertex.getName(), vertex);
        }
        Assertions.assertThat(vertexMap).hasSize(3);

        assertHasOutputPartitionType(
                vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);
        assertHasOutputPartitionType(
                vertexMap.get("transform -> Map"), ResultPartitionType.BLOCKING);

        Assertions.assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse();
        Assertions.assertThat(vertexMap.get("transform -> Map").isAnyOutputBlocking()).isTrue();
        Assertions.assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse();
    }

    /**
     * Builds {@code source -> keyBy -> transform -> map -> sink}, where {@code transform} reports
     * {@code outputOnlyAfterEndOfStream = true}, and checks the resulting stream nodes and the
     * job vertices produced after operator chaining has been disabled on the environment.
     */
    @Test
    void testOutputOnlyAfterEndOfStreamDisableChain() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

        SingleOutputStreamOperator<Integer> source = env.fromData(1, 2, 3).name("source");
        source.keyBy(x -> x)
                .transform(
                        "transform",
                        Types.INT,
                        new StreamOperatorWithConfigurableOperatorAttributes<>(
                                x -> x,
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .build()))
                .map(x -> x)
                .sinkTo(new DiscardingSink<>())
                .disableChaining()
                .name("sink");

        // NOTE(review): the `false` argument presumably keeps the registered transformations so
        // that a second stream graph can be generated further down - confirm against
        // StreamExecutionEnvironment#getStreamGraph(boolean).
        StreamGraph streamGraph = env.getStreamGraph(false);
        Map<String, StreamNode> nodeMap = new HashMap<>();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            nodeMap.put(node.getOperatorName(), node);
        }
        Assertions.assertThat(nodeMap).hasSize(4);

        // Only the configured operator itself is flagged as emitting at end of stream.
        Assertions.assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream())
                .isFalse();
        Assertions.assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue();
        Assertions.assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse();
        Assertions.assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream())
                .isFalse();

        assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("transform"), 1);
        assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);

        // Regenerate the stream graph with chaining switched off; every operator is then
        // expected to end up in its own job vertex.
        env.disableOperatorChaining();
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false));
        Map<String, JobVertex> vertexMap = new HashMap<>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            vertexMap.put(vertex.getName(), vertex);
        }
        Assertions.assertThat(vertexMap).hasSize(4);

        assertHasOutputPartitionType(
                vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);
        assertHasOutputPartitionType(vertexMap.get("transform"), ResultPartitionType.BLOCKING);
        assertHasOutputPartitionType(vertexMap.get("Map"), ResultPartitionType.PIPELINED_BOUNDED);

        Assertions.assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse();
        Assertions.assertThat(vertexMap.get("transform").isAnyOutputBlocking()).isTrue();
        Assertions.assertThat(vertexMap.get("Map").isAnyOutputBlocking()).isFalse();
        Assertions.assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse();
    }

    /**
     * Builds {@code source -> keyBy -> map -> transform -> sink}, where the last operator of the
     * chain ({@code transform}) reports {@code outputOnlyAfterEndOfStream = true}, and checks
     * that the chained {@code "Map -> transform"} vertex produces a blocking result.
     */
    @Test
    void testOutputOnlyAfterEndOfStreamPropagateToUpstreamWithinChain() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

        SingleOutputStreamOperator<Integer> source = env.fromData(1, 2, 3).name("source");
        source.keyBy(x -> x)
                .map(x -> x)
                .transform(
                        "transform",
                        Types.INT,
                        new StreamOperatorWithConfigurableOperatorAttributes<>(
                                x -> x,
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .build()))
                .sinkTo(new DiscardingSink<>())
                .disableChaining()
                .name("sink");

        // Index the stream nodes by operator name so they can be looked up individually.
        StreamGraph streamGraph = env.getStreamGraph(false);
        Map<String, StreamNode> nodeMap = new HashMap<>();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            nodeMap.put(node.getOperatorName(), node);
        }
        Assertions.assertThat(nodeMap).hasSize(4);

        // At stream-node level the flag stays on "transform" only; the upstream "Map" node is
        // not marked.
        Assertions.assertThat(nodeMap.get("Source: source").isOutputOnlyAfterEndOfStream())
                .isFalse();
        Assertions.assertThat(nodeMap.get("transform").isOutputOnlyAfterEndOfStream()).isTrue();
        Assertions.assertThat(nodeMap.get("Map").isOutputOnlyAfterEndOfStream()).isFalse();
        Assertions.assertThat(nodeMap.get("sink: Writer").isOutputOnlyAfterEndOfStream())
                .isFalse();

        assertManagedMemoryWeightsSize(nodeMap.get("Source: source"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("Map"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("transform"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);

        // Index the job vertices by name; "Map" and "transform" are expected to share a vertex.
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph(streamGraph);
        Map<String, JobVertex> vertexMap = new HashMap<>();
        for (JobVertex vertex : jobGraph.getVertices()) {
            vertexMap.put(vertex.getName(), vertex);
        }
        Assertions.assertThat(vertexMap).hasSize(3);

        assertHasOutputPartitionType(
                vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);
        assertHasOutputPartitionType(
                vertexMap.get("Map -> transform"), ResultPartitionType.BLOCKING);

        Assertions.assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse();
        Assertions.assertThat(vertexMap.get("Map -> transform").isAnyOutputBlocking()).isTrue();
        Assertions.assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse();
    }

    /**
     * Connects two keyed sources to a two-input operator that reports
     * {@code outputOnlyAfterEndOfStream = true} and checks the number of operator-scope
     * managed-memory weights declared on each stream node.
     */
    @Test
    void testApplyBatchExecutionSettingsOnTwoInputOperator() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

        SingleOutputStreamOperator<Integer> source1 = env.fromData(1, 2, 3).name("source1");
        SingleOutputStreamOperator<Integer> source2 = env.fromData(1, 2, 3).name("source2");
        source1.keyBy(x -> x)
                .connect(source2.keyBy(x -> x))
                .transform(
                        "transform",
                        Types.INT,
                        new TwoInputStreamOperatorWithConfigurableOperatorAttributes<>(
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .build()))
                .sinkTo(new DiscardingSink<>())
                .name("sink");

        // Index the stream nodes by operator name so they can be looked up individually.
        StreamGraph streamGraph = env.getStreamGraph(false);
        Map<String, StreamNode> nodeMap = new HashMap<>();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            nodeMap.put(node.getOperatorName(), node);
        }
        Assertions.assertThat(nodeMap).hasSize(4);

        // Only the two-input operator is expected to declare a managed-memory weight.
        assertManagedMemoryWeightsSize(nodeMap.get("Source: source1"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("Source: source2"), 0);
        assertManagedMemoryWeightsSize(nodeMap.get("transform"), 1);
        assertManagedMemoryWeightsSize(nodeMap.get("sink: Writer"), 0);
    }

    /**
     * Chains two keyed one-input operators that both report
     * {@code outputOnlyAfterEndOfStream = true}; only the first also reports
     * {@code internalSorterSupported = true}. Checks that the first ends up without input
     * requirements while the second requires its input to be sorted.
     */
    @Test
    void testOneInputOperatorWithInternalSorterSupported() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

        SingleOutputStreamOperator<Integer> source1 = env.fromData(1, 2, 3).name("source1");
        source1.keyBy(x -> x)
                .transform(
                        "internalSorter",
                        Types.INT,
                        new StreamOperatorWithConfigurableOperatorAttributes<>(
                                value -> value,
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .setInternalSorterSupported(true)
                                        .build()))
                .keyBy(x -> x)
                .transform(
                        "noInternalSorter",
                        Types.INT,
                        new StreamOperatorWithConfigurableOperatorAttributes<>(
                                value -> value,
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .build()))
                .sinkTo(new DiscardingSink<>())
                .name("sink");

        // Index the stream nodes by operator name so they can be looked up individually.
        StreamGraph streamGraph = env.getStreamGraph(false);
        Map<String, StreamNode> nodeMap = new HashMap<>();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            nodeMap.put(node.getOperatorName(), node);
        }

        Assertions.assertThat(nodeMap.get("internalSorter").getInputRequirements()).isEmpty();
        Assertions.assertThat(nodeMap.get("noInternalSorter").getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
    }

    /**
     * Chains two keyed two-input operators that both report
     * {@code outputOnlyAfterEndOfStream = true}; only the first also reports
     * {@code internalSorterSupported = true}. Checks that the first ends up without input
     * requirements while the second requires both of its inputs to be sorted.
     */
    @Test
    void testTwoInputOperatorWithInternalSorterSupported() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

        SingleOutputStreamOperator<Integer> source1 = env.fromData(1, 2, 3).name("source1");
        SingleOutputStreamOperator<Integer> source2 = env.fromData(1, 2, 3).name("source2");
        source1.keyBy(x -> x)
                .connect(source2.keyBy(x -> x))
                .transform(
                        "internalSorter",
                        Types.INT,
                        new TwoInputStreamOperatorWithConfigurableOperatorAttributes<>(
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .setInternalSorterSupported(true)
                                        .build()))
                .keyBy(x -> x)
                .connect(source2.keyBy(x -> x))
                .transform(
                        "noInternalSorter",
                        Types.INT,
                        new TwoInputStreamOperatorWithConfigurableOperatorAttributes<>(
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .build()))
                .sinkTo(new DiscardingSink<>())
                .name("sink");

        // Index the stream nodes by operator name so they can be looked up individually.
        StreamGraph streamGraph = env.getStreamGraph(false);
        Map<String, StreamNode> nodeMap = new HashMap<>();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            nodeMap.put(node.getOperatorName(), node);
        }

        Assertions.assertThat(nodeMap.get("internalSorter").getInputRequirements()).isEmpty();
        Assertions.assertThat(nodeMap.get("noInternalSorter").getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
        Assertions.assertThat(nodeMap.get("noInternalSorter").getInputRequirements().get(1))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
    }

    /**
     * Wires two {@link KeyedMultipleInputTransformation}s that both report
     * {@code outputOnlyAfterEndOfStream = true}; only the first also reports
     * {@code internalSorterSupported = true}. Each transformation has one keyed and one
     * non-keyed input. Checks that the first ends up without input requirements, while the
     * second passes its non-keyed input through and requires its keyed input to be sorted.
     */
    @Test
    void testMultipleInputOperatorWithInternalSorterSupported() {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());

        SingleOutputStreamOperator<Integer> source1 = env.fromData(1, 2, 3).name("source1");
        SingleOutputStreamOperator<Integer> source2 = env.fromData(1, 2, 3).name("source2");

        KeyedMultipleInputTransformation<Integer> transform =
                new KeyedMultipleInputTransformation<>(
                        "internalSorter",
                        new OperatorAttributesConfigurableOperatorFactory<>(
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .setInternalSorterSupported(true)
                                        .build()),
                        BasicTypeInfo.INT_TYPE_INFO,
                        3,
                        BasicTypeInfo.INT_TYPE_INFO);
        // Input 0 is keyed, input 1 is registered without a key selector.
        transform.addInput(source1.keyBy(x -> x).getTransformation(), x -> x);
        transform.addInput(source2.getTransformation(), null);

        KeyedMultipleInputTransformation<Integer> transform2 =
                new KeyedMultipleInputTransformation<>(
                        "noInternalSorter",
                        new OperatorAttributesConfigurableOperatorFactory<>(
                                new OperatorAttributesBuilder()
                                        .setOutputOnlyAfterEndOfStream(true)
                                        .build()),
                        BasicTypeInfo.INT_TYPE_INFO,
                        3,
                        BasicTypeInfo.INT_TYPE_INFO);
        // Input 0 is registered without a key selector, input 1 is keyed.
        transform2.addInput(transform, null);
        transform2.addInput(source2.keyBy(x -> x).getTransformation(), x -> x);

        new DataStream<>(env, transform2).sinkTo(new DiscardingSink<>());

        // Index the stream nodes by operator name so they can be looked up individually.
        StreamGraph streamGraph = env.getStreamGraph(false);
        Map<String, StreamNode> nodeMap = new HashMap<>();
        for (StreamNode node : streamGraph.getStreamNodes()) {
            nodeMap.put(node.getOperatorName(), node);
        }

        Assertions.assertThat(nodeMap.get("internalSorter").getInputRequirements()).isEmpty();
        Assertions.assertThat(nodeMap.get("noInternalSorter").getInputRequirements().get(0))
                .isEqualTo(StreamConfig.InputRequirement.PASS_THROUGH);
        Assertions.assertThat(nodeMap.get("noInternalSorter").getInputRequirements().get(1))
                .isEqualTo(StreamConfig.InputRequirement.SORTED);
    }

    /**
     * Asserts that the given stream node declares exactly {@code weightSize} operator-scope
     * managed-memory use-case weights.
     *
     * @param node stream node to inspect; the assertion fails if it is {@code null}
     * @param weightSize expected number of entries in the node's weight map
     */
    private void assertManagedMemoryWeightsSize(StreamNode node, int weightSize) {
        // Callers pass the result of a map lookup by operator name; report a miss as a readable
        // assertion failure instead of a NullPointerException.
        Assertions.assertThat(node).as("stream node under test").isNotNull();
        Assertions.assertThat(node.getManagedMemoryOperatorScopeUseCaseWeights())
                .hasSize(weightSize);
    }

    /**
     * Asserts that the first data set produced by the given job vertex has the expected result
     * partition type. Only the first produced data set is inspected.
     *
     * @param jobVertex vertex to inspect; the assertion fails if it is {@code null} or produces
     *     no data sets
     * @param partitionType expected result partition type of the first produced data set
     */
    private void assertHasOutputPartitionType(JobVertex jobVertex, ResultPartitionType partitionType) {
        // Callers pass the result of a map lookup by vertex name; report a miss or a vertex
        // without outputs as a readable assertion failure rather than a NullPointerException or
        // IndexOutOfBoundsException.
        Assertions.assertThat(jobVertex).as("job vertex under test").isNotNull();
        Assertions.assertThat(jobVertex.getProducedDataSets())
                .as("data sets produced by vertex '%s'", jobVertex.getName())
                .isNotEmpty();
        Assertions.assertThat(jobVertex.getProducedDataSets().get(0).getResultType())
                .isEqualTo(partitionType);
    }

    private static class NoOpCoProcessFunction<IN1, IN2, OUT>
    extends CoProcessFunction<IN1, IN2, OUT> {
        private NoOpCoProcessFunction() {
        }

        public void processElement1(IN1 value, CoProcessFunction.Context ctx, Collector<OUT> out) {
        }

        public void processElement2(IN2 value, CoProcessFunction.Context ctx, Collector<OUT> out) {
        }
    }

    private static class OperatorAttributesConfigurableOperatorFactory<OUT>
    implements StreamOperatorFactory<OUT> {
        private final OperatorAttributes operatorAttributes;

        public OperatorAttributesConfigurableOperatorFactory(OperatorAttributes operatorAttributes) {
            this.operatorAttributes = operatorAttributes;
        }

        public <T extends StreamOperator<OUT>> T createStreamOperator(StreamOperatorParameters<OUT> parameters) {
            throw new UnsupportedOperationException();
        }

        public void setChainingStrategy(ChainingStrategy strategy) {
        }

        public ChainingStrategy getChainingStrategy() {
            return ChainingStrategy.DEFAULT_CHAINING_STRATEGY;
        }

        public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader classLoader) {
            return StreamMap.class;
        }

        public OperatorAttributes getOperatorAttributes() {
            return this.operatorAttributes;
        }
    }

    private static class TwoInputStreamOperatorWithConfigurableOperatorAttributes<IN1, IN2, OUT>
    extends CoProcessOperator<IN1, IN2, OUT> {
        private final OperatorAttributes attributes;

        public TwoInputStreamOperatorWithConfigurableOperatorAttributes(OperatorAttributes attributes) {
            super(new NoOpCoProcessFunction());
            this.attributes = attributes;
        }

        public OperatorAttributes getOperatorAttributes() {
            return this.attributes;
        }
    }

    private static class StreamOperatorWithConfigurableOperatorAttributes<IN, OUT>
    extends StreamMap<IN, OUT> {
        private final OperatorAttributes attributes;

        public StreamOperatorWithConfigurableOperatorAttributes(MapFunction<IN, OUT> mapper, OperatorAttributes attributes) {
            super(mapper);
            this.attributes = attributes;
        }

        public OperatorAttributes getOperatorAttributes() {
            return this.attributes;
        }
    }
}

