/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.Serializable;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.GlobalStreamExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamExchangeMode;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RescalePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest {
    StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest() {
    }

    @Test
    void testDefaultGlobalExchangeModeIsAllEdgesPipelined() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest.createStreamGraph();
        Assertions.assertThat((Comparable)streamGraph.getGlobalStreamExchangeMode()).isEqualTo((Object)GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);
    }

    @Test
    void testAllEdgesBlockingMode() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest.createStreamGraph(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex map1Vertex = (JobVertex)verticesSorted.get(1);
        JobVertex map2Vertex = (JobVertex)verticesSorted.get(2);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING);
        Assertions.assertThat((Comparable)((IntermediateDataSet)map1Vertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING);
        Assertions.assertThat((Comparable)((IntermediateDataSet)map2Vertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING);
    }

    @Test
    void testAllEdgesPipelinedMode() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest.createStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex map1Vertex = (JobVertex)verticesSorted.get(1);
        JobVertex map2Vertex = (JobVertex)verticesSorted.get(2);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat((Comparable)((IntermediateDataSet)map1Vertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat((Comparable)((IntermediateDataSet)map2Vertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
    }

    @Test
    void testForwardEdgesPipelinedMode() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest.createStreamGraph(GlobalStreamExchangeMode.FORWARD_EDGES_PIPELINED);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex map1Vertex = (JobVertex)verticesSorted.get(1);
        JobVertex map2Vertex = (JobVertex)verticesSorted.get(2);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat((Comparable)((IntermediateDataSet)map1Vertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING);
        Assertions.assertThat((Comparable)((IntermediateDataSet)map2Vertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING);
    }

    @Test
    void testPointwiseEdgesPipelinedMode() {
        StreamGraph streamGraph = StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest.createStreamGraph(GlobalStreamExchangeMode.POINTWISE_EDGES_PIPELINED);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        JobVertex map1Vertex = (JobVertex)verticesSorted.get(1);
        JobVertex map2Vertex = (JobVertex)verticesSorted.get(2);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat((Comparable)((IntermediateDataSet)map1Vertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
        Assertions.assertThat((Comparable)((IntermediateDataSet)map2Vertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.BLOCKING);
    }

    @Test
    void testGlobalExchangeModeDoesNotOverrideSpecifiedExchangeMode() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource source = env.fromData((Object[])new Integer[]{1, 2, 3}).setParallelism(1);
        DataStream forward = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), StreamExchangeMode.PIPELINED));
        forward.map((MapFunction & Serializable)i -> i).startNewChain().setParallelism(1);
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(GlobalStreamExchangeMode.ALL_EDGES_BLOCKING);
        JobGraph jobGraph = StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
        List verticesSorted = jobGraph.getVerticesSortedTopologicallyFromSources();
        JobVertex sourceVertex = (JobVertex)verticesSorted.get(0);
        Assertions.assertThat((Comparable)((IntermediateDataSet)sourceVertex.getProducedDataSets().get(0)).getResultType()).isEqualTo((Object)ResultPartitionType.PIPELINED_BOUNDED);
    }

    private static StreamGraph createStreamGraph() {
        return StreamingJobGraphGeneratorWithGlobalStreamExchangeModeTest.createStreamGraph(GlobalStreamExchangeMode.ALL_EDGES_PIPELINED);
    }

    private static StreamGraph createStreamGraph(GlobalStreamExchangeMode globalStreamExchangeMode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        if (globalStreamExchangeMode != GlobalStreamExchangeMode.ALL_EDGES_PIPELINED) {
            env.setBufferTimeout(-1L);
        }
        DataStreamSource source = env.fromData((Object[])new Integer[]{1, 2, 3}).setParallelism(1);
        DataStream forward = new DataStream(env, (Transformation)new PartitionTransformation(source.getTransformation(), (StreamPartitioner)new ForwardPartitioner(), StreamExchangeMode.UNDEFINED));
        SingleOutputStreamOperator map1 = forward.map((MapFunction & Serializable)i -> i).startNewChain().setParallelism(1);
        DataStream rescale = new DataStream(env, (Transformation)new PartitionTransformation(map1.getTransformation(), (StreamPartitioner)new RescalePartitioner(), StreamExchangeMode.UNDEFINED));
        SingleOutputStreamOperator map2 = rescale.map((MapFunction & Serializable)i -> i).setParallelism(2);
        map2.rebalance().print().setParallelism(2);
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setGlobalStreamExchangeMode(globalStreamExchangeMode);
        return streamGraph;
    }
}

