/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.types.LongValue;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ShuffleCompressionITCase {
    private static final int NUM_BUFFERS_TO_SEND = 1000;
    private static final int BUFFER_SIZE = 32768;
    private static final int BYTES_PER_RECORD = 12;
    private static final int NUM_RECORDS_TO_SEND = 2730667;
    private static final int NUM_TASKMANAGERS = 2;
    private static final int NUM_SLOTS = 4;
    private static final int PARALLELISM = 8;
    private static final LongValue RECORD_TO_SEND = new LongValue(4387942071694473832L);
    @Parameterized.Parameter
    public static boolean useBroadcastPartitioner = false;

    @Parameterized.Parameters(name="useBroadcastPartitioner = {0}")
    public static Boolean[] params() {
        return new Boolean[]{true, false};
    }

    @Test
    public void testDataCompressionForBlockingShuffle() throws Exception {
        this.executeTest(ShuffleCompressionITCase.createJobGraph(ScheduleMode.LAZY_FROM_SOURCES, ResultPartitionType.BLOCKING, ExecutionMode.BATCH));
    }

    private void executeTest(JobGraph jobGraph) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, (Object)MemorySize.parse((String)"1g"));
        configuration.setBoolean(NettyShuffleEnvironmentOptions.BLOCKING_SHUFFLE_COMPRESSION_ENABLED, true);
        MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumTaskManagers(2).setNumSlotsPerTaskManager(4).build();
        try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);){
            miniCluster.start();
            MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
            JobID jobID = (JobID)miniClusterClient.submitJob(jobGraph).get();
            CompletableFuture resultFuture = miniClusterClient.requestJobResult(jobID);
            Assert.assertFalse((boolean)((JobResult)resultFuture.get()).getSerializedThrowable().isPresent());
        }
    }

    private static JobGraph createJobGraph(ScheduleMode scheduleMode, ResultPartitionType resultPartitionType, ExecutionMode executionMode) throws IOException {
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        JobVertex source = new JobVertex("source");
        source.setInvokableClass(LongValueSource.class);
        source.setParallelism(8);
        source.setSlotSharingGroup(slotSharingGroup);
        JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(ResultVerifyingSink.class);
        sink.setParallelism(8);
        sink.setSlotSharingGroup(slotSharingGroup);
        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, resultPartitionType);
        JobGraph jobGraph = new JobGraph(new JobVertex[]{source, sink});
        jobGraph.setScheduleMode(scheduleMode);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setExecutionMode(executionMode);
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }

    public static final class ResultVerifyingSink
    extends AbstractInvokable {
        public ResultVerifyingSink(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            MutableRecordReader reader = new MutableRecordReader((InputGate)this.getEnvironment().getInputGate(0), new String[]{EnvironmentInformation.getTemporaryFileDirectory()});
            LongValue value = new LongValue();
            for (int i = 0; i < 21845336; ++i) {
                reader.next((IOReadableWritable)value);
                Assert.assertEquals((long)RECORD_TO_SEND.getValue(), (long)value.getValue());
            }
        }
    }

    public static final class LongValueSource
    extends AbstractInvokable {
        public LongValueSource(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            ResultPartitionWriter resultPartitionWriter = this.getEnvironment().getWriter(0);
            RecordWriterBuilder recordWriterBuilder = new RecordWriterBuilder();
            if (this.getEnvironment().getExecutionConfig().getExecutionMode() == ExecutionMode.PIPELINED) {
                recordWriterBuilder.setTimeout(100L);
            }
            if (useBroadcastPartitioner) {
                recordWriterBuilder.setChannelSelector((ChannelSelector)new BroadcastPartitioner());
            }
            RecordWriter writer = recordWriterBuilder.build(resultPartitionWriter);
            for (int i = 0; i < 2730667; ++i) {
                writer.broadcastEmit((IOReadableWritable)RECORD_TO_SEND);
            }
            writer.flushAll();
            writer.clearBuffers();
        }
    }
}

