/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.typeutils.GenericPairComparator;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypePairComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.operators.sort.ReusingSortMergeCoGroupIterator;
import org.apache.flink.runtime.operators.testutils.TestData;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Integration test for {@link ReusingSortMergeCoGroupIterator}.
 *
 * <p>Two generated inputs of {@code (Integer, String)} tuples are fed to the iterator, and every
 * co-group it produces is compared against an expected grouping that is computed independently
 * from the same generators.
 */
public class ReusingSortMergeCoGroupIteratorITCase {
    /** Number of records read from the first and second input. */
    private static final int INPUT_1_SIZE = 20000;
    private static final int INPUT_2_SIZE = 1000;
    /** Seeds handed to the two tuple generators. */
    private static final long SEED1 = 561349061987311L;
    private static final long SEED2 = 231434613412342L;
    private TestData.TupleGenerator generator1;
    private TestData.TupleGenerator generator2;
    private MutableObjectIterator<Tuple2<Integer, String>> reader1;
    private MutableObjectIterator<Tuple2<Integer, String>> reader2;
    private TypeSerializer<Tuple2<Integer, String>> serializer1;
    private TypeSerializer<Tuple2<Integer, String>> serializer2;
    private TypeComparator<Tuple2<Integer, String>> comparator1;
    private TypeComparator<Tuple2<Integer, String>> comparator2;
    private TypePairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>> pairComparator;

    /** Creates fresh serializers and comparators for both inputs before each test. */
    @Before
    public void beforeTest() {
        this.serializer1 = TestData.getIntStringTupleSerializer();
        this.serializer2 = TestData.getIntStringTupleSerializer();
        this.comparator1 = TestData.getIntStringTupleComparator();
        this.comparator2 = TestData.getIntStringTupleComparator();
        this.pairComparator = new GenericPairComparator<Tuple2<Integer, String>, Tuple2<Integer, String>>(this.comparator1, this.comparator2);
    }

    /**
     * Runs the co-group iterator over both generated inputs and checks that each key is reported
     * exactly once with exactly the expected values on each side.
     *
     * @throws Exception if generating the data or driving the iterator fails; the exception is
     *     propagated so the test runner reports it with its full stack trace
     */
    @Test
    public void testMerge() throws Exception {
        this.generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        this.generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, TestData.TupleGenerator.KeyMode.SORTED, TestData.TupleGenerator.ValueMode.RANDOM_LENGTH);
        this.reader1 = new TestData.TupleGeneratorIterator(this.generator1, INPUT_1_SIZE);
        this.reader2 = new TestData.TupleGeneratorIterator(this.generator2, INPUT_2_SIZE);

        // Build the expected result by draining the generators directly.
        Map<Integer, Collection<String>> expectedStringsMap1 = this.collectData(this.generator1, INPUT_1_SIZE);
        Map<Integer, Collection<String>> expectedStringsMap2 = this.collectData(this.generator2, INPUT_2_SIZE);
        Map<Integer, List<Collection<String>>> expectedCoGroupsMap = this.coGroupValues(expectedStringsMap1, expectedStringsMap2);

        // Reset the generators consumed above; the readers wrap these same generator instances.
        this.generator1.reset();
        this.generator2.reset();

