/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.Serializable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.test.runtime.JobGraphRunningUtil;
import org.junit.Assert;
import org.junit.Test;

public class BlockingShuffleITCase {
    private static final String RECORD = "hello, world!";
    private final int numTaskManagers = 2;
    private final int numSlotsPerTaskManager = 4;

    @Test
    public void testBoundedBlockingShuffle() throws Exception {
        JobGraph jobGraph = this.createJobGraph(1000000);
        Configuration configuration = new Configuration();
        JobGraphRunningUtil.execute(jobGraph, configuration, 2, 4);
    }

    @Test
    public void testBoundedBlockingShuffleWithoutData() throws Exception {
        JobGraph jobGraph = this.createJobGraph(0);
        Configuration configuration = new Configuration();
        JobGraphRunningUtil.execute(jobGraph, configuration, 2, 4);
    }

    @Test
    public void testSortMergeBlockingShuffle() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
        JobGraph jobGraph = this.createJobGraph(1000000);
        JobGraphRunningUtil.execute(jobGraph, configuration, 2, 4);
    }

    @Test
    public void testSortMergeBlockingShuffleWithoutData() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_SORT_SHUFFLE_MIN_PARALLELISM, 1);
        JobGraph jobGraph = this.createJobGraph(0);
        JobGraphRunningUtil.execute(jobGraph, configuration, 2, 4);
    }

    private JobGraph createJobGraph(int numRecordsToSend) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);
        DataStreamSource source = env.addSource((SourceFunction)new StringSource(numRecordsToSend));
        source.rebalance().map((MapFunction & Serializable)value -> value).broadcast().addSink((SinkFunction)new VerifySink());
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
        streamGraph.setJobType(JobType.BATCH);
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
    }

    private static class VerifySink
    implements SinkFunction<String> {
        private VerifySink() {
        }

        public void invoke(String value) throws Exception {
            Assert.assertEquals((Object)BlockingShuffleITCase.RECORD, (Object)value);
        }
    }

    private static class StringSource
    implements ParallelSourceFunction<String> {
        private volatile boolean isRunning = true;
        private int numRecordsToSend;

        StringSource(int numRecordsToSend) {
            this.numRecordsToSend = numRecordsToSend;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.isRunning && this.numRecordsToSend-- > 0) {
                ctx.collect((Object)BlockingShuffleITCase.RECORD);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

