/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordComparator;
import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.JoinDriver;
import org.apache.flink.runtime.operators.testutils.DriverTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;

public class JoinTaskExternalITCase
extends DriverTestBase<FlatJoinFunction<Record, Record, Record>> {
    private static final long HASH_MEM = 0x400000L;
    private static final long SORT_MEM = 0x300000L;
    private static final long BNLJN_MEM = 327680L;
    private final double bnljn_frac;
    private final double hash_frac;
    private final RecordComparator comparator1 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    private final RecordComparator comparator2 = new RecordComparator(new int[]{0}, new Class[]{IntValue.class});
    private final DriverTestBase.CountingOutputCollector output = new DriverTestBase.CountingOutputCollector();

    public JoinTaskExternalITCase(ExecutionConfig config) {
        super(config, 0x400000L, 2, 0x300000L);
        this.bnljn_frac = 327680.0 / (double)this.getMemoryManager().getMemorySize();
        this.hash_frac = 4194304.0 / (double)this.getMemoryManager().getMemorySize();
    }

    @Test
    public void testExternalSort1MatchTask() {
        int keyCnt1 = 65536;
        int valCnt1 = 2;
        int keyCnt2 = 8192;
        int valCnt2 = 8;
        int expCnt = 16 * Math.min(65536, 8192);
        this.setOutput(this.output);
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.getTaskConfig().setDriverStrategy(DriverStrategy.INNER_MERGE);
        this.getTaskConfig().setRelativeMemoryDriver(this.bnljn_frac);
        this.setNumFileHandlesForSort(4);
        JoinDriver testTask = new JoinDriver();
        try {
            this.addInputSorted(new UniformRecordGenerator(65536, 2, false), this.comparator1.duplicate());
            this.addInputSorted(new UniformRecordGenerator(8192, 8, false), this.comparator2.duplicate());
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"The test caused an exception.");
        }
        Assert.assertEquals((String)"Wrong result set size.", (long)expCnt, (long)this.output.getNumberOfRecords());
    }

    @Test
    public void testExternalHash1MatchTask() {
        int keyCnt1 = 32768;
        int valCnt1 = 8;
        int keyCnt2 = 65536;
        int valCnt2 = 8;
        int expCnt = 64 * Math.min(32768, 65536);
        this.addInput(new UniformRecordGenerator(32768, 8, false));
        this.addInput(new UniformRecordGenerator(65536, 8, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(this.output);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_FIRST);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        try {
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"Test caused an exception.");
        }
        Assert.assertEquals((String)"Wrong result set size.", (long)expCnt, (long)this.output.getNumberOfRecords());
    }

    @Test
    public void testExternalHash2MatchTask() {
        int keyCnt1 = 32768;
        int valCnt1 = 8;
        int keyCnt2 = 65536;
        int valCnt2 = 8;
        int expCnt = 64 * Math.min(32768, 65536);
        this.addInput(new UniformRecordGenerator(32768, 8, false));
        this.addInput(new UniformRecordGenerator(65536, 8, false));
        this.addDriverComparator(this.comparator1);
        this.addDriverComparator(this.comparator2);
        this.getTaskConfig().setDriverPairComparator((TypePairComparatorFactory)RecordPairComparatorFactory.get());
        this.setOutput(this.output);
        this.getTaskConfig().setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
        this.getTaskConfig().setRelativeMemoryDriver(this.hash_frac);
        JoinDriver testTask = new JoinDriver();
        try {
            this.testDriver((Driver)testTask, MockMatchStub.class);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)"Test caused an exception.");
        }
        Assert.assertEquals((String)"Wrong result set size.", (long)expCnt, (long)this.output.getNumberOfRecords());
    }

    public static final class MockMatchStub
    implements FlatJoinFunction<Record, Record, Record> {
        private static final long serialVersionUID = 1L;

        public void join(Record value1, Record value2, Collector<Record> out) throws Exception {
            out.collect((Object)value1);
        }
    }
}

