/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.manual;

import java.io.Serializable;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.types.IntValue;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class OverwriteObjects {
    public static final Logger LOG = LoggerFactory.getLogger(OverwriteObjects.class);
    private static final int NUMBER_OF_ELEMENTS = 3000000;
    private static final int KEY_RANGE = 1000000;
    private static final int MAX_PARALLELISM = 4;
    private static final long RANDOM_SEED = new Random().nextLong();
    private static final Tuple2Comparator<IntValue, IntValue> comparator = new Tuple2Comparator();

    public static void main(String[] args) throws Exception {
        new OverwriteObjects().run();
    }

    public void run() throws Exception {
        LOG.info("Random seed = {}", (Object)RANDOM_SEED);
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        for (int parallelism = 4; parallelism > 0; --parallelism) {
            LOG.info("Parallelism = {}", (Object)parallelism);
            env.setParallelism(parallelism);
            this.testReduce(env);
            this.testGroupedReduce(env);
            this.testJoin(env);
            this.testCross(env);
        }
    }

    public void testReduce(ExecutionEnvironment env) throws Exception {
        LOG.info("Testing reduce");
        env.getConfig().enableObjectReuse();
        Tuple2 enabledResult = (Tuple2)this.getDataSet(env).reduce((ReduceFunction)new OverwriteObjectsReduce(false)).collect().get(0);
        env.getConfig().disableObjectReuse();
        Tuple2 disabledResult = (Tuple2)this.getDataSet(env).reduce((ReduceFunction)new OverwriteObjectsReduce(false)).collect().get(0);
        Assert.assertEquals((long)3000000L, (long)((IntValue)enabledResult.f1).getValue());
        Assert.assertEquals((long)3000000L, (long)((IntValue)disabledResult.f1).getValue());
        Assert.assertEquals((Object)disabledResult, (Object)enabledResult);
    }

    public void testGroupedReduce(ExecutionEnvironment env) throws Exception {
        LOG.info("Testing grouped reduce");
        env.getConfig().enableObjectReuse();
        List enabledResult = this.getDataSet(env).groupBy(new int[]{0}).reduce((ReduceFunction)new OverwriteObjectsReduce(true)).collect();
        Collections.sort(enabledResult, comparator);
        env.getConfig().disableObjectReuse();
        List disabledResult = this.getDataSet(env).groupBy(new int[]{0}).reduce((ReduceFunction)new OverwriteObjectsReduce(true)).collect();
        Collections.sort(disabledResult, comparator);
        Assert.assertThat((Object)disabledResult, (Matcher)Matchers.is((Object)enabledResult));
    }

    public void testJoin(ExecutionEnvironment env) throws Exception {
        for (JoinOperatorBase.JoinHint joinHint : JoinOperatorBase.JoinHint.values()) {
            if (joinHint == JoinOperatorBase.JoinHint.OPTIMIZER_CHOOSES) continue;
            LOG.info("Testing inner join with JoinHint = {}", (Object)joinHint);
            env.getConfig().enableObjectReuse();
            List enabledResult = this.getDataSet(env).join(this.getDataSet(env), joinHint).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new OverwriteObjectsJoin()).collect();
            Collections.sort(enabledResult, comparator);
            env.getConfig().disableObjectReuse();
            List disabledResult = this.getDataSet(env).join(this.getDataSet(env), joinHint).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new OverwriteObjectsJoin()).collect();
            Collections.sort(disabledResult, comparator);
            Assert.assertEquals((String)("JoinHint=" + joinHint), (Object)disabledResult, (Object)enabledResult);
            if (joinHint != JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST) {
                LOG.info("Testing left outer join with JoinHint = {}", (Object)joinHint);
                env.getConfig().enableObjectReuse();
                enabledResult = this.getDataSet(env).leftOuterJoin(this.getFilteredDataSet(env), joinHint).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new OverwriteObjectsJoin()).collect();
                Collections.sort(enabledResult, comparator);
                env.getConfig().disableObjectReuse();
                disabledResult = this.getDataSet(env).leftOuterJoin(this.getFilteredDataSet(env), joinHint).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new OverwriteObjectsJoin()).collect();
                Collections.sort(disabledResult, comparator);
                Assert.assertThat((String)("JoinHint=" + joinHint), (Object)disabledResult, (Matcher)Matchers.is((Object)enabledResult));
            }
            if (joinHint != JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND) {
                LOG.info("Testing right outer join with JoinHint = {}", (Object)joinHint);
                env.getConfig().enableObjectReuse();
                enabledResult = this.getDataSet(env).rightOuterJoin(this.getFilteredDataSet(env), joinHint).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new OverwriteObjectsJoin()).collect();
                Collections.sort(enabledResult, comparator);
                env.getConfig().disableObjectReuse();
                disabledResult = this.getDataSet(env).rightOuterJoin(this.getFilteredDataSet(env), joinHint).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new OverwriteObjectsJoin()).collect();
                Collections.sort(disabledResult, comparator);
                Assert.assertThat((String)("JoinHint=" + joinHint), (Object)disabledResult, (Matcher)Matchers.is((Object)enabledResult));
            }
            if (joinHint == JoinOperatorBase.JoinHint.BROADCAST_HASH_FIRST || joinHint == JoinOperatorBase.JoinHint.BROADCAST_HASH_SECOND) continue;
            LOG.info("Testing full outer join with JoinHint = {}", (Object)joinHint);
            env.getConfig().enableObjectReuse();
            enabledResult = this.getDataSet(env).fullOuterJoin(this.getFilteredDataSet(env), joinHint).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new OverwriteObjectsJoin()).collect();
            Collections.sort(enabledResult, comparator);
            env.getConfig().disableObjectReuse();
            disabledResult = this.getDataSet(env).fullOuterJoin(this.getFilteredDataSet(env), joinHint).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new OverwriteObjectsJoin()).collect();
            Collections.sort(disabledResult, comparator);
            Assert.assertThat((String)("JoinHint=" + joinHint), (Object)disabledResult, (Matcher)Matchers.is((Object)enabledResult));
        }
    }

    public void testCross(ExecutionEnvironment env) throws Exception {
        LOG.info("Testing cross");
        DataSet<Tuple2<IntValue, IntValue>> small = this.getDataSet(env, 100, 20);
        DataSet<Tuple2<IntValue, IntValue>> large = this.getDataSet(env, 10000, 2000);
        env.getConfig().enableObjectReuse();
        List enabledResultWithHuge = small.crossWithHuge(large).with((CrossFunction)new OverwriteObjectsCross()).collect();
        List enabledResultWithTiny = small.crossWithTiny(large).with((CrossFunction)new OverwriteObjectsCross()).collect();
        Assert.assertThat((Object)enabledResultWithHuge, (Matcher)Matchers.is((Object)enabledResultWithTiny));
        env.getConfig().disableObjectReuse();
        List disabledResultWithHuge = small.crossWithHuge(large).with((CrossFunction)new OverwriteObjectsCross()).collect();
        List disabledResultWithTiny = small.crossWithTiny(large).with((CrossFunction)new OverwriteObjectsCross()).collect();
        Assert.assertThat((Object)disabledResultWithHuge, (Matcher)Matchers.is((Object)disabledResultWithTiny));
        Assert.assertThat((Object)disabledResultWithHuge, (Matcher)Matchers.is((Object)enabledResultWithHuge));
        Assert.assertThat((Object)disabledResultWithTiny, (Matcher)Matchers.is((Object)enabledResultWithTiny));
    }

    private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env, int numberOfElements, int keyRange) {
        return env.fromCollection((Iterator)new TupleIntValueIntValueIterator(numberOfElements, keyRange), (TypeInformation)TupleTypeInfo.getBasicAndBasicValueTupleTypeInfo((Class[])new Class[]{IntValue.class, IntValue.class}));
    }

    private DataSet<Tuple2<IntValue, IntValue>> getDataSet(ExecutionEnvironment env) {
        return this.getDataSet(env, 3000000, 1000000);
    }

    private DataSet<Tuple2<IntValue, IntValue>> getFilteredDataSet(ExecutionEnvironment env) {
        return this.getDataSet(env).filter((FilterFunction)new FilterFunction<Tuple2<IntValue, IntValue>>(){

            public boolean filter(Tuple2<IntValue, IntValue> value) throws Exception {
                return ((IntValue)value.f0).getValue() % 2 == 0;
            }
        });
    }

    private static class Scrambler
    implements Serializable {
        private Tuple2<IntValue, IntValue> d = new Tuple2((Object)new IntValue(), (Object)new IntValue());
        private final boolean keyed;
        private Random random = new Random(OverwriteObjects.access$300() ^ 0xFFFFFFFFFFFFFFFFL);

        public Scrambler(boolean keyed) {
            this.keyed = keyed;
        }

        public Tuple2<IntValue, IntValue> scramble(Tuple2<IntValue, IntValue> a, Tuple2<IntValue, IntValue> b) {
            Tuple2 result;
            Random random = new Random(RANDOM_SEED);
            if (a != null && b != null) {
                random.setSeed(((long)((IntValue)a.f0).getValue() << 32) + (long)((IntValue)b.f0).getValue());
            } else if (a != null) {
                random.setSeed(((IntValue)a.f0).getValue());
            } else if (b != null) {
                random.setSeed(((IntValue)b.f0).getValue());
            } else {
                throw new RuntimeException("One of a or b should be not null");
            }
            switch (random.nextInt(4)) {
                case 0: {
                    result = a;
                    break;
                }
                case 1: {
                    result = b;
                    break;
                }
                case 2: {
                    result = this.d;
                    break;
                }
                case 3: {
                    result = new Tuple2((Object)new IntValue(), (Object)new IntValue());
                    break;
                }
                default: {
                    throw new RuntimeException("Unexpected value in switch statement");
                }
            }
            if (a == null || b == null) {
                if (result == null) {
                    result = this.d;
                }
                if (a == null) {
                    ((IntValue)b.f0).copyTo((IntValue)result.f0);
                    ((IntValue)b.f1).copyTo((IntValue)result.f1);
                } else {
                    ((IntValue)a.f0).copyTo((IntValue)result.f0);
                    ((IntValue)a.f1).copyTo((IntValue)result.f1);
                }
            } else {
                if (this.keyed) {
                    ((IntValue)result.f0).setValue(((IntValue)a.f0).getValue());
                } else {
                    ((IntValue)result.f0).setValue(((IntValue)a.f0).getValue() + ((IntValue)b.f0).getValue());
                }
                ((IntValue)result.f1).setValue(((IntValue)a.f1).getValue() + ((IntValue)b.f1).getValue());
            }
            this.scrambleIfNot((Tuple2<IntValue, IntValue>)a, result);
            this.scrambleIfNot(b, result);
            this.scrambleIfNot(this.d, result);
            return result;
        }

        private void scrambleIfNot(Tuple2<IntValue, IntValue> t, Object o) {
            if (t != null && t != o) {
                ((IntValue)t.f0).setValue(this.random.nextInt());
                ((IntValue)t.f1).setValue(this.random.nextInt());
            }
        }
    }

    private static class Tuple2Comparator<T0 extends Comparable<T0>, T1 extends Comparable<T1>>
    implements Comparator<Tuple2<T0, T1>> {
        private Tuple2Comparator() {
        }

        @Override
        public int compare(Tuple2<T0, T1> o1, Tuple2<T0, T1> o2) {
            int cmp = ((Comparable)o1.f0).compareTo(o2.f0);
            if (cmp != 0) {
                return cmp;
            }
            return ((Comparable)o1.f1).compareTo(o2.f1);
        }
    }

    private static class TupleIntValueIntValueIterator
    implements Iterator<Tuple2<IntValue, IntValue>>,
    Serializable {
        private int numElements;
        private final int keyRange;
        private Tuple2<IntValue, IntValue> ret = new Tuple2((Object)new IntValue(), (Object)new IntValue());
        private final Random rnd = new Random(123L);

        public TupleIntValueIntValueIterator(int numElements, int keyRange) {
            this.numElements = numElements;
            this.keyRange = keyRange;
        }

        @Override
        public boolean hasNext() {
            return this.numElements > 0;
        }

        @Override
        public Tuple2<IntValue, IntValue> next() {
            --this.numElements;
            ((IntValue)this.ret.f0).setValue(this.rnd.nextInt(this.keyRange));
            ((IntValue)this.ret.f1).setValue(1);
            return this.ret;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    private class OverwriteObjectsCross
    implements CrossFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
        private Scrambler scrambler = new Scrambler(true);

        private OverwriteObjectsCross() {
        }

        public Tuple2<IntValue, IntValue> cross(Tuple2<IntValue, IntValue> a, Tuple2<IntValue, IntValue> b) throws Exception {
            return this.scrambler.scramble(a, b);
        }
    }

    private class OverwriteObjectsJoin
    implements JoinFunction<Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>, Tuple2<IntValue, IntValue>> {
        private Scrambler scrambler = new Scrambler(true);

        private OverwriteObjectsJoin() {
        }

        public Tuple2<IntValue, IntValue> join(Tuple2<IntValue, IntValue> a, Tuple2<IntValue, IntValue> b) throws Exception {
            return this.scrambler.scramble(a, b);
        }
    }

    private class OverwriteObjectsReduce
    implements ReduceFunction<Tuple2<IntValue, IntValue>> {
        private Scrambler scrambler;

        public OverwriteObjectsReduce(boolean keyed) {
            this.scrambler = new Scrambler(keyed);
        }

        public Tuple2<IntValue, IntValue> reduce(Tuple2<IntValue, IntValue> a, Tuple2<IntValue, IntValue> b) throws Exception {
            return this.scrambler.scramble(a, b);
        }
    }
}

