/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.join;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntComparator;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.TupleComparator;
import org.apache.flink.core.memory.ManagedMemoryUseCase;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.testutils.Match;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.TwoInputStreamTaskTestHarness;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.runtime.operators.join.FlinkJoinType;
import org.apache.flink.table.runtime.operators.join.Int2SortMergeJoinOperatorTest;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.Assertions;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Randomized test that runs an INNER sort-merge join operator over generated
 * {@code (Integer, String)} inputs and checks the produced rows against a
 * hash-map based reference join computed by {@link #matchValues(Map, Map)}.
 *
 * <p>The test is parameterized over {@code leftIsSmall}, which is forwarded to
 * {@code Int2SortMergeJoinOperatorTest.newOperator} when the operator is created.
 *
 * <p>The static helpers are {@code public} and keep their original signatures so that
 * other tests can reuse them.
 */
@RunWith(Parameterized.class)
public class RandomSortMergeInnerJoinTest {
    /** Seed of the generator that produces the first (left) input. */
    private static final long SEED1 = 561349061987311L;
    /** Seed of the generator that produces the second (right) input. */
    private static final long SEED2 = 231434613412342L;
    /** Number of records generated for the first input in {@link #test()}. */
    private static final int INPUT_FIRST_SIZE = 20000;
    /** Number of records generated for the second input in {@link #test()}. */
    private static final int INPUT_SECOND_SIZE = 1000;

    private final boolean leftIsSmall;
    private TypeComparator<Tuple2<Integer, String>> comparator1;
    private TypeComparator<Tuple2<Integer, String>> comparator2;

    public RandomSortMergeInnerJoinTest(boolean leftIsSmall) {
        this.leftIsSmall = leftIsSmall;
    }

    /** Runs every test once with {@code leftIsSmall = true} and once with {@code false}. */
    @Parameterized.Parameters
    public static Collection<Boolean> parameters() {
        return Arrays.asList(true, false);
    }

    /** Creates the key comparators (ascending on tuple field 0) used to merge sorted inputs. */
    @Before
    public void before() {
        this.comparator1 = newKeyComparator();
        this.comparator2 = newKeyComparator();
    }

    /** Joins two generated, key-sorted inputs and verifies the result against the reference. */
    @Test
    public void test() throws Exception {
        TestData.TupleGenerator generator1 =
                new TestData.TupleGenerator(
                        SEED1,
                        500,
                        4096,
                        TestData.TupleGenerator.KeyMode.SORTED,
                        TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TestData.TupleGenerator generator2 =
                new TestData.TupleGenerator(
                        SEED2,
                        500,
                        2048,
                        TestData.TupleGenerator.KeyMode.SORTED,
                        TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TestData.TupleGeneratorIterator input1 =
                new TestData.TupleGeneratorIterator(generator1, INPUT_FIRST_SIZE);
        TestData.TupleGeneratorIterator input2 =
                new TestData.TupleGeneratorIterator(generator2, INPUT_SECOND_SIZE);

        // First pass over the inputs: compute the expected matches.
        Map<Integer, Collection<Match>> expectedMatchesMap =
                matchValues(collectData(input1), collectData(input2));

        // Rewind generators and iterators so the operator sees the same records again.
        generator1.reset();
        generator2.reset();
        input1.reset();
        input2.reset();

        StreamOperator operator = getOperator();
        match(expectedMatchesMap, transformToBinary(join(operator, input1, input2)));
        assertNoMatchesLeft(expectedMatchesMap);
    }

    /**
     * Same as {@link #test()}, but each input additionally contains many records that share
     * one constant key, merged into the generated records by key.
     */
    @Test
    public void testMergeWithHighNumberOfCommonKeys() throws Exception {
        final int input1Size = 200;
        final int input2Size = 100;
        final int input1Duplicates = 10;
        final int input2Duplicates = 4000;
        final int duplicateKey = 13;

        TestData.TupleGenerator generator1 =
                new TestData.TupleGenerator(
                        SEED1,
                        500,
                        4096,
                        TestData.TupleGenerator.KeyMode.SORTED,
                        TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TestData.TupleGenerator generator2 =
                new TestData.TupleGenerator(
                        SEED2,
                        500,
                        2048,
                        TestData.TupleGenerator.KeyMode.SORTED,
                        TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        TestData.TupleGeneratorIterator gen1Iter =
                new TestData.TupleGeneratorIterator(generator1, input1Size);
        TestData.TupleGeneratorIterator gen2Iter =
                new TestData.TupleGeneratorIterator(generator2, input2Size);
        TestData.TupleConstantValueIterator const1Iter =
                new TestData.TupleConstantValueIterator(
                        duplicateKey, "LEFT String for Duplicate Keys", input1Duplicates);
        TestData.TupleConstantValueIterator const2Iter =
                new TestData.TupleConstantValueIterator(
                        duplicateKey, "RIGHT String for Duplicate Keys", input2Duplicates);

        ArrayList<MutableObjectIterator<Tuple2<Integer, String>>> inList1 = new ArrayList<>();
        inList1.add(gen1Iter);
        inList1.add(const1Iter);
        ArrayList<MutableObjectIterator<Tuple2<Integer, String>>> inList2 = new ArrayList<>();
        inList2.add(gen2Iter);
        inList2.add(const2Iter);

        MutableObjectIterator<Tuple2<Integer, String>> input1 =
                new MergeIterator<>(inList1, this.comparator1.duplicate());
        MutableObjectIterator<Tuple2<Integer, String>> input2 =
                new MergeIterator<>(inList2, this.comparator2.duplicate());

        // First pass over the inputs: compute the expected matches.
        Map<Integer, Collection<Match>> expectedMatchesMap =
                matchValues(collectData(input1), collectData(input2));

        // Rewind every source and rebuild the merged iterators for the second pass.
        generator1.reset();
        generator2.reset();
        const1Iter.reset();
        const2Iter.reset();
        gen1Iter.reset();
        gen2Iter.reset();

        inList1.clear();
        inList1.add(gen1Iter);
        inList1.add(const1Iter);
        inList2.clear();
        inList2.add(gen2Iter);
        inList2.add(const2Iter);

        input1 = new MergeIterator<>(inList1, this.comparator1.duplicate());
        input2 = new MergeIterator<>(inList2, this.comparator2.duplicate());

        StreamOperator operator = getOperator();
        match(expectedMatchesMap, transformToBinary(join(operator, input1, input2)));
        assertNoMatchesLeft(expectedMatchesMap);
    }

    /**
     * Checks every produced row against the expected matches and removes it from them.
     *
     * <p>Each element of {@code values} must be a {@link StreamRecord} wrapping a
     * {@link BinaryRowData} with the layout {@code (int key, string left, string right)}, as
     * produced by {@link #transformToBinary(LinkedBlockingQueue)}. Fails the test if a row has
     * an unexpected key or is not among the remaining matches for its key. Keys whose match
     * collection becomes empty are removed from {@code expectedMatchesMap}.
     *
     * @param expectedMatchesMap expected matches by key; mutated by this method
     * @param values the produced records
     */
    public static void match(
            Map<Integer, Collection<Match>> expectedMatchesMap,
            LinkedBlockingQueue<Object> values) {
        for (Object o : values) {
            BinaryRowData row = (BinaryRowData) ((StreamRecord<?>) o).getValue();
            Integer key = row.getInt(0);
            String value1 = row.isNullAt(1) ? null : row.getString(1).toString();
            String value2 = row.isNullAt(2) ? null : row.getString(2).toString();

            Collection<Match> matches = expectedMatchesMap.get(key);
            if (matches == null) {
                Assertions.fail("Match " + key + " - " + value1 + ":" + value2 + " is unexpected.");
            }

            boolean contained = matches.remove(new Match(value1, value2));
            if (!contained) {
                Assertions.fail(
                        "Produced match was not contained: "
                                + key
                                + " - "
                                + value1
                                + ":"
                                + value2
                                + ", now have: "
                                + matches);
            }
            if (matches.isEmpty()) {
                expectedMatchesMap.remove(key);
            }
        }
    }

    /**
     * Runs the operator over both inputs, feeding {@code input1} completely before
     * {@code input2}.
     *
     * @see #join(StreamOperator, MutableObjectIterator, MutableObjectIterator, boolean)
     */
    public static LinkedBlockingQueue<Object> join(
            StreamOperator operator,
            MutableObjectIterator<Tuple2<Integer, String>> input1,
            MutableObjectIterator<Tuple2<Integer, String>> input2)
            throws Exception {
        return join(operator, input1, input2, true);
    }

    /**
     * Runs the given operator inside a two-input stream task test harness and returns the
     * harness output.
     *
     * <p>{@code input1} is sent to input gate 0 and {@code input2} to input gate 1. One input
     * is drained completely before the other one is started.
     *
     * @param operator the operator under test
     * @param input1 records for the first input
     * @param input2 records for the second input
     * @param input1First whether {@code input1} is fed before {@code input2}
     * @return the output queue of the harness after the task has completed
     */
    public static LinkedBlockingQueue<Object> join(
            StreamOperator operator,
            MutableObjectIterator<Tuple2<Integer, String>> input1,
            MutableObjectIterator<Tuple2<Integer, String>> input2,
            boolean input1First)
            throws Exception {
        InternalTypeInfo<RowData> typeInfo =
                InternalTypeInfo.ofFields(new IntType(), VarCharType.STRING_TYPE);
        InternalTypeInfo<RowData> joinedInfo =
                InternalTypeInfo.ofFields(
                        new IntType(),
                        VarCharType.STRING_TYPE,
                        new IntType(),
                        VarCharType.STRING_TYPE);
        TwoInputStreamTaskTestHarness<RowData, RowData, RowData> testHarness =
                new TwoInputStreamTaskTestHarness<>(
                        TwoInputStreamTask::new,
                        2,
                        1,
                        new int[] {1, 2},
                        typeInfo,
                        typeInfo,
                        joinedInfo);

        testHarness.bufferSize = 10 * 1024;
        testHarness.getExecutionConfig().enableObjectReuse();
        testHarness.memorySize = 36L * 1024 * 1024;
        testHarness.setupOutputForSingletonOperatorChain();
        testHarness.getStreamConfig().setStreamOperator(operator);
        testHarness.getStreamConfig().setOperatorID(new OperatorID());
        testHarness
                .getStreamConfig()
                .setManagedMemoryFractionOperatorOfUseCase(ManagedMemoryUseCase.OPERATOR, 0.99);

        final long initialTime = 0L;

        testHarness.invoke();
        testHarness.waitForTaskRunning();

        if (input1First) {
            feedInput(testHarness, input1, 0, initialTime);
            feedInput(testHarness, input2, 1, initialTime);
        } else {
            feedInput(testHarness, input2, 1, initialTime);
            feedInput(testHarness, input1, 0, initialTime);
        }

        testHarness.endInput();
        testHarness.waitForTaskCompletion();
        return testHarness.getOutput();
    }

    /** Builds a two-field binary row {@code (i, s1)}. */
    public static BinaryRowData newRow(int i, String s1) {
        BinaryRowData row = new BinaryRowData(2);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeInt(0, i);
        writer.writeString(1, StringData.fromString(s1));
        writer.complete();
        return row;
    }

    /**
     * Computes the reference inner join: for every key present in both maps, the cross
     * product of its left and right values.
     *
     * @param leftMap left values grouped by key
     * @param rightMap right values grouped by key
     * @return the expected matches grouped by key; keys missing on the right side are absent
     */
    public static Map<Integer, Collection<Match>> matchValues(
            Map<Integer, Collection<String>> leftMap, Map<Integer, Collection<String>> rightMap) {
        Map<Integer, Collection<Match>> map = new HashMap<>();
        for (Map.Entry<Integer, Collection<String>> leftEntry : leftMap.entrySet()) {
            Integer key = leftEntry.getKey();
            Collection<String> rightValues = rightMap.get(key);
            if (rightValues == null) {
                continue;
            }
            Collection<Match> matchedValues = map.computeIfAbsent(key, k -> new ArrayList<>());
            for (String leftValue : leftEntry.getValue()) {
                for (String rightValue : rightValues) {
                    matchedValues.add(new Match(leftValue, rightValue));
                }
            }
        }
        return map;
    }

    /**
     * Drains the iterator and groups the string values by their integer key.
     *
     * @param iter the iterator to consume; it is exhausted when this method returns
     * @return the values per key, in iteration order
     */
    public static Map<Integer, Collection<String>> collectData(
            MutableObjectIterator<Tuple2<Integer, String>> iter) throws Exception {
        Map<Integer, Collection<String>> map = new HashMap<>();
        Tuple2<Integer, String> pair = new Tuple2<>();
        while ((pair = iter.next(pair)) != null) {
            map.computeIfAbsent(pair.f0, k -> new ArrayList<>()).add(pair.f1);
        }
        return map;
    }

    private StreamOperator getOperator() {
        return Int2SortMergeJoinOperatorTest.newOperator(FlinkJoinType.INNER, this.leftIsSmall);
    }

    /**
     * Converts four-field joined rows {@code (leftKey, leftValue, rightKey, rightValue)} into
     * three-field binary rows {@code (key, leftValue, rightValue)}.
     *
     * <p>If field 0 is null, the key is read from field 2 and the left value becomes null; if
     * field 2 is null, the key is read from field 0 and the right value becomes null.
     *
     * @param output stream records wrapping {@link RowData}
     * @return a new queue of stream records wrapping the converted rows, in the same order
     */
    public static LinkedBlockingQueue<Object> transformToBinary(
            LinkedBlockingQueue<Object> output) {
        LinkedBlockingQueue<Object> ret = new LinkedBlockingQueue<>();
        for (Object o : output) {
            RowData row = (RowData) ((StreamRecord<?>) o).getValue();
            BinaryRowData binaryRow;
            if (row.isNullAt(0)) {
                binaryRow = newRow(row.getInt(2), null, row.getString(3).toString());
            } else if (row.isNullAt(2)) {
                binaryRow = newRow(row.getInt(0), row.getString(1).toString(), null);
            } else {
                String value1 = row.getString(1).toString();
                String value2 = row.getString(3).toString();
                binaryRow = newRow(row.getInt(0), value1, value2);
            }
            ret.add(new StreamRecord<>(binaryRow));
        }
        return ret;
    }

    /** Builds a three-field binary row {@code (i, s1, s2)}; a null string yields a null field. */
    public static BinaryRowData newRow(int i, String s1, String s2) {
        BinaryRowData row = new BinaryRowData(3);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeInt(0, i);
        if (s1 == null) {
            writer.setNullAt(1);
        } else {
            writer.writeString(1, StringData.fromString(s1));
        }
        if (s2 == null) {
            writer.setNullAt(2);
        } else {
            writer.writeString(2, StringData.fromString(s2));
        }
        writer.complete();
        return row;
    }

    /** Creates a tuple comparator that compares on field 0 with an ascending int comparator. */
    private static TypeComparator<Tuple2<Integer, String>> newKeyComparator() {
        return new TupleComparator<>(
                new int[] {0},
                new TypeComparator<?>[] {new IntComparator(true)},
                new TypeSerializer<?>[] {IntSerializer.INSTANCE});
    }

    /** Sends every record of {@code input} to the given gate and waits until it is processed. */
    private static void feedInput(
            TwoInputStreamTaskTestHarness<RowData, RowData, RowData> testHarness,
            MutableObjectIterator<Tuple2<Integer, String>> input,
            int inputGate,
            long timestamp)
            throws Exception {
        Tuple2<Integer, String> reuse = new Tuple2<>();
        while ((reuse = input.next(reuse)) != null) {
            testHarness.processElement(
                    new StreamRecord<>(newRow(reuse.f0, reuse.f1), timestamp), inputGate, 0);
        }
        testHarness.waitForInputProcessing();
    }

    /** Asserts that every remaining entry of the expected-match map has no matches left. */
    private static void assertNoMatchesLeft(Map<Integer, Collection<Match>> expectedMatchesMap) {
        Assertions.assertThat(expectedMatchesMap)
                .allSatisfy((i, e) -> Assertions.assertThat(e).isEmpty());
    }
}

