/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.test.runtime.BlockingShuffleITCase;
import org.apache.flink.test.runtime.JobGraphRunningUtil;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;

class BatchShuffleITCaseBase {
    private static final String RECORD = "batch shuffle test";
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int NUM_SLOTS_PER_TASK_MANAGER = 10;
    private static final int PARALLELISM = 10;
    private static final int[] NUM_RECEIVED_RECORDS = new int[10];
    private static Path tmpDir;

    BatchShuffleITCaseBase() {
    }

    @BeforeAll
    static void setupClass(@TempDir Path path) throws Exception {
        tmpDir = TempDirUtils.newFolder((Path)path, (String[])new String[]{UUID.randomUUID().toString()}).toPath();
    }

    @BeforeEach
    public void setup() {
        Arrays.fill(NUM_RECEIVED_RECORDS, 0);
    }

    protected JobGraph createJobGraph(int numRecordsToSend, boolean failExecution, Configuration configuration, boolean enableAdaptiveAutoParallelism) {
        return this.createJobGraph(numRecordsToSend, failExecution, false, configuration, enableAdaptiveAutoParallelism);
    }

    protected JobGraph createJobGraph(int numRecordsToSend, boolean failExecution, boolean deletePartitionFile, Configuration configuration, boolean enableAdaptiveAutoParallelism) {
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_ENABLED, (Object)enableAdaptiveAutoParallelism);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)configuration);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)2, (long)0L));
        env.setParallelism(10);
        DataStreamSource source = new DataStreamSource(env, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, new StreamSource((SourceFunction)new StringSource(numRecordsToSend)), true, "source", Boundedness.BOUNDED).setParallelism(10);
        source.rebalance().map((MapFunction & Serializable)value -> value).shuffle().addSink((SinkFunction)new VerifySink(failExecution, deletePartitionFile));
        StreamGraph streamGraph = env.getStreamGraph();
        streamGraph.setJobType(JobType.BATCH);
        return StreamingJobGraphGenerator.createJobGraph((StreamGraph)streamGraph);
    }

    protected Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.TMP_DIRS, (Object)tmpDir.toString());
        configuration.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.BATCH);
        configuration.set(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, (Object)100);
        return configuration;
    }

    protected void executeJob(JobGraph jobGraph, Configuration configuration, int numRecordsToSend) throws Exception {
        JobGraphRunningUtil.execute(jobGraph, configuration, 2, 10);
        this.checkAllDataReceived(numRecordsToSend);
    }

    private void checkAllDataReceived(int numRecordsToSend) {
        Assertions.assertThat((int)Arrays.stream(NUM_RECEIVED_RECORDS).sum()).isEqualTo(numRecordsToSend * 10);
    }

    private static class VerifySink
    extends RichSinkFunction<String> {
        private final boolean failExecution;
        private final boolean deletePartitionFile;
        private boolean isWatermarkReceived;

        VerifySink(boolean failExecution, boolean deletePartitionFile) {
            this.failExecution = failExecution;
            this.deletePartitionFile = deletePartitionFile;
            this.isWatermarkReceived = false;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         * Converted monitor instructions to comments
         * Lifted jumps to return sites
         */
        public void open(OpenContext openContext) throws Exception {
            NUM_RECEIVED_RECORDS[this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()] = 0;
            if (this.getRuntimeContext().getTaskInfo().getAttemptNumber() > 0) return;
            if (this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() != 0) {
                return;
            }
            if (this.deletePartitionFile) {
                Class<BlockingShuffleITCase> clazz = BlockingShuffleITCase.class;
                // MONITORENTER : org.apache.flink.test.runtime.BlockingShuffleITCase.class
                VerifySink.deleteFiles(tmpDir.toFile());
                // MONITOREXIT : clazz
            }
            if (!this.failExecution) return;
            throw new RuntimeException("expected exception.");
        }

        public void writeWatermark(Watermark watermark) {
            Assertions.assertThat((long)watermark.getTimestamp()).isEqualTo(Long.MAX_VALUE);
            this.isWatermarkReceived = true;
        }

        public void invoke(String value, SinkFunction.Context context) throws Exception {
            Assertions.assertThat((boolean)this.isWatermarkReceived).isFalse();
            int[] nArray = NUM_RECEIVED_RECORDS;
            int n = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            nArray[n] = nArray[n] + 1;
            Assertions.assertThat((String)value).isEqualTo(BatchShuffleITCaseBase.RECORD);
        }

        public void finish() {
            Assertions.assertThat((boolean)this.isWatermarkReceived).isTrue();
        }

        private static void deleteFiles(File root) throws IOException {
            File[] files = root.listFiles();
            if (files == null || files.length == 0) {
                return;
            }
            for (File file : files) {
                if (!file.isDirectory()) {
                    Files.deleteIfExists(file.toPath());
                    continue;
                }
                VerifySink.deleteFiles(file);
            }
        }
    }

    private static class StringSource
    implements ParallelSourceFunction<String> {
        private volatile boolean isRunning = true;
        private int numRecordsToSend;

        StringSource(int numRecordsToSend) {
            this.numRecordsToSend = numRecordsToSend;
        }

        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.isRunning && this.numRecordsToSend-- > 0) {
                ctx.collect((Object)BatchShuffleITCaseBase.RECORD);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }
    }
}

