/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
import org.apache.flink.runtime.operators.CoGroupDriver;
import org.apache.flink.runtime.operators.CoGroupTaskExternalITCase;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.testutils.DelayingInfinitiveInputIterator;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class CoGroupTaskTest
extends DriverTestBase<CoGroupFunction<Record, Record, Record>> {
    // 0x300000 bytes = 3 * 1024 * 1024 (3 MiB). The constructor hands this same value to the
    // DriverTestBase constructor as its last argument (presumably the per-sort memory budget).
    private static final long SORT_MEM = 0x300000L;
    // Registered as the first driver comparator; built with key position 0 and key class IntValue.
    private final RecordComparator comparator1 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    // Registered as the second driver comparator; same configuration as comparator1.
    private final RecordComparator comparator2 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    // Installed as the task output in every test; its record count is what the size assertions read.
    private final DriverTestBase.CountingOutputCollector output = new DriverTestBase.CountingOutputCollector();

    /**
     * Creates the test for the given execution configuration.
     *
     * <p>Forwards to the {@code DriverTestBase} constructor with the arguments
     * {@code 0L}, {@code 2} and {@link #SORT_MEM} (3 MiB).
     *
     * @param config execution configuration handed through to the base class
     */
    public CoGroupTaskTest(ExecutionConfig config) {
        super(config, 0L, 2, SORT_MEM);
    }

    /**
     * Runs the co-group driver with both inputs registered through {@code addInputSorted}
     * and with differing key counts (100 vs. 200), then checks how many records were emitted.
     *
     * <p>The expected count is the cross product of the per-key value counts for every key
     * present on both sides, plus one record per value for the keys only the larger side has.
     */
    @Test
    public void testSortBoth1CoGroupTask() {
        final int leftKeys = 100;
        final int leftValsPerKey = 2;
        final int rightKeys = 200;
        final int rightValsPerKey = 1;

        final int sharedKeys = Math.min(leftKeys, rightKeys);
        final int unmatchedRecords;
        if (leftKeys > rightKeys) {
            unmatchedRecords = (leftKeys - rightKeys) * leftValsPerKey;
        } else {
            unmatchedRecords = (rightKeys - leftKeys) * rightValsPerKey;
        }
        final int expected = leftValsPerKey * rightValsPerKey * sharedKeys + unmatchedRecords;

        setOutput(output);
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        CoGroupDriver driver = new CoGroupDriver();
        try {
            // Both generators are created with the third flag set to false and handed to the sorting path.
            addInputSorted(new UniformRecordGenerator(leftKeys, leftValsPerKey, false), comparator1.duplicate());
            addInputSorted(new UniformRecordGenerator(rightKeys, rightValsPerKey, false), comparator2.duplicate());
            testDriver(driver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail("The test caused an exception.");
        }

        Assert.assertEquals("Wrong result set size.", expected, output.getNumberOfRecords());
    }

    /**
     * Runs the co-group driver with both inputs registered through {@code addInputSorted}
     * and equal key counts (200 each, 2 and 4 values per key), then checks the output size.
     *
     * <p>The expected count uses the same formula as the other size tests: cross product per
     * shared key plus the records of keys that exist on one side only (none here).
     */
    @Test
    public void testSortBoth2CoGroupTask() {
        final int leftKeys = 200;
        final int leftValsPerKey = 2;
        final int rightKeys = 200;
        final int rightValsPerKey = 4;

        final int crossProductRecords = leftValsPerKey * rightValsPerKey * Math.min(leftKeys, rightKeys);
        final int unmatchedRecords = leftKeys > rightKeys
                ? (leftKeys - rightKeys) * leftValsPerKey
                : (rightKeys - leftKeys) * rightValsPerKey;
        final int expected = crossProductRecords + unmatchedRecords;

        setOutput(output);
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        CoGroupDriver driver = new CoGroupDriver();
        try {
            addInputSorted(new UniformRecordGenerator(leftKeys, leftValsPerKey, false), comparator1.duplicate());
            addInputSorted(new UniformRecordGenerator(rightKeys, rightValsPerKey, false), comparator2.duplicate());
            testDriver(driver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail("The test caused an exception.");
        }

        Assert.assertEquals("Wrong result set size.", expected, output.getNumberOfRecords());
    }

    /**
     * Runs the co-group driver with only the first input registered through
     * {@code addInputSorted}; the second input is added directly via {@code addInput}.
     * Afterwards the number of emitted records is compared with the computed expectation.
     */
    @Test
    public void testSortFirstCoGroupTask() {
        final int leftKeys = 200;
        final int leftValsPerKey = 2;
        final int rightKeys = 200;
        final int rightValsPerKey = 4;

        final int crossProductRecords = leftValsPerKey * rightValsPerKey * Math.min(leftKeys, rightKeys);
        final int unmatchedRecords = leftKeys > rightKeys
                ? (leftKeys - rightKeys) * leftValsPerKey
                : (rightKeys - leftKeys) * rightValsPerKey;
        final int expected = crossProductRecords + unmatchedRecords;

        setOutput(output);
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        CoGroupDriver driver = new CoGroupDriver();
        try {
            // First side goes through the sorting path (generator flag false) ...
            addInputSorted(new UniformRecordGenerator(leftKeys, leftValsPerKey, false), comparator1.duplicate());
            // ... the second side is fed as-is (generator flag true, presumably "already sorted").
            addInput(new UniformRecordGenerator(rightKeys, rightValsPerKey, true));
            testDriver(driver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail("The test caused an exception.");
        }

        Assert.assertEquals("Wrong result set size.", expected, output.getNumberOfRecords());
    }

    /**
     * Runs the co-group driver with only the second input registered through
     * {@code addInputSorted}; the first input is added directly via {@code addInput}.
     * Afterwards the number of emitted records is compared with the computed expectation.
     */
    @Test
    public void testSortSecondCoGroupTask() {
        final int leftKeys = 200;
        final int leftValsPerKey = 2;
        final int rightKeys = 200;
        final int rightValsPerKey = 4;

        final int crossProductRecords = leftValsPerKey * rightValsPerKey * Math.min(leftKeys, rightKeys);
        final int unmatchedRecords = leftKeys > rightKeys
                ? (leftKeys - rightKeys) * leftValsPerKey
                : (rightKeys - leftKeys) * rightValsPerKey;
        final int expected = crossProductRecords + unmatchedRecords;

        setOutput(output);
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        CoGroupDriver driver = new CoGroupDriver();
        try {
            // First side is fed as-is (generator flag true, presumably "already sorted") ...
            addInput(new UniformRecordGenerator(leftKeys, leftValsPerKey, true));
            // ... the second side goes through the sorting path (generator flag false).
            addInputSorted(new UniformRecordGenerator(rightKeys, rightValsPerKey, false), comparator2.duplicate());
            testDriver(driver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail("The test caused an exception.");
        }

        Assert.assertEquals("Wrong result set size.", expected, output.getNumberOfRecords());
    }

    /**
     * Runs the co-group driver with both inputs added directly via {@code addInput}
     * (no {@code addInputSorted} involved) and checks the number of emitted records.
     */
    @Test
    public void testMergeCoGroupTask() {
        final int leftKeys = 200;
        final int leftValsPerKey = 2;
        final int rightKeys = 200;
        final int rightValsPerKey = 4;

        final int crossProductRecords = leftValsPerKey * rightValsPerKey * Math.min(leftKeys, rightKeys);
        final int unmatchedRecords = leftKeys > rightKeys
                ? (leftKeys - rightKeys) * leftValsPerKey
                : (rightKeys - leftKeys) * rightValsPerKey;
        final int expected = crossProductRecords + unmatchedRecords;

        setOutput(output);
        // Inputs are registered before the comparators in this test; the order is kept as-is.
        addInput(new UniformRecordGenerator(leftKeys, leftValsPerKey, true));
        addInput(new UniformRecordGenerator(rightKeys, rightValsPerKey, true));
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        CoGroupDriver driver = new CoGroupDriver();
        try {
            testDriver(driver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail("The test caused an exception.");
        }

        Assert.assertEquals("Wrong result set size.", expected, output.getNumberOfRecords());
    }

    /**
     * Runs the co-group driver with {@link MockFailingCoGroupStub} as the user function and
     * verifies that the {@link ExpectedTestException} it throws reaches the caller of
     * {@code testDriver}. Any other exception, or a normal return, fails the test.
     */
    @Test
    public void testFailingSortCoGroupTask() {
        final int leftKeys = 100;
        final int leftValsPerKey = 2;
        final int rightKeys = 200;
        final int rightValsPerKey = 1;

        setOutput(output);
        addInput(new UniformRecordGenerator(leftKeys, leftValsPerKey, true));
        addInput(new UniformRecordGenerator(rightKeys, rightValsPerKey, true));
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        CoGroupDriver driver = new CoGroupDriver();
        try {
            testDriver(driver, MockFailingCoGroupStub.class);
            // Reaching this line means the stub's exception never surfaced.
            Assert.fail("Function exception was not forwarded.");
        } catch (ExpectedTestException expected) {
            // This is the outcome the test is looking for; nothing further to check.
        } catch (Exception unexpected) {
            unexpected.printStackTrace();
            Assert.fail("The test caused an exception.");
        }
    }

    /**
     * Cancels the task while its first input, a {@code DelayingInfinitiveInputIterator}
     * registered through {@code addInputSorted}, is still being consumed.
     *
     * <p>The driver runs on its own thread; a {@code TaskCancelThread} is started against
     * that thread and this test instance. The test passes only if {@code testDriver}
     * returns without throwing an exception.
     */
    @Test
    public void testCancelCoGroupTaskWhileSorting1() {
        final int keys = 10;
        final int valsPerKey = 2;

        setOutput(output);
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        final CoGroupDriver driver = new CoGroupDriver();
        try {
            addInputSorted(new DelayingInfinitiveInputIterator(1000), comparator1.duplicate());
            addInput(new UniformRecordGenerator(keys, valsPerKey, true));
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail("The test caused an exception.");
        }

        // Flipped to true only when testDriver returns normally on the runner thread.
        final AtomicBoolean finishedCleanly = new AtomicBoolean(false);
        Thread runner = new Thread() {

            @Override
            public void run() {
                try {
                    CoGroupTaskTest.this.testDriver(driver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
                    finishedCleanly.set(true);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        runner.start();

        TaskCancelThread canceller = new TaskCancelThread(1, runner, this);
        canceller.start();

        try {
            canceller.join();
            runner.join();
        } catch (InterruptedException ex) {
            Assert.fail("Joining threads failed");
        }

        Assert.assertTrue("Test threw an exception even though it was properly canceled.", finishedCleanly.get());
    }

    /**
     * Cancels the task while its second input, a {@code DelayingInfinitiveInputIterator}
     * registered through {@code addInputSorted}, is still being consumed.
     *
     * <p>The driver runs on its own thread; a {@code TaskCancelThread} is started against
     * that thread and this test instance. The test passes only if {@code testDriver}
     * returns without throwing an exception.
     */
    @Test
    public void testCancelCoGroupTaskWhileSorting2() {
        final int keys = 10;
        final int valsPerKey = 2;

        setOutput(output);
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        final CoGroupDriver driver = new CoGroupDriver();
        try {
            addInput(new UniformRecordGenerator(keys, valsPerKey, true));
            addInputSorted(new DelayingInfinitiveInputIterator(1000), comparator2.duplicate());
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail("The test caused an exception.");
        }

        // Flipped to true only when testDriver returns normally on the runner thread.
        final AtomicBoolean finishedCleanly = new AtomicBoolean(false);
        Thread runner = new Thread() {

            @Override
            public void run() {
                try {
                    CoGroupTaskTest.this.testDriver(driver, CoGroupTaskExternalITCase.MockCoGroupStub.class);
                    finishedCleanly.set(true);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        runner.start();

        TaskCancelThread canceller = new TaskCancelThread(1, runner, this);
        canceller.start();

        try {
            canceller.join();
            runner.join();
        } catch (InterruptedException ex) {
            Assert.fail("Joining threads failed");
        }

        Assert.assertTrue("Test threw an exception even though it was properly canceled.", finishedCleanly.get());
    }

    /**
     * Cancels the task while the user function is running: both inputs are finite
     * generators added via {@code addInput}, and {@link MockDelayingCoGroupStub} sleeps for
     * every record it is handed.
     *
     * <p>The driver runs on its own thread; a {@code TaskCancelThread} is started against
     * that thread and this test instance. The test passes only if {@code testDriver}
     * returns without throwing an exception.
     */
    @Test
    public void testCancelCoGroupTaskWhileCoGrouping() {
        final int keys = 100;
        final int valsPerKey = 5;

        setOutput(output);
        addDriverComparator(comparator1);
        addDriverComparator(comparator2);
        getTaskConfig().setDriverPairComparator(RecordPairComparatorFactory.get());
        getTaskConfig().setDriverStrategy(DriverStrategy.CO_GROUP);

        final CoGroupDriver driver = new CoGroupDriver();
        try {
            addInput(new UniformRecordGenerator(keys, valsPerKey, true));
            addInput(new UniformRecordGenerator(keys, valsPerKey, true));
        } catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail("The test caused an exception.");
        }

        // Flipped to true only when testDriver returns normally on the runner thread.
        final AtomicBoolean finishedCleanly = new AtomicBoolean(false);
        Thread runner = new Thread() {

            @Override
            public void run() {
                try {
                    CoGroupTaskTest.this.testDriver(driver, MockDelayingCoGroupStub.class);
                    finishedCleanly.set(true);
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        };
        runner.start();

        TaskCancelThread canceller = new TaskCancelThread(1, runner, this);
        canceller.start();

        try {
            canceller.join();
            runner.join();
        } catch (InterruptedException ex) {
            Assert.fail("Joining threads failed");
        }

        Assert.assertTrue("Test threw an exception even though it was properly canceled.", finishedCleanly.get());
    }

    public static final class MockDelayingCoGroupStub
    extends RichCoGroupFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;

        public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) {
            for (Record r : records1) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {}
            }
            for (Record r : records2) {
                try {
                    Thread.sleep(100L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }
    }

    public static class MockFailingCoGroupStub
    extends RichCoGroupFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;
        private int cnt = 0;

        public void coGroup(Iterable<Record> records1, Iterable<Record> records2, Collector<Record> out) {
            int val1Cnt = 0;
            for (Record r : records1) {
                ++val1Cnt;
            }
            for (Record record2 : records2) {
                if (val1Cnt == 0) {
                    if (++this.cnt >= 10) {
                        throw new ExpectedTestException();
                    }
                    out.collect((Object)record2);
                    continue;
                }
                for (int i = 0; i < val1Cnt; ++i) {
                    if (++this.cnt >= 10) {
                        throw new ExpectedTestException();
                    }
                    out.collect((Object)record2);
                }
            }
        }
    }
}

