/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.state.operator.restore.keyed;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.test.state.operator.restore.ExecutionMode;
import org.apache.flink.util.Collector;
import org.junit.Assert;

public class KeyedJob {
    public static void main(String[] args) throws Exception {
        ParameterTool pt = ParameterTool.fromArgs((String[])args);
        String savepointsPath = pt.getRequired("savepoint-path");
        Configuration config = new Configuration();
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)savepointsPath);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI((Configuration)config);
        env.enableCheckpointing(500L, CheckpointingMode.EXACTLY_ONCE);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.setStateBackend((StateBackend)new MemoryStateBackend());
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> source = KeyedJob.createIntegerTupleSource(env, ExecutionMode.GENERATE);
        SingleOutputStreamOperator<Integer> window = KeyedJob.createWindowFunction(ExecutionMode.GENERATE, source);
        SingleOutputStreamOperator<Integer> first = KeyedJob.createFirstStatefulMap(ExecutionMode.GENERATE, window);
        SingleOutputStreamOperator<Integer> second = KeyedJob.createSecondStatefulMap(ExecutionMode.GENERATE, first);
        env.execute("job");
    }

    public static SingleOutputStreamOperator<Tuple2<Integer, Integer>> createIntegerTupleSource(StreamExecutionEnvironment env, ExecutionMode mode) {
        return env.addSource((SourceFunction)new IntegerTupleSource(mode));
    }

    public static SingleOutputStreamOperator<Integer> createWindowFunction(ExecutionMode mode, DataStream<Tuple2<Integer, Integer>> input) {
        return input.keyBy(new int[]{0}).countWindow(1L).apply((WindowFunction)new StatefulWindowFunction(mode)).setParallelism(4).uid("window");
    }

    public static SingleOutputStreamOperator<Integer> createFirstStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
        SingleOutputStreamOperator map = input.map((MapFunction)new StatefulStringStoringMap(mode, "first")).setParallelism(4).uid("first");
        return map;
    }

    public static SingleOutputStreamOperator<Integer> createSecondStatefulMap(ExecutionMode mode, DataStream<Integer> input) {
        SingleOutputStreamOperator map = input.map((MapFunction)new StatefulStringStoringMap(mode, "second")).setParallelism(4).uid("second");
        return map;
    }

    private static class StatefulStringStoringMap
    extends RichMapFunction<Integer, Integer>
    implements ListCheckpointed<String> {
        private static final long serialVersionUID = 6092985758425330235L;
        private final ExecutionMode mode;
        private final String valueToStore;

        private StatefulStringStoringMap(ExecutionMode mode, String valueToStore) {
            this.mode = mode;
            this.valueToStore = valueToStore;
        }

        public Integer map(Integer value) throws Exception {
            return value;
        }

        public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Arrays.asList(this.valueToStore + this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
        }

        public void restoreState(List<String> state) throws Exception {
            switch (this.mode) {
                case GENERATE: {
                    break;
                }
                case MIGRATE: 
                case RESTORE: {
                    Assert.assertEquals((String)("Failed for " + this.valueToStore + this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), (long)1L, (long)state.size());
                    String value = state.get(0);
                    Assert.assertEquals((Object)(this.valueToStore + this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask()), (Object)value);
                }
            }
        }
    }

    private static final class StatefulWindowFunction
    extends RichWindowFunction<Tuple2<Integer, Integer>, Integer, Tuple, GlobalWindow> {
        private static final long serialVersionUID = -7236313076792964055L;
        private final ExecutionMode mode;
        private transient ListState<Integer> state;
        private boolean applyCalled = false;

        private StatefulWindowFunction(ExecutionMode mode) {
            this.mode = mode;
        }

        public void open(OpenContext openContext) {
            this.state = this.getRuntimeContext().getListState(new ListStateDescriptor("values", Integer.class));
        }

        public void apply(Tuple key, GlobalWindow window, Iterable<Tuple2<Integer, Integer>> values, Collector<Integer> out) throws Exception {
            this.applyCalled = true;
            switch (this.mode) {
                case GENERATE: {
                    for (Tuple2<Integer, Integer> value : values) {
                        this.state.add(value.f1);
                    }
                    break;
                }
                case MIGRATE: 
                case RESTORE: {
                    Iterator<Tuple2<Integer, Integer>> input = values.iterator();
                    Iterator restored = ((Iterable)this.state.get()).iterator();
                    while (input.hasNext() && restored.hasNext()) {
                        Tuple2<Integer, Integer> value = input.next();
                        Integer rValue = (Integer)restored.next();
                        Assert.assertEquals((Object)rValue, (Object)value.f1);
                    }
                    Assert.assertEquals((Object)restored.hasNext(), (Object)input.hasNext());
                }
            }
        }

        public void close() {
            Assert.assertTrue((String)"Apply was never called.", (boolean)this.applyCalled);
        }
    }

    private static final class IntegerTupleSource
    extends RichSourceFunction<Tuple2<Integer, Integer>> {
        private static final long serialVersionUID = 1912878510707871659L;
        private final ExecutionMode mode;
        private boolean running = true;

        private IntegerTupleSource(ExecutionMode mode) {
            this.mode = mode;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Tuple2<Integer, Integer>> ctx) throws Exception {
            for (int x = 0; x < 10; ++x) {
                ctx.collect((Object)new Tuple2((Object)x, (Object)x));
            }
            switch (this.mode) {
                case GENERATE: 
                case MIGRATE: {
                    IntegerTupleSource integerTupleSource = this;
                    synchronized (integerTupleSource) {
                        while (this.running) {
                            ((Object)((Object)this)).wait();
                        }
                        break;
                    }
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void cancel() {
            IntegerTupleSource integerTupleSource = this;
            synchronized (integerTupleSource) {
                this.running = false;
                ((Object)((Object)this)).notifyAll();
            }
        }
    }
}

