/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.testutils.serialization.types.ByteArrayType;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class FileBufferReaderITCase
extends TestLogger {
    private static final int parallelism = 8;
    private static final int numRecords = 100000;
    private static final byte[] dataSource = new byte[1024];

    @BeforeClass
    public static void setup() {
        for (int i = 0; i < dataSource.length; ++i) {
            FileBufferReaderITCase.dataSource[i] = 0;
        }
    }

    @Test
    public void testSequentialReading() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(RestOptions.BIND_PORT, "0");
        configuration.setString(NettyShuffleEnvironmentOptions.NETWORK_BOUNDED_BLOCKING_SUBPARTITION_TYPE, "file");
        MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumTaskManagers(8).setNumSlotsPerTaskManager(1).build();
        try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);){
            miniCluster.start();
            MiniClusterClient client = new MiniClusterClient(configuration, miniCluster);
            JobGraph jobGraph = FileBufferReaderITCase.createJobGraph();
            CompletableFuture submitFuture = client.submitJob(jobGraph);
            JobSubmissionResult result = (JobSubmissionResult)submitFuture.get();
            CompletableFuture resultFuture = client.requestJobResult(result.getJobID());
            JobResult jobResult = (JobResult)resultFuture.get();
            Assert.assertThat((Object)jobResult.getSerializedThrowable().isPresent(), (Matcher)Matchers.is((Object)false));
        }
    }

    private static JobGraph createJobGraph() {
        SlotSharingGroup group1 = new SlotSharingGroup();
        SlotSharingGroup group2 = new SlotSharingGroup();
        JobVertex source = new JobVertex("source");
        source.setInvokableClass(TestSourceInvokable.class);
        source.setParallelism(8);
        source.setSlotSharingGroup(group1);
        JobVertex sink = new JobVertex("sink");
        sink.setInvokableClass(TestSinkInvokable.class);
        sink.setParallelism(8);
        sink.setSlotSharingGroup(group2);
        sink.connectNewDataSetAsInput(source, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
        JobGraph jobGraph = new JobGraph(new JobVertex[]{source, sink});
        jobGraph.setScheduleMode(ScheduleMode.LAZY_FROM_SOURCES);
        return jobGraph;
    }

    public static final class TestSinkInvokable
    extends AbstractInvokable {
        private int numReceived = 0;

        public TestSinkInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            RecordReader reader = new RecordReader(this.getEnvironment().getInputGate(0), ByteArrayType.class, this.getEnvironment().getTaskManagerInfo().getTmpDirectories());
            while (reader.hasNext()) {
                reader.next();
                ++this.numReceived;
            }
            Assert.assertThat((Object)this.numReceived, (Matcher)Matchers.is((Object)100000));
        }
    }

    public static final class TestSourceInvokable
    extends AbstractInvokable {
        public TestSourceInvokable(Environment environment) {
            super(environment);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            RecordWriter writer = new RecordWriterBuilder().build(this.getEnvironment().getWriter(0));
            ByteArrayType bytes = new ByteArrayType(dataSource);
            int counter = 0;
            while (counter++ < 100000) {
                try {
                    writer.emit((IOReadableWritable)bytes);
                    writer.flushAll();
                }
                finally {
                    writer.clearBuffers();
                }
            }
        }
    }
}

