/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state.operator.restore.unkeyed;

import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
import org.junit.Assert;

public class NonKeyedJob {
    /**
     * Builds the non-keyed test topology in {@link ExecutionMode#GENERATE} mode and executes it
     * on a local environment.
     *
     * <p>Topology: source -&gt; first stateful map -&gt; second stateful map -&gt; stateless map
     * -&gt; third stateful map. {@code startNewChain()} is requested on the first and second
     * stateful maps.
     *
     * @param args command-line arguments; the {@code savepoint-path} parameter is required
     * @throws Exception propagated from argument parsing, job construction or job execution
     */
    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs(args);
        String savepointsPath = pt.getRequired("savepoint-path");

        Configuration config = new Configuration();
        // Pass the value with its real static type so the typed option/value pair is checked.
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointsPath);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());

        // Keep the static type at StateBackend so the same setStateBackend overload is selected.
        StateBackend stateBackend = new MemoryStateBackend();
        env.setStateBackend(stateBackend);

        SingleOutputStreamOperator<Integer> source = NonKeyedJob.createSource(env, ExecutionMode.GENERATE);

        SingleOutputStreamOperator<Integer> first = NonKeyedJob.createFirstStatefulMap(ExecutionMode.GENERATE, source);
        first.startNewChain();

        SingleOutputStreamOperator<Integer> second = NonKeyedJob.createSecondStatefulMap(ExecutionMode.GENERATE, first);
        second.startNewChain();

        SingleOutputStreamOperator<Integer> stateless = NonKeyedJob.createStatelessMap(second);

        // The returned operator is not needed here; the call is made to append the operator.
        NonKeyedJob.createThirdStatefulMap(ExecutionMode.GENERATE, stateless);

        env.execute("job");
    }

    /**
     * Adds an {@code IntegerSource} running in the given mode to the environment, with a
     * parallelism of 4.
     *
     * @param env environment the source is added to
     * @param mode execution mode handed to the source
     * @return the source operator
     */
    public static SingleOutputStreamOperator<Integer> createSource(StreamExecutionEnvironment env, ExecutionMode mode) {
        // No raw-type cast: the source is already typed to Integer, so generics stay checked.
        return env.addSource(new IntegerSource(mode)).setParallelism(4);
    }

    /**
     * Appends a {@code StatefulStringStoringMap} that stores {@code "first"} to the input
     * stream, with a parallelism of 4 and the uid {@code "first"}.
     *
     * @param mode execution mode handed to the map function
     * @param input stream the map is applied to
     * @return the map operator
     */
    public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
        // No raw-type cast: keeps the Integer -> Integer typing checked by the compiler.
        return input.map(new StatefulStringStoringMap(mode, "first"))
                .setParallelism(4)
                .uid("first");
    }

    /**
     * Appends a {@code StatefulStringStoringMap} that stores {@code "second"} to the input
     * stream, with a parallelism of 4 and the uid {@code "second"}.
     *
     * @param mode execution mode handed to the map function
     * @param input stream the map is applied to
     * @return the map operator
     */
    public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
        // No raw-type cast: keeps the Integer -> Integer typing checked by the compiler.
        return input.map(new StatefulStringStoringMap(mode, "second"))
                .setParallelism(4)
                .uid("second");
    }

    /**
     * Appends a {@code StatefulStringStoringMap} that stores {@code "third"} to the input
     * stream, with a parallelism of 4 and the uid {@code "third"}.
     *
     * @param mode execution mode handed to the map function
     * @param input stream the map is applied to
     * @return the map operator
     */
    public static SingleOutputStreamOperator<Integer> createThirdStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
        // Returned directly with its generic type; no raw-typed intermediate or raw cast.
        return input.map(new StatefulStringStoringMap(mode, "third"))
                .setParallelism(4)
                .uid("third");
    }

    /**
     * Appends a {@code NoOpMapFunction} to the input stream, with a parallelism of 4. No uid is
     * assigned to this operator.
     *
     * @param input stream the map is applied to
     * @return the map operator
     */
    public static SingleOutputStreamOperator<Integer> createStatelessMap(DataStream<Integer> input) {
        // No raw-type cast: keeps the Integer -> Integer typing checked by the compiler.
        return input.map(new NoOpMapFunction()).setParallelism(4);
    }

    private static final class IntegerSource
    extends RichParallelSourceFunction<Integer> {
        private static final long serialVersionUID = 1912878510707871659L;
        private final ExecutionMode mode;
        private volatile boolean running = true;

        private IntegerSource(ExecutionMode mode) {
            this.mode = mode;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            ctx.collect((Object)1);
            switch (this.mode) {
                case GENERATE: 
                case MIGRATE: {
                    IntegerSource integerSource = this;
                    synchronized (integerSource) {
                        while (this.running) {
                            ((Object)((Object)this)).wait();
                        }
                        break;
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() {
            IntegerSource integerSource = this;
            synchronized (integerSource) {
                this.running = false;
                ((Object)((Object)this)).notifyAll();
            }
        }
    }

    private static class NoOpMapFunction
    implements MapFunction<Integer, Integer> {
        private static final long serialVersionUID = 6584823409744624276L;

        private NoOpMapFunction() {
        }

        public Integer map(Integer value) throws Exception {
            return value;
        }
    }

    private static class StatefulStringStoringMap
    extends RichMapFunction<Integer, Integer>
    implements ListCheckpointed<String> {
        private static final long serialVersionUID = 6092985758425330235L;
        private final ExecutionMode mode;
        private final String valueToStore;

        private StatefulStringStoringMap(ExecutionMode mode, String valueToStore) {
            this.mode = mode;
            this.valueToStore = valueToStore;
        }

        public Integer map(Integer value) throws Exception {
            return value;
        }

        public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Arrays.asList(this.valueToStore + this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
        }

        public void restoreState(List<String> state) throws Exception {
            switch (this.mode) {
                case GENERATE: {
                    break;
                }
                case MIGRATE: 
                case RESTORE: {
                    Assert.assertEquals((String)("Failed for " + this.valueToStore + this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), (long)1L, (long)state.size());
                    String value = state.get(0);
                    Assert.assertEquals((Object)(this.valueToStore + this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), (Object)value);
                }
            }
        }
    }
}

