/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.util.AbstractTestBaseJUnit4;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamCheckpointNotifierITCase
extends AbstractTestBaseJUnit4 {
    private static final Logger LOG = LoggerFactory.getLogger(StreamCheckpointNotifierITCase.class);
    private static final int PARALLELISM = 4;

    @Test
    public void testProgram() {
        try {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Assert.assertEquals((String)"test setup broken", (long)4L, (long)env.getParallelism());
            env.enableCheckpointing(500L);
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)Integer.MAX_VALUE, (long)0L));
            int numElements = 10000;
            int numTaskTotal = 20;
            DataStreamSource stream = env.addSource((SourceFunction)new GeneratingSourceFunction(10000L, 20));
            stream.filter((FilterFunction)new LongRichFilterFunction()).connect((DataStream)stream).flatMap((CoFlatMapFunction)new LeftIdentityCoRichFlatMapFunction()).map((MapFunction)new IdentityMapFunction()).startNewChain().keyBy(new int[]{0}).reduce((ReduceFunction)new OnceFailingReducer(10000L)).sinkTo((Sink)new DiscardingSink());
            env.execute();
            long failureCheckpointID = OnceFailingReducer.failureCheckpointID;
            Assert.assertNotEquals((long)0L, (long)failureCheckpointID);
            List allLists = Arrays.asList(GeneratingSourceFunction.COMPLETED_CHECKPOINTS, LongRichFilterFunction.COMPLETED_CHECKPOINTS, LeftIdentityCoRichFlatMapFunction.COMPLETED_CHECKPOINTS, IdentityMapFunction.COMPLETED_CHECKPOINTS, OnceFailingReducer.COMPLETED_CHECKPOINTS);
            Iterator iterator = allLists.iterator();
            while (iterator.hasNext()) {
                List[] parallelNotifications;
                for (List notifications : parallelNotifications = (List[])iterator.next()) {
                    Assert.assertTrue((String)"No checkpoint notification was received.", (notifications.size() > 0 ? 1 : 0) != 0);
                    Assert.assertFalse((String)"Failure checkpoint was marked as completed.", (boolean)notifications.contains(failureCheckpointID));
                    Assert.assertFalse((String)"No checkpoint received after failure.", ((Long)notifications.get(notifications.size() - 1) == failureCheckpointID ? 1 : 0) != 0);
                    Assert.assertTrue((String)"Checkpoint notification was received multiple times", (notifications.size() == new HashSet(notifications).size() ? 1 : 0) != 0);
                }
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    static List<Long>[] createCheckpointLists(int parallelism) {
        List[] lists = new List[parallelism];
        for (int i = 0; i < parallelism; ++i) {
            lists[i] = new ArrayList();
        }
        return lists;
    }

    private static class OnceFailingReducer
    extends RichReduceFunction<Tuple1<Long>>
    implements ListCheckpointed<Long>,
    CheckpointListener {
        static volatile boolean hasFailed = false;
        static volatile long failureCheckpointID;
        static final List<Long>[] COMPLETED_CHECKPOINTS;
        private final long failurePos;
        private volatile long count;
        private volatile boolean notificationAlready;

        OnceFailingReducer(long numElements) {
            this.failurePos = (long)(0.5 * (double)numElements / 4.0);
        }

        public Tuple1<Long> reduce(Tuple1<Long> value1, Tuple1<Long> value2) {
            ++this.count;
            if (this.count >= this.failurePos && this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                LOG.info(">>>>>>>>>>>>>>>>> Reached failing position <<<<<<<<<<<<<<<<<<<<<");
            }
            Tuple1<Long> tuple1 = value1;
            tuple1.f0 = (Long)tuple1.f0 + (Long)value2.f0;
            return value1;
        }

        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            if (!hasFailed && this.count >= this.failurePos && this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask() == 0) {
                LOG.info(">>>>>>>>>>>>>>>>> Throwing Exception <<<<<<<<<<<<<<<<<<<<<");
                hasFailed = true;
                failureCheckpointID = checkpointId;
                throw new Exception("Test Failure");
            }
            return Collections.singletonList(this.count);
        }

        public void restoreState(List<Long> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.count = state.get(0);
        }

        public void notifyCheckpointComplete(long checkpointId) {
            int partition = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            COMPLETED_CHECKPOINTS[partition].add(checkpointId);
            if (hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
            }
        }

        public void notifyCheckpointAborted(long checkpointId) {
        }

        static {
            COMPLETED_CHECKPOINTS = StreamCheckpointNotifierITCase.createCheckpointLists(4);
        }
    }

    private static class LeftIdentityCoRichFlatMapFunction
    extends RichCoFlatMapFunction<Long, Long, Long>
    implements CheckpointListener {
        static final List<Long>[] COMPLETED_CHECKPOINTS = StreamCheckpointNotifierITCase.createCheckpointLists(4);
        private volatile boolean notificationAlready;

        private LeftIdentityCoRichFlatMapFunction() {
        }

        public void flatMap1(Long value, Collector<Long> out) {
            out.collect((Object)value);
        }

        public void flatMap2(Long value, Collector<Long> out) {
        }

        public void notifyCheckpointComplete(long checkpointId) {
            int partition = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            COMPLETED_CHECKPOINTS[partition].add(checkpointId);
            if (OnceFailingReducer.hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
            }
        }

        public void notifyCheckpointAborted(long checkpointId) {
        }
    }

    private static class LongRichFilterFunction
    extends RichFilterFunction<Long>
    implements CheckpointListener {
        static final List<Long>[] COMPLETED_CHECKPOINTS = StreamCheckpointNotifierITCase.createCheckpointLists(4);
        private volatile boolean notificationAlready;

        private LongRichFilterFunction() {
        }

        public boolean filter(Long value) {
            return value < 100L;
        }

        public void notifyCheckpointComplete(long checkpointId) {
            int partition = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            COMPLETED_CHECKPOINTS[partition].add(checkpointId);
            if (OnceFailingReducer.hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
            }
        }

        public void notifyCheckpointAborted(long checkpointId) {
        }
    }

    private static class IdentityMapFunction
    extends RichMapFunction<Long, Tuple1<Long>>
    implements CheckpointListener {
        static final List<Long>[] COMPLETED_CHECKPOINTS = StreamCheckpointNotifierITCase.createCheckpointLists(4);
        private volatile boolean notificationAlready;

        private IdentityMapFunction() {
        }

        public Tuple1<Long> map(Long value) throws Exception {
            return Tuple1.of((Object)value);
        }

        public void notifyCheckpointComplete(long checkpointId) {
            int partition = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            COMPLETED_CHECKPOINTS[partition].add(checkpointId);
            if (OnceFailingReducer.hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                GeneratingSourceFunction.numPostFailureNotifications.incrementAndGet();
            }
        }

        public void notifyCheckpointAborted(long checkpointId) {
        }
    }

    private static class GeneratingSourceFunction
    extends RichSourceFunction<Long>
    implements ParallelSourceFunction<Long>,
    CheckpointListener,
    ListCheckpointed<Integer> {
        static final List<Long>[] COMPLETED_CHECKPOINTS = StreamCheckpointNotifierITCase.createCheckpointLists(4);
        static AtomicLong numPostFailureNotifications = new AtomicLong();
        private final long numElements;
        private final int notificationsToWaitFor;
        private int index;
        private int step;
        private volatile boolean notificationAlready;
        private volatile boolean isRunning = true;

        GeneratingSourceFunction(long numElements, int notificationsToWaitFor) {
            this.numElements = numElements;
            this.notificationsToWaitFor = notificationsToWaitFor;
        }

        public void open(OpenContext openContext) throws IOException {
            this.step = this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
            if (this.index == 0) {
                this.index = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            Object lockingObject = ctx.getCheckpointLock();
            while (this.isRunning && (long)this.index < this.numElements) {
                long result = this.index % 10;
                Object object = lockingObject;
                synchronized (object) {
                    this.index += this.step;
                    ctx.collect((Object)result);
                }
            }
            while (this.isRunning && numPostFailureNotifications.get() < (long)this.notificationsToWaitFor) {
                Thread.sleep(50L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.index);
        }

        public void restoreState(List<Integer> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.index = state.get(0);
        }

        public void notifyCheckpointComplete(long checkpointId) {
            int partition = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            COMPLETED_CHECKPOINTS[partition].add(checkpointId);
            if (OnceFailingReducer.hasFailed && !this.notificationAlready) {
                this.notificationAlready = true;
                numPostFailureNotifications.incrementAndGet();
            }
        }

        public void notifyCheckpointAborted(long checkpointId) {
        }
    }
}

