/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.util.List;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.RichCrossFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.CrossOperator;
import org.apache.flink.api.java.operators.TwoInputUdfOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class CrossITCase
extends MultipleProgramsTestBase {
    public CrossITCase(MultipleProgramsTestBase.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testCorretnessOfCrossOnTwoTupleInputs() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        CrossOperator crossDs = ds.cross(ds2).with((CrossFunction)new Tuple5Cross());
        List result = crossDs.collect();
        String expected = "0,HalloHallo\n1,HalloHallo Welt\n2,HalloHallo Welt wie\n1,Hallo WeltHallo\n2,Hallo WeltHallo Welt\n3,Hallo WeltHallo Welt wie\n2,Hallo Welt wieHallo\n3,Hallo Welt wieHallo Welt\n4,Hallo Welt wieHallo Welt wie\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCrossIfUDFReturnsLeftInputObject() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        CrossOperator crossDs = ds.cross(ds2).with((CrossFunction)new Tuple3ReturnLeft());
        List result = crossDs.collect();
        String expected = "1,1,Hi\n1,1,Hi\n1,1,Hi\n2,2,Hello\n2,2,Hello\n2,2,Hello\n3,2,Hello world\n3,2,Hello world\n3,2,Hello world\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCrossIfUDFReturnsRightInputObject() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        CrossOperator crossDs = ds.cross(ds2).with((CrossFunction)new Tuple5ReturnRight());
        List result = crossDs.collect();
        String expected = "1,1,0,Hallo,1\n1,1,0,Hallo,1\n1,1,0,Hallo,1\n2,2,1,Hallo Welt,2\n2,2,1,Hallo Welt,2\n2,2,1,Hallo Welt,2\n2,3,2,Hallo Welt wie,1\n2,3,2,Hallo Welt wie,1\n2,3,2,Hallo Welt wie,1\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCrossWithBroadcastSet() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> intDs = CollectionDataSets.getIntegerDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        TwoInputUdfOperator crossDs = ds.cross(ds2).with((CrossFunction)new Tuple5CrossBC()).withBroadcastSet(intDs, "ints");
        List result = crossDs.collect();
        String expected = "2,0,55\n3,0,55\n3,0,55\n3,0,55\n4,1,55\n4,2,55\n3,0,55\n4,2,55\n4,4,55\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCrossWithHuge() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        CrossOperator crossDs = ds.crossWithHuge(ds2).with((CrossFunction)new Tuple5Cross());
        List result = crossDs.collect();
        String expected = "0,HalloHallo\n1,HalloHallo Welt\n2,HalloHallo Welt wie\n1,Hallo WeltHallo\n2,Hallo WeltHallo Welt\n3,Hallo WeltHallo Welt wie\n2,Hallo Welt wieHallo\n3,Hallo Welt wieHallo Welt\n4,Hallo Welt wieHallo Welt wie\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCrossWithTiny() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        CrossOperator crossDs = ds.crossWithTiny(ds2).with((CrossFunction)new Tuple5Cross());
        List result = crossDs.collect();
        String expected = "0,HalloHallo\n1,HalloHallo Welt\n2,HalloHallo Welt wie\n1,Hallo WeltHallo\n2,Hallo WeltHallo Welt\n3,Hallo WeltHallo Welt wie\n2,Hallo Welt wieHallo\n3,Hallo Welt wieHallo Welt\n4,Hallo Welt wieHallo Welt wie\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testProjectCrossOnATupleInput1() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        CrossOperator.ProjectCross crossDs = ds.cross(ds2).projectFirst(new int[]{2, 1}).projectSecond(new int[]{3}).projectFirst(new int[]{0}).projectSecond(new int[]{4, 1});
        List result = crossDs.collect();
        String expected = "Hi,1,Hallo,1,1,1\nHi,1,Hallo Welt,1,2,2\nHi,1,Hallo Welt wie,1,1,3\nHello,2,Hallo,2,1,1\nHello,2,Hallo Welt,2,2,2\nHello,2,Hallo Welt wie,2,1,3\nHello world,2,Hallo,3,1,1\nHello world,2,Hallo Welt,3,2,2\nHello world,2,Hallo Welt wie,3,1,3\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testProjectCrossOnATupleInput2() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        CrossOperator.ProjectCross crossDs = ds.cross(ds2).projectSecond(new int[]{3}).projectFirst(new int[]{2, 1}).projectSecond(new int[]{4, 1}).projectFirst(new int[]{0});
        List result = crossDs.collect();
        String expected = "Hallo,Hi,1,1,1,1\nHallo Welt,Hi,1,2,2,1\nHallo Welt wie,Hi,1,1,3,1\nHallo,Hello,2,1,1,2\nHallo Welt,Hello,2,2,2,2\nHallo Welt wie,Hello,2,1,3,2\nHallo,Hello world,2,1,1,3\nHallo Welt,Hello world,2,2,2,3\nHallo Welt wie,Hello world,2,1,3,3\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfDefaultCross() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.getSmall3TupleDataSet(env);
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds2 = CollectionDataSets.getSmall5TupleDataSet(env);
        CrossOperator.DefaultCross crossDs = ds.cross(ds2);
        List result = crossDs.collect();
        String expected = "(1,1,Hi),(2,2,1,Hallo Welt,2)\n(1,1,Hi),(1,1,0,Hallo,1)\n(1,1,Hi),(2,3,2,Hallo Welt wie,1)\n(2,2,Hello),(2,2,1,Hallo Welt,2)\n(2,2,Hello),(1,1,0,Hallo,1)\n(2,2,Hello),(2,3,2,Hallo Welt wie,1)\n(3,2,Hello world),(2,2,1,Hallo Welt,2)\n(3,2,Hello world),(1,1,0,Hallo,1)\n(3,2,Hello world),(2,3,2,Hallo Welt wie,1)\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCrossOnTwoCustomTypeInputs() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<CollectionDataSets.CustomType> ds = CollectionDataSets.getSmallCustomTypeDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
        CrossOperator crossDs = ds.cross(ds2).with((CrossFunction)new CustomTypeCross());
        List result = crossDs.collect();
        String expected = "1,0,HiHi\n2,1,HiHello\n2,2,HiHello world\n2,1,HelloHi\n4,2,HelloHello\n4,3,HelloHello world\n2,2,Hello worldHi\n4,3,Hello worldHello\n4,4,Hello worldHello world";
        CrossITCase.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testCorrectnessOfCrossATupleInputAndACustomTypeInput() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple5<Integer, Long, Integer, String, Long>> ds = CollectionDataSets.getSmall5TupleDataSet(env);
        DataSet<CollectionDataSets.CustomType> ds2 = CollectionDataSets.getSmallCustomTypeDataSet(env);
        CrossOperator crossDs = ds.cross(ds2).with((CrossFunction)new MixedCross());
        List result = crossDs.collect();
        String expected = "2,0,HalloHi\n3,0,HalloHello\n3,0,HalloHello world\n3,0,Hallo WeltHi\n4,1,Hallo WeltHello\n4,2,Hallo WeltHello world\n3,0,Hallo Welt wieHi\n4,2,Hallo Welt wieHello\n4,4,Hallo Welt wieHello world\n";
        CrossITCase.compareResultAsTuples((List)result, (String)expected);
    }

    private static class Tuple5CrossBC
    extends RichCrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Integer, Integer>> {
        private static final long serialVersionUID = 1L;
        private int broadcast = 42;

        private Tuple5CrossBC() {
        }

        public void open(Configuration config) {
            List ints = this.getRuntimeContext().getBroadcastVariable("ints");
            int sum = 0;
            for (Integer i : ints) {
                sum += i.intValue();
            }
            this.broadcast = sum;
        }

        public Tuple3<Integer, Integer, Integer> cross(Tuple5<Integer, Long, Integer, String, Long> first, Tuple5<Integer, Long, Integer, String, Long> second) throws Exception {
            return new Tuple3((Object)((Integer)first.f0 + (Integer)second.f0), (Object)((Integer)first.f2 * (Integer)second.f2), (Object)this.broadcast);
        }
    }

    private static class Tuple5ReturnRight
    implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>> {
        private static final long serialVersionUID = 1L;

        private Tuple5ReturnRight() {
        }

        public Tuple5<Integer, Long, Integer, String, Long> cross(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second) throws Exception {
            return second;
        }
    }

    private static class Tuple3ReturnLeft
    implements CrossFunction<Tuple3<Integer, Long, String>, Tuple5<Integer, Long, Integer, String, Long>, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        private Tuple3ReturnLeft() {
        }

        public Tuple3<Integer, Long, String> cross(Tuple3<Integer, Long, String> first, Tuple5<Integer, Long, Integer, String, Long> second) throws Exception {
            return first;
        }
    }

    private static class MixedCross
    implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, CollectionDataSets.CustomType, Tuple3<Integer, Long, String>> {
        private static final long serialVersionUID = 1L;

        private MixedCross() {
        }

        public Tuple3<Integer, Long, String> cross(Tuple5<Integer, Long, Integer, String, Long> first, CollectionDataSets.CustomType second) throws Exception {
            return new Tuple3((Object)((Integer)first.f0 + second.myInt), (Object)((long)((Integer)first.f2).intValue() * second.myLong), (Object)((String)first.f3 + second.myString));
        }
    }

    private static class CustomTypeCross
    implements CrossFunction<CollectionDataSets.CustomType, CollectionDataSets.CustomType, CollectionDataSets.CustomType> {
        private static final long serialVersionUID = 1L;

        private CustomTypeCross() {
        }

        public CollectionDataSets.CustomType cross(CollectionDataSets.CustomType first, CollectionDataSets.CustomType second) throws Exception {
            return new CollectionDataSets.CustomType(first.myInt * second.myInt, first.myLong + second.myLong, first.myString + second.myString);
        }
    }

    private static class Tuple5Cross
    implements CrossFunction<Tuple5<Integer, Long, Integer, String, Long>, Tuple5<Integer, Long, Integer, String, Long>, Tuple2<Integer, String>> {
        private static final long serialVersionUID = 1L;

        private Tuple5Cross() {
        }

        public Tuple2<Integer, String> cross(Tuple5<Integer, Long, Integer, String, Long> first, Tuple5<Integer, Long, Integer, String, Long> second) throws Exception {
            return new Tuple2((Object)((Integer)first.f2 + (Integer)second.f2), (Object)((String)first.f3 + (String)second.f3));
        }
    }
}