        ReusingSortMergeCoGroupIterator<Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
                new ReusingSortMergeCoGroupIterator<Tuple2<Integer, String>, Tuple2<Integer, String>>(
                        this.reader1, this.reader2, this.serializer1, this.comparator1, this.serializer2, this.comparator2, this.pairComparator);
        iterator.open();
        try {
            while (iterator.next()) {
                Iterator<Tuple2<Integer, String>> iter1 = iterator.getValues1().iterator();
                Iterator<Tuple2<Integer, String>> iter2 = iterator.getValues2().iterator();

                // Pull one record from whichever side is non-empty to learn the group's key.
                // Key and value are copied out immediately rather than holding on to the record
                // (NOTE(review): presumably the "reusing" iterator recycles record instances).
                int key = 0;
                String firstValue = null;
                boolean firstFromInput1 = false;
                if (iter1.hasNext()) {
                    Tuple2<Integer, String> rec = iter1.next();
                    key = rec.f0;
                    firstValue = rec.f1;
                    firstFromInput1 = true;
                } else if (iter2.hasNext()) {
                    Tuple2<Integer, String> rec = iter2.next();
                    key = rec.f0;
                    firstValue = rec.f1;
                } else {
                    Assert.fail("No input on both sides.");
                }

                Assert.assertTrue("No matches for key " + key, expectedCoGroupsMap.containsKey(key));
                List<Collection<String>> expectedGroup = expectedCoGroupsMap.get(key);
                Collection<String> expValues1 = expectedGroup.get(0);
                Collection<String> expValues2 = expectedGroup.get(1);

                // The record consumed above must be accounted for like every other record.
                if (firstFromInput1) {
                    Assert.assertTrue("String not in expected set of first input", expValues1.remove(firstValue));
                } else {
                    Assert.assertTrue("String not in expected set of second input", expValues2.remove(firstValue));
                }

                while (iter1.hasNext()) {
                    Tuple2<Integer, String> rec = iter1.next();
                    Assert.assertTrue("String not in expected set of first input", expValues1.remove(rec.f1));
                }
                Assert.assertTrue("Expected set of first input not empty", expValues1.isEmpty());

                while (iter2.hasNext()) {
                    Tuple2<Integer, String> rec = iter2.next();
                    Assert.assertTrue("String not in expected set of second input", expValues2.remove(rec.f1));
                }
                Assert.assertTrue("Expected set of second input not empty", expValues2.isEmpty());

                // A key that shows up a second time will fail the containsKey check above.
                expectedCoGroupsMap.remove(key);
            }
        } finally {
            // Close the iterator even when an assertion above fails.
            iterator.close();
        }
        Assert.assertTrue("Expected key set not empty", expectedCoGroupsMap.isEmpty());
    }

    /**
     * Combines the per-key values of two inputs into the expected co-groups.
     *
     * @param leftMap values of the first input, grouped by key
     * @param rightMap values of the second input, grouped by key
     * @return for every key present in either map, a two-element list holding the first input's
     *     values at index 0 and the second input's values at index 1; a side without values for
     *     the key is represented by an empty collection. The non-empty collections are the very
     *     instances stored in the argument maps, not copies.
     */
    private Map<Integer, List<Collection<String>>> coGroupValues(Map<Integer, Collection<String>> leftMap, Map<Integer, Collection<String>> rightMap) {
        Map<Integer, List<Collection<String>>> map = new HashMap<Integer, List<Collection<String>>>(1000);
        HashSet<Integer> keySet = new HashSet<Integer>(leftMap.keySet());
        keySet.addAll(rightMap.keySet());
        for (Integer key : keySet) {
            Collection<String> leftValues = leftMap.get(key);
            Collection<String> rightValues = rightMap.get(key);
            List<Collection<String>> list = new ArrayList<Collection<String>>(2);
            if (leftValues == null) {
                list.add(new ArrayList<String>(0));
            } else {
                list.add(leftValues);
            }
            if (rightValues == null) {
                list.add(new ArrayList<String>(0));
            } else {
                list.add(rightValues);
            }
            map.put(key, list);
        }
        return map;
    }

    /**
     * Reads {@code num} tuples from the generator and groups their string values by key.
     *
     * @param iter generator to read from
     * @param num number of tuples to read
     * @return map from key to the values read for that key, in the order they were read
     * @throws Exception if the generator fails to produce a tuple
     */
    private Map<Integer, Collection<String>> collectData(TestData.TupleGenerator iter, int num) throws Exception {
        Map<Integer, Collection<String>> map = new HashMap<Integer, Collection<String>>();
        // A single tuple instance is handed to the generator on every call.
        Tuple2<Integer, String> pair = new Tuple2<Integer, String>();
        for (int i = 0; i < num; ++i) {
            iter.next(pair);
            Integer key = pair.f0;
            Collection<String> values = map.get(key);
            if (values == null) {
                values = new ArrayList<String>();
                map.put(key, values);
            }
            values.add(pair.f1);
        }
        return map;
    }
}

