/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.cancelling;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.RichJoinFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.test.cancelling.CancelingTestBase;
import org.apache.flink.test.cancelling.HeavyCompare;
import org.apache.flink.test.cancelling.HeavyCompareGeneratorInputFormat;
import org.apache.flink.test.util.InfiniteIntegerTupleInputFormat;
import org.apache.flink.test.util.UniformIntTupleGeneratorInputFormat;
import org.junit.Ignore;
import org.junit.Test;

@Ignore(value="Takes too long.")
public class JoinCancelingITCase
extends CancelingTestBase {
    private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow) throws Exception {
        this.executeTask(joiner, slow, 4);
    }

    private void executeTask(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, boolean slow, int parallelism) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource input1 = env.createInput((InputFormat)new InfiniteIntegerTupleInputFormat(slow));
        DataSource input2 = env.createInput((InputFormat)new InfiniteIntegerTupleInputFormat(slow));
        input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE).where(new int[]{0}).equalTo(new int[]{0}).with(joiner).output((OutputFormat)new DiscardingOutputFormat());
        env.setParallelism(parallelism);
        this.runAndCancelJob(env.createProgramPlan(), 5000, 10000);
    }

    @Test
    public void testCancelSortMatchWhileReadingSlowInputs() throws Exception {
        this.executeTask(new SimpleMatcher<Integer>(), true);
    }

    @Test
    public void testCancelSortMatchWhileReadingFastInputs() throws Exception {
        this.executeTask(new SimpleMatcher<Integer>(), false);
    }

    @Test
    public void testCancelSortMatchPriorToFirstRecordReading() throws Exception {
        this.executeTask((JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>)new StuckInOpenMatcher(), false);
    }

    private void executeTaskWithGenerator(JoinFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> joiner, int keys, int vals, int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSource input1 = env.createInput((InputFormat)new UniformIntTupleGeneratorInputFormat(keys, vals));
        DataSource input2 = env.createInput((InputFormat)new UniformIntTupleGeneratorInputFormat(keys, vals));
        input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE).where(new int[]{0}).equalTo(new int[]{0}).with(joiner).output((OutputFormat)new DiscardingOutputFormat());
        env.setParallelism(4);
        this.runAndCancelJob(env.createProgramPlan(), msecsTillCanceling, maxTimeTillCanceled);
    }

    @Test
    public void testCancelSortMatchWhileDoingHeavySorting() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        HeavyCompareGeneratorInputFormat input = new HeavyCompareGeneratorInputFormat(100);
        DataSource input1 = env.createInput((InputFormat)input);
        DataSource input2 = env.createInput((InputFormat)input);
        input1.join((DataSet)input2, JoinOperatorBase.JoinHint.REPARTITION_SORT_MERGE).where(new int[]{0}).equalTo(new int[]{0}).with((JoinFunction)new JoinFunction<Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>, Tuple2<HeavyCompare, Integer>>(){

            public Tuple2<HeavyCompare, Integer> join(Tuple2<HeavyCompare, Integer> first, Tuple2<HeavyCompare, Integer> second) throws Exception {
                throw new Exception("Job should be canceled in sort-merge phase, never run here ...");
            }
        }).output((OutputFormat)new DiscardingOutputFormat());
        this.runAndCancelJob(env.createProgramPlan(), 30000, 60000);
    }

    @Test
    public void testCancelSortMatchWhileJoining() throws Exception {
        this.executeTaskWithGenerator(new DelayingMatcher<Integer>(), 500, 3, 10000, 20000);
    }

    @Test
    public void testCancelSortMatchWithLongCancellingResponse() throws Exception {
        this.executeTaskWithGenerator(new LongCancelTimeMatcher<Integer>(), 500, 3, 10000, 10000);
    }

    @Test
    public void testCancelSortMatchWithHighparallelism() throws Exception {
        this.executeTask(new SimpleMatcher<Integer>(), false, 64);
    }

    private static final class StuckInOpenMatcher<IN>
    extends RichJoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1L;

        private StuckInOpenMatcher() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void open(Configuration parameters) throws Exception {
            StuckInOpenMatcher stuckInOpenMatcher = this;
            synchronized (stuckInOpenMatcher) {
                ((Object)((Object)this)).wait();
            }
        }

        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            return new Tuple2(first.f0, second.f0);
        }
    }

    private static final class LongCancelTimeMatcher<IN>
    implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1L;
        private static final int WAIT_TIME_PER_RECORD = 5000;

        private LongCancelTimeMatcher() {
        }

        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            long start = System.currentTimeMillis();
            long remaining = 5000L;
            do {
                try {
                    Thread.sleep(remaining);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            } while ((remaining = 5000L - System.currentTimeMillis() + start) > 0L);
            return new Tuple2(first.f0, second.f0);
        }
    }

    private static final class DelayingMatcher<IN>
    implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1L;
        private static final int WAIT_TIME_PER_RECORD = 10000;

        private DelayingMatcher() {
        }

        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            Thread.sleep(10000L);
            return new Tuple2(first.f0, second.f0);
        }
    }

    private static final class SimpleMatcher<IN>
    implements JoinFunction<Tuple2<IN, IN>, Tuple2<IN, IN>, Tuple2<IN, IN>> {
        private static final long serialVersionUID = 1L;

        private SimpleMatcher() {
        }

        public Tuple2<IN, IN> join(Tuple2<IN, IN> first, Tuple2<IN, IN> second) throws Exception {
            return new Tuple2(first.f0, second.f0);
        }
    }
}

