/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.Random;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.collect.EvictingQueue;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase;
import org.junit.Assert;

public class UdfStreamOperatorCheckpointingITCase
extends StreamFaultToleranceTestBase {
    private static final long NUM_INPUT = 500000L;
    private static final int NUM_OUTPUT = 1000;

    /**
     * Builds the job under test: one keyed source stream feeding three independent
     * pipelines (min, reduce and fold), each re-keyed on field 0 and terminated by its
     * own evicting-queue sink.
     *
     * <p>Only the {@code min} pipeline contains the {@link OnceFailingIdentityMapFunction}
     * that throws the artificial failure.
     *
     * @param env the execution environment the topology is registered on
     */
    @Override
    public void testProgram(StreamExecutionEnvironment env) {
        // The key type produced by keyBy(int...) is irrelevant here, hence the wildcard.
        KeyedStream<Tuple2<Integer, Long>, ?> stream = env
                .addSource(new StatefulMultipleSequence())
                .keyBy(0);

        // Minimum of field 1 per key, passed through the map that fails once.
        stream
                .min(1)
                .map(new OnceFailingIdentityMapFunction(NUM_INPUT))
                .keyBy(0)
                .addSink(new MinEvictingQueueSink());

        // Running sum of field 1 per key, computed with a reduce.
        stream
                .reduce(new ReduceFunction<Tuple2<Integer, Long>>() {

                    @Override
                    public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                })
                .keyBy(0)
                .addSink(new SumEvictingQueueSink());

        // Running sum of field 1 per key, computed with a fold starting from (0, 0L).
        stream
                .fold(Tuple2.of(0, 0L), new FoldFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>() {

                    @Override
                    public Tuple2<Integer, Long> fold(Tuple2<Integer, Long> accumulator, Tuple2<Integer, Long> value) throws Exception {
                        return Tuple2.of(value.f0, accumulator.f1 + value.f1);
                    }
                })
                .keyBy(0)
                .addSink(new FoldEvictingQueueSink());
    }

    /**
     * Verifies the values retained by the three sinks, for keys 0 to 11.
     *
     * <ul>
     *   <li>min pipeline: every retained value must be 1;</li>
     *   <li>reduce and fold pipelines: the retained values must be the consecutive
     *       running sums that follow {@code 1 + 2 + ... + 499000}.</li>
     * </ul>
     */
    @Override
    public void postSubmit() {
        final int keyCount = 12;

        for (int key = 0; key < keyCount; key++) {
            for (Long observed : MinEvictingQueueSink.queues[key]) {
                boolean isOne = observed == 1L;
                Assert.assertTrue("Value different from 1 found, was " + observed + ".", isOne);
            }
        }

        for (int key = 0; key < keyCount; key++) {
            drainAndCheckRunningSums(SumEvictingQueueSink.queues[key], "Unexpected reduce value ");
        }

        for (int key = 0; key < keyCount; key++) {
            drainAndCheckRunningSums(FoldEvictingQueueSink.queues[key], "Unexpected fold value ");
        }
    }

    /**
     * Empties {@code queue} from its head and asserts that each removed element equals
     * the next running sum, starting from the sum of 1..499000 plus 499001.
     *
     * @param queue         the queue to drain; it is empty when this method returns normally
     * @param messagePrefix leading text of the assertion message
     */
    private static void drainAndCheckRunningSums(Queue<Long> queue, String messagePrefix) {
        long lastCount = NUM_INPUT - NUM_OUTPUT;
        long expectedSum = lastCount * (lastCount + 1L) / 2L;
        while (!queue.isEmpty()) {
            Long actual = queue.remove();
            // Advance the expectation before building the message and comparing.
            lastCount++;
            expectedSum += lastCount;
            Assert.assertTrue(messagePrefix + actual + " instead of " + expectedSum + ".", actual == expectedSum);
        }
    }

    private static class FoldEvictingQueueSink
    implements SinkFunction<Tuple2<Integer, Long>> {
        public static Queue<Long>[] queues = new Queue[12];

        private FoldEvictingQueueSink() {
        }

        public void invoke(Tuple2<Integer, Long> value) throws Exception {
            if (queues[(Integer)value.f0] == null) {
                FoldEvictingQueueSink.queues[((Integer)value.f0).intValue()] = EvictingQueue.create((int)1000);
            }
            queues[(Integer)value.f0].add((Long)value.f1);
        }
    }

    private static class SumEvictingQueueSink
    implements SinkFunction<Tuple2<Integer, Long>> {
        public static Queue<Long>[] queues = new Queue[12];

        private SumEvictingQueueSink() {
        }

        public void invoke(Tuple2<Integer, Long> value) throws Exception {
            if (queues[(Integer)value.f0] == null) {
                SumEvictingQueueSink.queues[((Integer)value.f0).intValue()] = EvictingQueue.create((int)1000);
            }
            queues[(Integer)value.f0].add((Long)value.f1);
        }
    }

    private static class MinEvictingQueueSink
    implements SinkFunction<Tuple2<Integer, Long>> {
        public static Queue<Long>[] queues = new Queue[12];

        private MinEvictingQueueSink() {
        }

        public void invoke(Tuple2<Integer, Long> value) throws Exception {
            if (queues[(Integer)value.f0] == null) {
                MinEvictingQueueSink.queues[((Integer)value.f0).intValue()] = EvictingQueue.create((int)1000);
            }
            queues[(Integer)value.f0].add((Long)value.f1);
        }
    }

    private static class OnceFailingIdentityMapFunction
    extends RichMapFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>>
    implements ListCheckpointed<Long> {
        private static volatile boolean hasFailed = false;
        private final long numElements;
        private long failurePos;
        private long count;

        public OnceFailingIdentityMapFunction(long numElements) {
            this.numElements = numElements;
        }

        public void open(Configuration parameters) throws Exception {
            long failurePosMin = (long)(0.4 * (double)this.numElements / (double)this.getRuntimeContext().getNumberOfParallelSubtasks());
            long failurePosMax = (long)(0.7 * (double)this.numElements / (double)this.getRuntimeContext().getNumberOfParallelSubtasks());
            this.failurePos = new Random().nextLong() % (failurePosMax - failurePosMin) + failurePosMin;
        }

        public Tuple2<Integer, Long> map(Tuple2<Integer, Long> value) throws Exception {
            if (!hasFailed && this.count >= this.failurePos) {
                hasFailed = true;
                throw new Exception("Test Failure");
            }
            ++this.count;
            return value;
        }

        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.count);
        }

        public void restoreState(List<Long> state) throws Exception {
            if (!state.isEmpty()) {
                this.count = state.get(0);
            }
        }
    }

    private static class StatefulMultipleSequence
    extends RichSourceFunction<Tuple2<Integer, Long>>
    implements ListCheckpointed<Long> {
        private long count;

        private StatefulMultipleSequence() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
            Object lock = ctx.getCheckpointLock();
            while (this.count < 500000L) {
                Object object = lock;
                synchronized (object) {
                    for (int i = 0; i < 12; ++i) {
                        ctx.collect((Object)Tuple2.of((Object)i, (Object)(this.count + 1L)));
                    }
                    ++this.count;
                }
            }
        }

        public void cancel() {
        }

        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.count);
        }

        public void restoreState(List<Long> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.count = state.get(0);
        }
    }
}

