/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.operators;

import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.operators.MapOperator;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.PartitionOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.test.operators.util.CollectionDataSets;
import org.apache.flink.test.util.MultipleProgramsTestBaseJUnit4;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class PartitionITCase
extends MultipleProgramsTestBaseJUnit4 {
    public PartitionITCase(MultipleProgramsTestBaseJUnit4.TestExecutionMode mode) {
        super(mode);
    }

    @Test
    public void testHashPartitionByKeyField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ds.partitionByHash(new int[]{1}).mapPartition((MapPartitionFunction)new UniqueTupleLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testRangePartitionByKeyField() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ds.partitionByRange(new int[]{1}).mapPartition((MapPartitionFunction)new UniqueTupleLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testHashPartitionByKeyField2() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        AggregateOperator sum = ds.map((MapFunction)new PrefixMapper()).partitionByHash(new int[]{1, 2}).groupBy(new int[]{1, 2}).sum(0);
        List result = sum.collect();
        String expected = "(1,1,Hi)\n(5,2,Hello)\n(4,3,Hello)\n(5,3,I am )\n(6,3,Luke )\n(34,4,Comme)\n(65,5,Comme)\n(111,6,Comme)";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testRangePartitionByKeyField2() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        AggregateOperator sum = ds.map((MapFunction)new PrefixMapper()).partitionByRange(new int[]{1, 2}).groupBy(new int[]{1, 2}).sum(0);
        List result = sum.collect();
        String expected = "(1,1,Hi)\n(5,2,Hello)\n(4,3,Hello)\n(5,3,I am )\n(6,3,Luke )\n(34,4,Comme)\n(65,5,Comme)\n(111,6,Comme)";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testHashPartitionOfAtomicType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapPartitionOperator uniqLongs = env.generateSequence(1L, 6L).union((DataSet)env.generateSequence(1L, 6L)).rebalance().partitionByHash(new String[]{"*"}).mapPartition((MapPartitionFunction)new UniqueLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testRangePartitionOfAtomicType() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapPartitionOperator uniqLongs = env.generateSequence(1L, 6L).union((DataSet)env.generateSequence(1L, 6L)).rebalance().partitionByRange(new String[]{"*"}).mapPartition((MapPartitionFunction)new UniqueLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testHashPartitionByKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ds.partitionByHash((KeySelector)new KeySelector1()).mapPartition((MapPartitionFunction)new UniqueTupleLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testRangePartitionByKeySelector() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ds.partitionByRange((KeySelector)new KeySelector1()).mapPartition((MapPartitionFunction)new UniqueTupleLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testForcedRebalancing() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource ds = env.generateSequence(1L, 3000L);
        MapOperator uniqLongs = ds.filter((FilterFunction)new Filter1()).rebalance().map((MapFunction)new PartitionIndexMapper()).groupBy(new int[]{0}).reduce((ReduceFunction)new Reducer1()).map((MapFunction)new Mapper1());
        List result = uniqLongs.collect();
        StringBuilder expected = new StringBuilder();
        int numPerPartition = 2220 / env.getParallelism() / 10;
        for (int i = 0; i < env.getParallelism(); ++i) {
            expected.append('(').append(i).append(',').append(numPerPartition).append(")\n");
        }
        TestBaseUtils.compareResultAsText((List)result, (String)expected.toString());
    }

    @Test
    public void testHashPartitionByKeyFieldAndDifferentParallelism() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ((PartitionOperator)ds.partitionByHash(new int[]{1}).setParallelism(4)).mapPartition((MapPartitionFunction)new UniqueTupleLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testRangePartitionByKeyFieldAndDifferentParallelism() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        MapPartitionOperator uniqLongs = ((PartitionOperator)ds.partitionByRange(new int[]{1}).setParallelism(4)).mapPartition((MapPartitionFunction)new UniqueTupleLongMapper());
        List result = uniqLongs.collect();
        String expected = "1\n2\n3\n4\n5\n6\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testHashPartitionWithKeyExpression() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
        MapPartitionOperator uniqLongs = ((PartitionOperator)ds.partitionByHash(new String[]{"nestedPojo.longNumber"}).setParallelism(4)).mapPartition((MapPartitionFunction)new UniqueNestedPojoLongMapper());
        List result = uniqLongs.collect();
        String expected = "10000\n20000\n30000\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testRangePartitionWithKeyExpression() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(3);
        DataSet<CollectionDataSets.POJO> ds = CollectionDataSets.getDuplicatePojoDataSet(env);
        MapPartitionOperator uniqLongs = ((PartitionOperator)ds.partitionByRange(new String[]{"nestedPojo.longNumber"}).setParallelism(4)).mapPartition((MapPartitionFunction)new UniqueNestedPojoLongMapper());
        List result = uniqLongs.collect();
        String expected = "10000\n20000\n30000\n";
        TestBaseUtils.compareResultAsText((List)result, (String)expected);
    }

    @Test
    public void testRangePartitionerOnSequenceData() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource dataSource = env.generateSequence(0L, 10000L);
        ObjectSelfKeySelector keyExtractor = new ObjectSelfKeySelector();
        MinMaxSelector<Long> minMaxSelector = new MinMaxSelector<Long>(new LongComparator(true));
        Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<Long>(new LongComparator(true));
        List collected = dataSource.partitionByRange((KeySelector)keyExtractor).mapPartition(minMaxSelector).collect();
        Collections.sort(collected, tuple2Comparator);
        long previousMax = -1L;
        for (Tuple2 tuple2 : collected) {
            if (previousMax == -1L) {
                previousMax = (Long)tuple2.f1;
                continue;
            }
            long currentMin = (Long)tuple2.f0;
            Assert.assertTrue(((Long)tuple2.f0 < (Long)tuple2.f1 ? 1 : 0) != 0);
            Assert.assertEquals((long)(previousMax + 1L), (long)currentMin);
            previousMax = (Long)tuple2.f1;
        }
    }

    @Test(expected=InvalidProgramException.class)
    public void testRangePartitionInIteration() throws Exception {
        if (this.mode == MultipleProgramsTestBaseJUnit4.TestExecutionMode.COLLECTION) {
            throw new InvalidProgramException("Does not apply for collection execution");
        }
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource source = env.generateSequence(0L, 10000L);
        MapOperator tuples = source.map((MapFunction)new MapFunction<Long, Tuple2<Long, String>>(){

            public Tuple2<Long, String> map(Long v) throws Exception {
                return new Tuple2((Object)v, (Object)Long.toString(v));
            }
        });
        DeltaIteration it = tuples.iterateDelta((DataSet)tuples, 10, new int[]{0});
        JoinOperator.ProjectJoin body = it.getWorkset().partitionByRange(new int[]{1}).join((DataSet)it.getSolutionSet()).where(new int[]{0}).equalTo(new int[]{0}).projectFirst(new int[]{0}).projectSecond(new int[]{1});
        DataSet result = it.closeWith((DataSet)body, (DataSet)body);
        result.collect();
    }

    @Test
    public void testRangePartitionerOnSequenceDataWithOrders() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator dataSet = env.generateSequence(0L, 10000L).map((MapFunction)new MapFunction<Long, Tuple2<Long, Long>>(){

            public Tuple2<Long, Long> map(Long value) throws Exception {
                return new Tuple2((Object)(value / 5000L), (Object)(value % 5000L));
            }
        });
        Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<Long>(new LongComparator(true), new LongComparator(false));
        MinMaxSelector<Long> minMaxSelector = new MinMaxSelector<Long>(tuple2Comparator);
        List collected = dataSet.partitionByRange(new int[]{0, 1}).withOrders(new Order[]{Order.ASCENDING, Order.DESCENDING}).mapPartition(minMaxSelector).collect();
        Collections.sort(collected, new Tuple2Comparator<Long>(tuple2Comparator));
        Tuple2 previousMax = null;
        for (Tuple2 tuple2 : collected) {
            Assert.assertTrue((String)"Min element in each partition should be smaller than max.", (tuple2Comparator.compare((Tuple2<Long, Long>)((Tuple2)tuple2.f0), (Tuple2<Long, Long>)((Tuple2)tuple2.f1)) <= 0 ? 1 : 0) != 0);
            if (previousMax == null) {
                previousMax = (Tuple2)tuple2.f1;
                continue;
            }
            Assert.assertTrue((String)"Partitions overlap. Previous max should be smaller than current min.", (tuple2Comparator.compare((Tuple2<Long, Long>)previousMax, (Tuple2<Long, Long>)((Tuple2)tuple2.f0)) < 0 ? 1 : 0) != 0);
            if (((Long)previousMax.f0).equals(((Tuple2)tuple2.f0).f0)) {
                Assert.assertEquals((String)"Ordering on the second field should be continous.", (long)((Long)previousMax.f1 - 1L), (long)((Long)((Tuple2)tuple2.f0).f1));
            }
            previousMax = (Tuple2)tuple2.f1;
        }
    }

    @Test
    public void testRangePartitionerOnSequenceNestedDataWithOrders() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator dataSet = env.generateSequence(0L, 10000L).map((MapFunction)new MapFunction<Long, Tuple2<Tuple2<Long, Long>, Long>>(){

            public Tuple2<Tuple2<Long, Long>, Long> map(Long value) throws Exception {
                return new Tuple2((Object)new Tuple2((Object)(value / 5000L), (Object)(value % 5000L)), (Object)value);
            }
        });
        Tuple2Comparator<Long> tuple2Comparator = new Tuple2Comparator<Long>(new LongComparator(true), new LongComparator(true));
        MinMaxSelector<Long> minMaxSelector = new MinMaxSelector<Long>(tuple2Comparator);
        List collected = dataSet.partitionByRange(new int[]{0}).withOrders(new Order[]{Order.ASCENDING}).mapPartition((MapPartitionFunction)new MapPartitionFunction<Tuple2<Tuple2<Long, Long>, Long>, Tuple2<Long, Long>>(){

            public void mapPartition(Iterable<Tuple2<Tuple2<Long, Long>, Long>> values, Collector<Tuple2<Long, Long>> out) throws Exception {
                for (Tuple2<Tuple2<Long, Long>, Long> value : values) {
                    out.collect(value.f0);
                }
            }
        }).mapPartition(minMaxSelector).collect();
        Collections.sort(collected, new Tuple2Comparator<Long>(tuple2Comparator));
        Tuple2 previousMax = null;
        for (Tuple2 tuple2 : collected) {
            Assert.assertTrue((String)"Min element in each partition should be smaller than max.", (tuple2Comparator.compare((Tuple2<Long, Long>)((Tuple2)tuple2.f0), (Tuple2<Long, Long>)((Tuple2)tuple2.f1)) <= 0 ? 1 : 0) != 0);
            if (previousMax == null) {
                previousMax = (Tuple2)tuple2.f1;
                continue;
            }
            Assert.assertTrue((String)"Partitions overlap. Previous max should be smaller than current min.", (tuple2Comparator.compare((Tuple2<Long, Long>)previousMax, (Tuple2<Long, Long>)((Tuple2)tuple2.f0)) < 0 ? 1 : 0) != 0);
            if (((Long)previousMax.f0).equals(((Tuple2)tuple2.f0).f0)) {
                Assert.assertEquals((String)"Ordering on the second field should be continous.", (long)((Long)previousMax.f1 + 1L), (long)((Long)((Tuple2)tuple2.f0).f1));
            }
            previousMax = (Tuple2)tuple2.f1;
        }
    }

    @Test
    public void testRangePartitionerWithKeySelectorOnSequenceNestedDataWithOrders() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        MapOperator dataSet = env.generateSequence(0L, 10000L).map((MapFunction)new MapFunction<Long, Tuple2<ComparablePojo, Long>>(){

            public Tuple2<ComparablePojo, Long> map(Long value) throws Exception {
                return new Tuple2((Object)new ComparablePojo(value / 5000L, value % 5000L), (Object)value);
            }
        });
        List collected = dataSet.partitionByRange((KeySelector)new KeySelector<Tuple2<ComparablePojo, Long>, ComparablePojo>(){

            public ComparablePojo getKey(Tuple2<ComparablePojo, Long> value) throws Exception {
                return (ComparablePojo)value.f0;
            }
        }).withOrders(new Order[]{Order.ASCENDING}).mapPartition(new MinMaxSelector<Tuple2<ComparablePojo, Long>>(new ComparablePojoComparator())).mapPartition((MapPartitionFunction)new ExtractComparablePojo()).collect();
        Comparator<Tuple2<ComparablePojo, ComparablePojo>> pojoComparator = new Comparator<Tuple2<ComparablePojo, ComparablePojo>>(){

            @Override
            public int compare(Tuple2<ComparablePojo, ComparablePojo> o1, Tuple2<ComparablePojo, ComparablePojo> o2) {
                return ((ComparablePojo)o1.f0).compareTo((ComparablePojo)o2.f1);
            }
        };
        Collections.sort(collected, pojoComparator);
        ComparablePojo previousMax = null;
        for (Tuple2 element : collected) {
            Assert.assertTrue((String)"Min element in each partition should be smaller than max.", (((ComparablePojo)element.f0).compareTo((ComparablePojo)element.f1) <= 0 ? 1 : 0) != 0);
            if (previousMax == null) {
                previousMax = (ComparablePojo)element.f1;
                continue;
            }
            Assert.assertTrue((String)"Partitions overlap. Previous max should be smaller than current min.", (previousMax.compareTo((ComparablePojo)element.f0) < 0 ? 1 : 0) != 0);
            if (previousMax.first.equals(((ComparablePojo)element.f0).first)) {
                Assert.assertEquals((String)"Ordering on the second field should be continous.", (long)(previousMax.second - 1L), (long)((ComparablePojo)element.f0).second);
            }
            previousMax = (ComparablePojo)element.f1;
        }
    }

    private static class LongComparator
    implements Comparator<Long>,
    Serializable {
        private final boolean ascending;

        public LongComparator(boolean ascending) {
            this.ascending = ascending;
        }

        @Override
        public int compare(Long o1, Long o2) {
            if (this.ascending) {
                return Long.compare(o1, o2);
            }
            return -1 * Long.compare(o1, o2);
        }
    }

    private static class Tuple2Comparator<T>
    implements Comparator<Tuple2<T, T>>,
    Serializable {
        private final Comparator<T> firstComparator;
        private final Comparator<T> secondComparator;

        public Tuple2Comparator(Comparator<T> comparator) {
            this(comparator, comparator);
        }

        public Tuple2Comparator(Comparator<T> firstComparator, Comparator<T> secondComparator) {
            this.firstComparator = firstComparator;
            this.secondComparator = secondComparator;
        }

        @Override
        public int compare(Tuple2<T, T> first, Tuple2<T, T> second) {
            long result = this.firstComparator.compare(first.f0, second.f0);
            if (result > 0L) {
                return 1;
            }
            if (result < 0L) {
                return -1;
            }
            result = this.secondComparator.compare(first.f1, second.f1);
            if (result > 0L) {
                return 1;
            }
            if (result < 0L) {
                return -1;
            }
            return 0;
        }
    }

    private static class MinMaxSelector<T>
    implements MapPartitionFunction<T, Tuple2<T, T>> {
        private final Comparator<T> comparator;

        public MinMaxSelector(Comparator<T> comparator) {
            this.comparator = comparator;
        }

        public void mapPartition(Iterable<T> values, Collector<Tuple2<T, T>> out) throws Exception {
            T min;
            Iterator<T> itr = values.iterator();
            if (!itr.hasNext()) {
                return;
            }
            T max = min = itr.next();
            while (itr.hasNext()) {
                T value = itr.next();
                if (this.comparator.compare(value, min) < 0) {
                    min = value;
                }
                if (this.comparator.compare(value, max) <= 0) continue;
                max = value;
            }
            Tuple2 result = new Tuple2(min, max);
            out.collect((Object)result);
        }
    }

    private static class ObjectSelfKeySelector
    implements KeySelector<Long, Long> {
        private ObjectSelfKeySelector() {
        }

        public Long getKey(Long value) throws Exception {
            return value;
        }
    }

    public static class ComparablePojo
    implements Comparable<ComparablePojo> {
        private Long first;
        private Long second;

        public Long getFirst() {
            return this.first;
        }

        public void setFirst(Long first) {
            this.first = first;
        }

        public Long getSecond() {
            return this.second;
        }

        public void setSecond(Long second) {
            this.second = second;
        }

        public ComparablePojo(Long first, Long second) {
            this.first = first;
            this.second = second;
        }

        public ComparablePojo() {
        }

        @Override
        public int compareTo(ComparablePojo o) {
            int firstResult = Long.compare(this.first, o.first);
            if (firstResult == 0) {
                return -1 * Long.compare(this.second, o.second);
            }
            return firstResult;
        }
    }

    private static class ComparablePojoComparator
    implements Comparator<Tuple2<ComparablePojo, Long>>,
    Serializable {
        private ComparablePojoComparator() {
        }

        @Override
        public int compare(Tuple2<ComparablePojo, Long> o1, Tuple2<ComparablePojo, Long> o2) {
            return ((ComparablePojo)o1.f0).compareTo((ComparablePojo)o2.f0);
        }
    }

    private static class ExtractComparablePojo
    implements MapPartitionFunction<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>, Tuple2<ComparablePojo, ComparablePojo>> {
        private ExtractComparablePojo() {
        }

        public void mapPartition(Iterable<Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>>> values, Collector<Tuple2<ComparablePojo, ComparablePojo>> out) throws Exception {
            for (Tuple2<Tuple2<ComparablePojo, Long>, Tuple2<ComparablePojo, Long>> value : values) {
                out.collect((Object)new Tuple2(((Tuple2)value.f0).f0, ((Tuple2)value.f1).f0));
            }
        }
    }

    private static class PartitionIndexMapper
    extends RichMapFunction<Long, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        private PartitionIndexMapper() {
        }

        public Tuple2<Integer, Integer> map(Long value) throws Exception {
            return new Tuple2((Object)this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), (Object)1);
        }
    }

    private static class UniqueNestedPojoLongMapper
    implements MapPartitionFunction<CollectionDataSets.POJO, Long> {
        private static final long serialVersionUID = 1L;

        private UniqueNestedPojoLongMapper() {
        }

        public void mapPartition(Iterable<CollectionDataSets.POJO> records, Collector<Long> out) throws Exception {
            HashSet<Long> uniq = new HashSet<Long>();
            for (CollectionDataSets.POJO t : records) {
                uniq.add(t.nestedPojo.longNumber);
            }
            for (Long l : uniq) {
                out.collect((Object)l);
            }
        }
    }

    private static class UniqueLongMapper
    implements MapPartitionFunction<Long, Long> {
        private static final long serialVersionUID = 1L;

        private UniqueLongMapper() {
        }

        public void mapPartition(Iterable<Long> longs, Collector<Long> out) throws Exception {
            HashSet<Long> uniq = new HashSet<Long>();
            for (Long l : longs) {
                uniq.add(l);
            }
            for (Long l : uniq) {
                out.collect((Object)l);
            }
        }
    }

    private static class UniqueTupleLongMapper
    implements MapPartitionFunction<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1L;

        private UniqueTupleLongMapper() {
        }

        public void mapPartition(Iterable<Tuple3<Integer, Long, String>> records, Collector<Long> out) throws Exception {
            HashSet<Object> uniq = new HashSet<Object>();
            for (Tuple3<Integer, Long, String> tuple3 : records) {
                uniq.add(tuple3.f1);
            }
            for (Long l : uniq) {
                out.collect((Object)l);
            }
        }
    }

    private static class Mapper1
    implements MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        private Mapper1() {
        }

        public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception {
            value.f1 = (Integer)value.f1 / 10;
            return value;
        }
    }

    private static class Reducer1
    implements ReduceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1L;

        private Reducer1() {
        }

        public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> v1, Tuple2<Integer, Integer> v2) {
            return new Tuple2(v1.f0, (Object)((Integer)v1.f1 + (Integer)v2.f1));
        }
    }

    private static class Filter1
    implements FilterFunction<Long> {
        private static final long serialVersionUID = 1L;

        private Filter1() {
        }

        public boolean filter(Long value) throws Exception {
            return value > 780L;
        }
    }

    private static class KeySelector1
    implements KeySelector<Tuple3<Integer, Long, String>, Long> {
        private static final long serialVersionUID = 1L;

        private KeySelector1() {
        }

        public Long getKey(Tuple3<Integer, Long, String> value) throws Exception {
            return (Long)value.f1;
        }
    }

    private static class PrefixMapper
    implements MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>> {
        private PrefixMapper() {
        }

        public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value) throws Exception {
            if (((String)value.f2).length() > 5) {
                value.f2 = ((String)value.f2).substring(0, 5);
            }
            return value;
        }
    }
}

