/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.HashSet;
import java.util.UUID;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.DataSinkTask;
import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataSinkTaskTest
extends TaskTestBase {
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
    private static final int MEMORY_MANAGER_SIZE = 0x300000;
    private static final int NETWORK_BUFFER_SIZE = 1024;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDataSinkTask() {
        InputStreamReader fr = null;
        BufferedReader br = null;
        try {
            int keyCnt = 100;
            int valCnt = 20;
            super.initEnvironment(0x300000L, 1024);
            super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
            DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
            File tempTestFile = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
            super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), new Configuration());
            testTask.invoke();
            Assert.assertTrue((String)"Temp output file does not exist", (boolean)tempTestFile.exists());
            fr = new FileReader(tempTestFile);
            br = new BufferedReader(fr);
            HashMap keyValueCountMap = new HashMap(keyCnt);
            while (br.ready()) {
                String line = br.readLine();
                Integer key = Integer.parseInt(line.substring(0, line.indexOf("_")));
                Integer val = Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length()));
                if (!keyValueCountMap.containsKey(key)) {
                    keyValueCountMap.put(key, new HashSet());
                }
                ((HashSet)keyValueCountMap.get(key)).add(val);
            }
            Assert.assertTrue((String)("Invalid key count in out file. Expected: " + keyCnt + " Actual: " + keyValueCountMap.keySet().size()), (keyValueCountMap.keySet().size() == keyCnt ? 1 : 0) != 0);
            for (Integer key : keyValueCountMap.keySet()) {
                Assert.assertTrue((String)("Invalid value count for key: " + key + ". Expected: " + valCnt + " Actual: " + ((HashSet)keyValueCountMap.get(key)).size()), (((HashSet)keyValueCountMap.get(key)).size() == valCnt ? 1 : 0) != 0);
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            if (br != null) {
                try {
                    br.close();
                }
                catch (Throwable throwable) {}
            }
            if (fr != null) {
                try {
                    fr.close();
                }
                catch (Throwable throwable) {}
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testUnionDataSinkTask() {
        int keyCnt = 10;
        int valCnt = 20;
        super.initEnvironment(0x300000L, 1024);
        IteratorWrappingTestSingleInputGate[] readers = new IteratorWrappingTestSingleInputGate[]{super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0, false), super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0, false), super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false), super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 3, 0, false), 0, false)};
        DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        File tempTestFile = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), new Configuration());
        try {
            for (IteratorWrappingTestSingleInputGate reader : readers) {
                reader.notifyNonEmpty();
            }
            testTask.invoke();
        }
        catch (Exception e) {
            LOG.debug("Exception while invoking the test task.", (Throwable)e);
            Assert.fail((String)"Invoke method caused exception.");
        }
        Assert.assertTrue((String)"Temp output file does not exist", (boolean)tempTestFile.exists());
        FileReader fr = null;
        BufferedReader br = null;
        try {
            fr = new FileReader(tempTestFile);
            br = new BufferedReader(fr);
            HashMap keyValueCountMap = new HashMap(keyCnt);
            while (br.ready()) {
                String line = br.readLine();
                Integer key = Integer.parseInt(line.substring(0, line.indexOf("_")));
                Integer val = Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length()));
                if (!keyValueCountMap.containsKey(key)) {
                    keyValueCountMap.put(key, new HashSet());
                }
                ((HashSet)keyValueCountMap.get(key)).add(val);
            }
            Assert.assertTrue((String)("Invalid key count in out file. Expected: " + keyCnt + " Actual: " + keyValueCountMap.keySet().size()), (keyValueCountMap.keySet().size() == keyCnt * 4 ? 1 : 0) != 0);
            for (Integer key : keyValueCountMap.keySet()) {
                Assert.assertTrue((String)("Invalid value count for key: " + key + ". Expected: " + valCnt + " Actual: " + ((HashSet)keyValueCountMap.get(key)).size()), (((HashSet)keyValueCountMap.get(key)).size() == valCnt ? 1 : 0) != 0);
            }
        }
        catch (FileNotFoundException e) {
            Assert.fail((String)"Out file got lost...");
        }
        catch (IOException ioe) {
            Assert.fail((String)"Caught IOE while reading out file");
        }
        finally {
            if (br != null) {
                try {
                    br.close();
                }
                catch (Throwable e) {}
            }
            if (fr != null) {
                try {
                    fr.close();
                }
                catch (Throwable e) {}
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSortingDataSinkTask() {
        int keyCnt = 100;
        int valCnt = 20;
        double memoryFraction = 1.0;
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
        DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator((TypeComparatorFactory)new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
        File tempTestFile = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), new Configuration());
        try {
            testTask.invoke();
        }
        catch (Exception e) {
            LOG.debug("Exception while invoking the test task.", (Throwable)e);
            Assert.fail((String)"Invoke method caused exception.");
        }
        Assert.assertTrue((String)"Temp output file does not exist", (boolean)tempTestFile.exists());
        FileReader fr = null;
        BufferedReader br = null;
        try {
            fr = new FileReader(tempTestFile);
            br = new BufferedReader(fr);
            HashSet<Integer> keys = new HashSet<Integer>();
            int curVal = -1;
            while (br.ready()) {
                String line = br.readLine();
                Integer key = Integer.parseInt(line.substring(0, line.indexOf("_")));
                Integer val = Integer.parseInt(line.substring(line.indexOf("_") + 1, line.length()));
                Assert.assertTrue((String)"Values not in ascending order", (val >= curVal ? 1 : 0) != 0);
                if (val > curVal) {
                    if (curVal != -1) {
                        Assert.assertTrue((String)"Keys missing for value", (keys.size() == 100 ? 1 : 0) != 0);
                    }
                    keys.clear();
                    curVal = val;
                }
                Assert.assertTrue((String)"Duplicate key for value", (boolean)keys.add(key));
            }
        }
        catch (FileNotFoundException e) {
            Assert.fail((String)"Out file got lost...");
        }
        catch (IOException ioe) {
            Assert.fail((String)"Caught IOE while reading out file");
        }
        finally {
            if (br != null) {
                try {
                    br.close();
                }
                catch (Throwable e) {}
            }
            if (fr != null) {
                try {
                    fr.close();
                }
                catch (Throwable e) {}
            }
        }
    }

    @Test
    public void testFailingDataSinkTask() {
        int keyCnt = 100;
        int valCnt = 20;
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
        DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        Configuration stubParams = new Configuration();
        File tempTestFile = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockFailingOutputFormat.class, tempTestFile.toURI().toString(), stubParams);
        boolean stubFailed = false;
        try {
            testTask.invoke();
        }
        catch (Exception e) {
            stubFailed = true;
        }
        Assert.assertTrue((String)"Function exception was not forwarded.", (boolean)stubFailed);
        Assert.assertFalse((String)"Temp output file has not been removed", (boolean)tempTestFile.exists());
    }

    @Test
    public void testFailingSortingDataSinkTask() {
        int keyCnt = 100;
        int valCnt = 20;
        double memoryFraction = 1.0;
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new UniformRecordGenerator(keyCnt, valCnt, true), 0);
        DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        Configuration stubParams = new Configuration();
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator((TypeComparatorFactory)new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
        File tempTestFile = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockFailingOutputFormat.class, tempTestFile.toURI().toString(), stubParams);
        boolean stubFailed = false;
        try {
            testTask.invoke();
        }
        catch (Exception e) {
            stubFailed = true;
        }
        Assert.assertTrue((String)"Function exception was not forwarded.", (boolean)stubFailed);
        Assert.assertFalse((String)"Temp output file has not been removed", (boolean)tempTestFile.exists());
    }

    @Test
    public void testCancelDataSinkTask() throws Exception {
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new InfiniteInputIterator(), 0);
        final DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        Configuration stubParams = new Configuration();
        File tempTestFile = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), stubParams);
        Thread taskRunner = new Thread(){

            @Override
            public void run() {
                try {
                    testTask.invoke();
                }
                catch (Exception ie) {
                    ie.printStackTrace();
                    Assert.fail((String)"Task threw exception although it was properly canceled");
                }
            }
        };
        taskRunner.start();
        long deadline = System.currentTimeMillis() + 60000L;
        while (!tempTestFile.exists() && System.currentTimeMillis() < deadline) {
            Thread.sleep(10L);
        }
        Assert.assertTrue((String)"Task did not create file within 60 seconds", (boolean)tempTestFile.exists());
        Thread.sleep(500L);
        testTask.cancel();
        taskRunner.interrupt();
        taskRunner.join();
        Assert.assertFalse((String)"Temp output file has not been removed", (boolean)tempTestFile.exists());
    }

    @Test
    public void testCancelSortingDataSinkTask() {
        double memoryFraction = 1.0;
        super.initEnvironment(0x300000L, 1024);
        super.addInput(new InfiniteInputIterator(), 0);
        final DataSinkTask testTask = new DataSinkTask((Environment)this.mockEnv);
        Configuration stubParams = new Configuration();
        super.getTaskConfig().setInputLocalStrategy(0, LocalStrategy.SORT);
        super.getTaskConfig().setInputComparator((TypeComparatorFactory)new RecordComparatorFactory(new int[]{1}, new Class[]{IntValue.class}), 0);
        super.getTaskConfig().setRelativeMemoryInput(0, memoryFraction);
        super.getTaskConfig().setFilehandlesInput(0, 8);
        super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
        File tempTestFile = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        super.registerFileOutputTask(MockOutputFormat.class, tempTestFile.toURI().toString(), stubParams);
        Thread taskRunner = new Thread(){

            @Override
            public void run() {
                try {
                    testTask.invoke();
                }
                catch (Exception ie) {
                    ie.printStackTrace();
                    Assert.fail((String)"Task threw exception although it was properly canceled");
                }
            }
        };
        taskRunner.start();
        TaskCancelThread tct = new TaskCancelThread(2, taskRunner, (AbstractInvokable)testTask);
        tct.start();
        try {
            tct.join();
            taskRunner.join();
        }
        catch (InterruptedException ie) {
            Assert.fail((String)"Joining threads failed");
        }
    }

    public static class MockFailingOutputFormat
    extends MockOutputFormat {
        private static final long serialVersionUID = 1L;
        int cnt = 0;

        @Override
        public void configure(Configuration parameters) {
            super.configure(parameters);
        }

        @Override
        public void writeRecord(Record rec) throws IOException {
            if (++this.cnt >= 10) {
                throw new RuntimeException("Expected Test Exception");
            }
            super.writeRecord(rec);
        }
    }

    public static class MockOutputFormat
    extends FileOutputFormat<Record> {
        private static final long serialVersionUID = 1L;
        final StringBuilder bld = new StringBuilder();

        public void configure(Configuration parameters) {
            super.configure(parameters);
        }

        public void writeRecord(Record rec) throws IOException {
            IntValue key = (IntValue)rec.getField(0, IntValue.class);
            IntValue value = (IntValue)rec.getField(1, IntValue.class);
            this.bld.setLength(0);
            this.bld.append(key.getValue());
            this.bld.append('_');
            this.bld.append(value.getValue());
            this.bld.append('\n');
            byte[] bytes = this.bld.toString().getBytes(ConfigConstants.DEFAULT_CHARSET);
            this.stream.write(bytes);
        }
    }
}

