/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.SupportsConcurrentExecutionAttempts;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.io.FinalizeOnMaster;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.CommitterInitContext;
import org.apache.flink.api.connector.sink2.CommittingSinkWriter;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.SupportsCommitter;
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceReader;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceSplit;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.BatchExecutionOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.configuration.SlowTaskDetectorOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.util.InstantiationUtil;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class SpeculativeExecutionITCase {
    @TempDir
    private Path temporaryFolder;
    private static final int MAX_PARALLELISM = 4;
    private static final int NUMBERS_TO_PRODUCE = 10000;
    private static final int FAILURE_COUNT = 20;
    private static final AtomicInteger slowTaskCounter = new AtomicInteger(1);
    private int parallelism;
    private static ConcurrentMap<Integer, Map<Long, Long>> numberCountResults;
    private Map<Long, Long> expectedResult;

    /** Package-private no-arg constructor; all per-test state is initialised in {@code setUp()}. */
    SpeculativeExecutionITCase() {
    }

    /**
     * Resets the static state shared with the test operators and rebuilds the expected result,
     * which maps every number in {@code [0, NUMBERS_TO_PRODUCE)} to a count of one.
     */
    @BeforeEach
    void setUp() {
        // Static state first: it is read by the operator classes declared further down.
        numberCountResults = new ConcurrentHashMap<>();
        NumberCounterMap.toFailCounter.set(0);
        slowTaskCounter.set(1);

        this.parallelism = MAX_PARALLELISM;
        this.expectedResult =
                LongStream.range(0L, NUMBERS_TO_PRODUCE)
                        .boxed()
                        .collect(Collectors.toMap(number -> number, number -> 1L));
    }

    /** Runs the job with the stalling map operator and checks the collected counts. */
    @Test
    void testSpeculativeExecution() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowMap;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /**
     * Same job as {@code testSpeculativeExecution}, but the map function is primed to throw for
     * its first {@code FAILURE_COUNT} invocations before the job is started.
     */
    @Test
    void testSpeculativeExecutionWithFailover() throws Exception {
        NumberCounterMap.toFailCounter.set(FAILURE_COUNT);

        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowMap;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /**
     * Runs the slow-map job with the operator parallelism set to {@code -1}.
     *
     * <p>NOTE(review): -1 presumably leaves the parallelism to the adaptive scheduler - confirm.
     */
    @Test
    void testSpeculativeExecutionWithAdaptiveParallelism() throws Exception {
        this.parallelism = -1;

        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowMap;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /**
     * Submits the slow-map job with a one minute slow-node block duration and expects the job
     * result not to become available within 30 seconds.
     *
     * <p>The job is cancelled afterwards (best effort) so that it does not keep running and
     * touching the shared static counters and result map while later tests execute.
     */
    @Test
    void testBlockSlowNodeInSpeculativeExecution() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION, Duration.ofMinutes(1L));

        final JobClient client = this.executeJobAsync(configuration, this::setupJobWithSlowMap);
        try {
            Assertions.assertThatThrownBy(
                            () -> client.getJobExecutionResult().get(30L, TimeUnit.SECONDS),
                            "The local node is expected to be blocked but it is not.")
                    .isInstanceOf(TimeoutException.class);
        } finally {
            // Fire-and-forget: the returned future is deliberately not awaited so a cancellation
            // problem cannot mask the assertion outcome above.
            client.cancel();
        }
    }

    /** Runs the job whose legacy {@code SourceFunction} source stalls once and checks the counts. */
    @Test
    void testSpeculativeExecutionOfSourceFunction() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowSourceFunction;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /** Runs the job whose {@code InputFormat}-based source stalls once and checks the counts. */
    @Test
    void testSpeculativeExecutionOfInputFormatSource() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup =
                this::setupJobWithSlowInputFormatSource;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /** Runs the job built on {@code TestingNumberSequenceSource} (no forced failure). */
    @Test
    void testSpeculativeExecutionOfNewSource() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupJobWithSlowNewSource;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /** Runs the job built on {@code TestingNumberSequenceSource} with the forced-failure flag set. */
    @Test
    void testSpeculativeExecutionOfNewSourceWithFailure() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup =
                environment -> setupJobWithSlowNewSource(environment, true);
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /**
     * Runs the job with the committing {@code SpeculativeSink}, then checks the counts, the number
     * of {@code DummyCommitter} instances created and that a writer with a non-zero attempt number
     * was committed.
     */
    @Test
    public void testSpeculativeSlowSink() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupSpeculativeSlowSink;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();

        final int committerInstances = DummyCommitter.attempts.get();
        Assertions.assertThat(committerInstances).isEqualTo(this.parallelism);

        final boolean sawSpeculativeWriter = DummyCommitter.foundSpeculativeWriter;
        Assertions.assertThat(sawSpeculativeWriter).isTrue();
    }

    /**
     * Variant of {@code testSpeculativeSlowSink} that uses the deprecated
     * {@code TwoPhaseCommittingSink}-based sink.
     */
    @Deprecated
    @Test
    public void testSpeculativeSlowSinkDeprecated() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup =
                this::setupSpeculativeSlowSinkDeprecated;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();

        final int committerInstances = DummyCommitter.attempts.get();
        Assertions.assertThat(committerInstances).isEqualTo(this.parallelism);

        final boolean sawSpeculativeWriter = DummyCommitter.foundSpeculativeWriter;
        Assertions.assertThat(sawSpeculativeWriter).isTrue();
    }

    /** Runs the job with {@code NonSpeculativeSinkFunction} as sink and checks the counts. */
    @Test
    public void testNonSpeculativeSlowSinkFunction() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup =
                this::setupNonSpeculativeSlowSinkFunction;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /** Runs the job with {@code SpeculativeSinkFunction} as sink and checks the counts. */
    @Test
    public void testSpeculativeSlowSinkFunction() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup =
                this::setupSpeculativeSlowSinkFunction;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();
    }

    /**
     * Runs the job that writes through {@code DummySpeculativeOutputFormat}, checks the counts and
     * that the format's global finalization saw a finished attempt other than attempt 0.
     */
    @Test
    public void testSpeculativeOutputFormatSink() throws Exception {
        final Consumer<StreamExecutionEnvironment> jobSetup = this::setupSlowOutputFormatSink;
        executeJob(jobSetup);
        waitUntilJobArchived();
        checkResults();

        final boolean sawSpeculativeAttempt = DummySpeculativeOutputFormat.foundSpeculativeAttempt;
        Assertions.assertThat(sawSpeculativeAttempt).isTrue();
    }

    /**
     * Sums the per-subtask count maps stored in {@code numberCountResults} into a single map and
     * asserts that it equals {@code expectedResult}.
     */
    private void checkResults() {
        final Map<Long, Long> mergedCounts = new HashMap<>();
        for (Map<Long, Long> partialCounts : numberCountResults.values()) {
            for (Map.Entry<Long, Long> entry : partialCounts.entrySet()) {
                mergedCounts.merge(entry.getKey(), entry.getValue(), Long::sum);
            }
        }
        Assertions.assertThat(mergedCounts).isEqualTo(this.expectedResult);
    }

    /**
     * Submits the job described by {@code jobSetupFunc} with a fresh configuration and blocks
     * until its execution result is available.
     *
     * @param jobSetupFunc builds the job topology on the provided environment
     * @throws Exception if submission or execution fails
     */
    private void executeJob(Consumer<StreamExecutionEnvironment> jobSetupFunc) throws Exception {
        executeJobAsync(new Configuration(), jobSetupFunc).getJobExecutionResult().get();
    }

    /**
     * Creates a local batch-mode environment from {@code configuration} (after passing it through
     * {@code configure}), lets {@code jobSetupFunc} build the job and submits it asynchronously.
     *
     * @param configuration base configuration; test defaults are added to this instance
     * @param jobSetupFunc builds the job topology on the provided environment
     * @return the client of the submitted job
     * @throws Exception if the submission fails
     */
    private JobClient executeJobAsync(Configuration configuration, Consumer<StreamExecutionEnvironment> jobSetupFunc) throws Exception {
        final Configuration effectiveConfiguration = configure(configuration);
        final LocalStreamEnvironment environment =
                StreamExecutionEnvironment.createLocalEnvironment(effectiveConfiguration);

        environment.setParallelism(-1);
        environment.setRuntimeMode(RuntimeExecutionMode.BATCH);

        jobSetupFunc.accept(environment);
        return environment.executeAsync();
    }

    /**
     * Adds the settings shared by all tests to {@code configuration} and returns that same
     * instance. A {@code BLOCK_SLOW_NODE_DURATION} already present in the configuration is kept;
     * otherwise it is set to zero.
     *
     * @param configuration the configuration to populate (modified in place)
     * @return {@code configuration}
     */
    private Configuration configure(Configuration configuration) {
        configuration.set(RestOptions.BIND_PORT, "0");
        // temporaryFolder is a java.nio.file.Path, so getRoot() would be the file-system root
        // ("/"), not the per-test directory. Archive into the temporary directory itself, with an
        // explicit file:// scheme.
        configuration.set(JobManagerOptions.ARCHIVE_DIR, this.temporaryFolder.toUri().toString());
        configuration.set(JobManagerOptions.SLOT_REQUEST_TIMEOUT, Duration.ofMillis(5000L));
        configuration.set(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE, 1);
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4kb"));
        configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, MAX_PARALLELISM);

        configuration.set(
                RestartStrategyOptions.RESTART_STRATEGY,
                RestartStrategyOptions.RestartStrategyType.FIXED_DELAY.getMainValue());
        configuration.set(
                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_ATTEMPTS, Integer.MAX_VALUE);

        configuration.set(BatchExecutionOptions.SPECULATIVE_ENABLED, true);
        if (!configuration.contains(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION)) {
            configuration.set(BatchExecutionOptions.BLOCK_SLOW_NODE_DURATION, Duration.ZERO);
        }

        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_MULTIPLIER, 1.0);
        configuration.set(SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_RATIO, 0.2);
        configuration.set(
                SlowTaskDetectorOptions.EXECUTION_TIME_BASELINE_LOWER_BOUND, Duration.ofMillis(0L));

        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM,
                MAX_PARALLELISM);
        configuration.set(BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MIN_PARALLELISM, 1);
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_MAX_PARALLELISM, MAX_PARALLELISM);
        configuration.set(
                BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_AVG_DATA_VOLUME_PER_TASK,
                MemorySize.parse("150kb"));
        return configuration;
    }

    /**
     * Polls the temporary directory once per second until it contains at least one entry, i.e.
     * until something has been written to the configured archive directory.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void waitUntilJobArchived() throws InterruptedException {
        while (true) {
            // listFiles() returns null on I/O errors; treat that like "nothing archived yet".
            final java.io.File[] archived = this.temporaryFolder.toFile().listFiles();
            if (archived != null && archived.length >= 1) {
                return;
            }
            Thread.sleep(1000L);
        }
    }

    /**
     * Builds {@code sequence source -> rebalance -> NumberCounterMap -> counting sink}, with source
     * and map each in their own slot sharing group.
     */
    private void setupJobWithSlowMap(StreamExecutionEnvironment env) {
        final DataStream<Long> numbers =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(this.parallelism)
                        .name("source")
                        .slotSharingGroup("sourceGroup");

        final DataStream<Long> mapped =
                numbers.rebalance()
                        .map(new NumberCounterMap())
                        .setParallelism(this.parallelism)
                        .name("map")
                        .slotSharingGroup("mapGroup");

        addSink(mapped);
    }

    /** Builds a job whose source is {@code TestingSourceFunc} wrapped in a bounded stream source. */
    private void setupJobWithSlowSourceFunction(StreamExecutionEnvironment env) {
        final StreamSource<Long, TestingSourceFunc> sourceOperator =
                new StreamSource<>(new TestingSourceFunc());

        final DataStream<Long> numbers =
                new DataStreamSource<>(
                                env,
                                BasicTypeInfo.LONG_TYPE_INFO,
                                sourceOperator,
                                true,
                                "source",
                                Boundedness.BOUNDED)
                        .setParallelism(this.parallelism)
                        .slotSharingGroup("sourceGroup");

        addSink(numbers);
    }

    /**
     * Builds a job whose source reads from {@code TestingInputFormat} through an
     * {@code InputFormatSourceFunction} wrapped in a bounded stream source.
     */
    private void setupJobWithSlowInputFormatSource(StreamExecutionEnvironment env) {
        final InputFormatSourceFunction<Long> formatFunction =
                new InputFormatSourceFunction<>(
                        new TestingInputFormat(), TypeInformation.of(Long.class));

        final DataStream<Long> numbers =
                new DataStreamSource<>(
                                env,
                                BasicTypeInfo.LONG_TYPE_INFO,
                                new StreamSource<>(formatFunction),
                                true,
                                "source",
                                Boundedness.BOUNDED)
                        .setParallelism(this.parallelism)
                        .slotSharingGroup("sourceGroup");

        addSink(numbers);
    }

    /** Convenience overload: builds the new-source job without the forced-failure flag. */
    private void setupJobWithSlowNewSource(StreamExecutionEnvironment env) {
        final boolean forceFailure = false;
        setupJobWithSlowNewSource(env, forceFailure);
    }

    /**
     * Builds a job reading from {@code TestingNumberSequenceSource} without watermarks and
     * attaches the counting sink.
     *
     * @param forceFailureFlag passed through to the source's constructor
     */
    private void setupJobWithSlowNewSource(StreamExecutionEnvironment env, boolean forceFailureFlag) {
        final TestingNumberSequenceSource numberSource =
                new TestingNumberSequenceSource(forceFailureFlag);
        final DataStream<Long> numbers =
                env.fromSource(numberSource, WatermarkStrategy.noWatermarks(), "source");
        addSink(numbers);
    }

    /**
     * Resets the static {@code DummyCommitter} bookkeeping and builds
     * {@code sequence source -> SpeculativeSink}.
     */
    private void setupSpeculativeSlowSink(StreamExecutionEnvironment env) {
        // Clear what a previous run may have left behind in the committer's static state.
        DummyCommitter.foundSpeculativeWriter = false;
        DummyCommitter.blocked.set(false);
        DummyCommitter.attempts.set(0);

        final DataStream<Long> numbers =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(this.parallelism)
                        .name("source")
                        .slotSharingGroup("sourceGroup");

        numbers.sinkTo(new SpeculativeSink())
                .setParallelism(this.parallelism)
                .name("sink")
                .slotSharingGroup("sinkGroup");
    }

    /**
     * Resets the static {@code DummyCommitter} bookkeeping and builds
     * {@code sequence source -> SpeculativeSinkDeprecated}.
     */
    @Deprecated
    private void setupSpeculativeSlowSinkDeprecated(StreamExecutionEnvironment env) {
        // Clear what a previous run may have left behind in the committer's static state.
        DummyCommitter.foundSpeculativeWriter = false;
        DummyCommitter.blocked.set(false);
        DummyCommitter.attempts.set(0);

        final DataStream<Long> numbers =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(this.parallelism)
                        .name("source")
                        .slotSharingGroup("sourceGroup");

        numbers.sinkTo(new SpeculativeSinkDeprecated())
                .setParallelism(this.parallelism)
                .name("sink")
                .slotSharingGroup("sinkGroup");
    }

    /** Builds {@code sequence source -> NonSpeculativeSinkFunction}. */
    private void setupNonSpeculativeSlowSinkFunction(StreamExecutionEnvironment env) {
        final DataStream<Long> numbers =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(this.parallelism)
                        .name("source")
                        .slotSharingGroup("sourceGroup");

        numbers.addSink(new NonSpeculativeSinkFunction())
                .setParallelism(this.parallelism)
                .name("sink")
                .slotSharingGroup("sinkGroup");
    }

    /** Builds {@code sequence source -> SpeculativeSinkFunction}. */
    private void setupSpeculativeSlowSinkFunction(StreamExecutionEnvironment env) {
        final DataStream<Long> numbers =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(this.parallelism)
                        .name("source")
                        .slotSharingGroup("sourceGroup");

        numbers.addSink(new SpeculativeSinkFunction())
                .setParallelism(this.parallelism)
                .name("sink")
                .slotSharingGroup("sinkGroup");
    }

    /** Builds {@code sequence source -> rebalance -> DummySpeculativeOutputFormat}. */
    private void setupSlowOutputFormatSink(StreamExecutionEnvironment env) {
        final DataStream<Long> numbers =
                env.fromSequence(0L, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(this.parallelism)
                        .name("source")
                        .slotSharingGroup("group1");

        numbers.rebalance()
                .writeUsingOutputFormat(new DummySpeculativeOutputFormat())
                .setParallelism(this.parallelism)
                .name("sink")
                .slotSharingGroup("group3");
    }

    /**
     * Rebalances {@code dataStream} and attaches a {@code NumberCounterSink} named "sink" in the
     * "sinkGroup" slot sharing group.
     */
    private void addSink(DataStream<Long> dataStream) {
        final DataStream<Long> rebalanced = dataStream.rebalance();
        rebalanced
                .addSink(new NumberCounterSink())
                .setParallelism(this.parallelism)
                .name("sink")
                .slotSharingGroup("sinkGroup");
    }

    /**
     * Simulates a slow task: while {@code slowTaskCounter} is still positive, the caller that
     * decrements it sleeps for {@code Integer.MAX_VALUE} milliseconds. All other callers return
     * immediately.
     *
     * @throws RuntimeException if the sleep is interrupted; the {@link InterruptedException} is
     *     attached as the cause
     */
    private static void maybeSleep() {
        if (slowTaskCounter.getAndDecrement() <= 0) {
            return;
        }
        try {
            Thread.sleep(Integer.MAX_VALUE);
        } catch (InterruptedException e) {
            // NOTE(review): the interrupt flag is left cleared, exactly as before this change;
            // only the cause and a message are added so the failure is diagnosable.
            throw new RuntimeException("Interrupted while simulating a slow task", e);
        }
    }

    private static class DummySpeculativeOutputFormat
    implements OutputFormat<Long>,
    FinalizeOnMaster,
    SupportsConcurrentExecutionAttempts {
        private static final long serialVersionUID = 1L;
        private static volatile boolean foundSpeculativeAttempt;
        private int taskNumber;
        private boolean taskFailed;
        private final Map<Long, Long> numberCountResult = new HashMap<Long, Long>();

        private DummySpeculativeOutputFormat() {
        }

        public void configure(Configuration parameters) {
        }

        public void open(OutputFormat.InitializationContext context) throws IOException {
            this.taskNumber = context.getTaskNumber();
        }

        public void writeRecord(Long value) throws IOException {
            try {
                this.numberCountResult.merge(value, 1L, Long::sum);
                if (this.taskNumber == 0) {
                    SpeculativeExecutionITCase.maybeSleep();
                }
            }
            catch (Throwable t) {
                this.taskFailed = true;
            }
        }

        public void close() throws IOException {
            if (!this.taskFailed) {
                numberCountResults.put(this.taskNumber, this.numberCountResult);
            }
        }

        public void finalizeGlobal(FinalizeOnMaster.FinalizationContext context) throws IOException {
            for (int i = 0; i < context.getParallelism(); ++i) {
                if (context.getFinishedAttempt(i) == 0) continue;
                foundSpeculativeAttempt = true;
            }
        }
    }

    private static class SpeculativeSinkFunction
    extends RichSinkFunction<Long>
    implements SupportsConcurrentExecutionAttempts {
        private final Map<Long, Long> numberCountResult = new HashMap<Long, Long>();

        private SpeculativeSinkFunction() {
        }

        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            this.numberCountResult.merge(value, 1L, Long::sum);
            SpeculativeExecutionITCase.maybeSleep();
        }

        public void finish() {
            numberCountResults.put(this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.numberCountResult);
        }
    }

    private static class NonSpeculativeSinkFunction
    extends RichSinkFunction<Long> {
        private final Map<Long, Long> numberCountResult = new HashMap<Long, Long>();

        private NonSpeculativeSinkFunction() {
        }

        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            if (slowTaskCounter.getAndDecrement() > 0) {
                Thread.sleep(5000L);
            }
            this.numberCountResult.merge(value, 1L, Long::sum);
        }

        public void finish() {
            if (this.getRuntimeContext().getTaskInfo().getAttemptNumber() == 0) {
                numberCountResults.put(this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.numberCountResult);
            }
        }
    }

    private static class DummyCommitter
    implements Committer<Tuple3<Integer, Integer, Map<Long, Long>>> {
        private static AtomicBoolean blocked = new AtomicBoolean(false);
        private static AtomicInteger attempts = new AtomicInteger(0);
        private static volatile boolean foundSpeculativeWriter;

        public DummyCommitter() {
            attempts.incrementAndGet();
        }

        public void commit(Collection<Committer.CommitRequest<Tuple3<Integer, Integer, Map<Long, Long>>>> committables) throws InterruptedException {
            for (Committer.CommitRequest<Tuple3<Integer, Integer, Map<Long, Long>>> request : committables) {
                Tuple3 committable = (Tuple3)request.getCommittable();
                numberCountResults.put(committable.f0, committable.f2);
                if ((Integer)committable.f1 <= 0) continue;
                foundSpeculativeWriter = true;
            }
            if (!blocked.getAndSet(true)) {
                Thread.sleep(5000L);
            }
        }

        public void close() throws Exception {
        }
    }

    @Deprecated
    private static class DummyPrecommittingSinkWriter
    implements TwoPhaseCommittingSink.PrecommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, Long>>> {
        private final int subTaskIndex;
        private final int attemptNumber;
        private final Map<Long, Long> numberCountResult = new HashMap<Long, Long>();

        public DummyPrecommittingSinkWriter(int subTaskIndex, int attemptNumber) {
            this.subTaskIndex = subTaskIndex;
            this.attemptNumber = attemptNumber;
        }

        public void write(Long value, SinkWriter.Context context) throws IOException, InterruptedException {
            this.numberCountResult.merge(value, 1L, Long::sum);
            SpeculativeExecutionITCase.maybeSleep();
        }

        public void flush(boolean endOfInput) {
        }

        public Collection<Tuple3<Integer, Integer, Map<Long, Long>>> prepareCommit() {
            return Collections.singleton(Tuple3.of((Object)this.subTaskIndex, (Object)this.attemptNumber, this.numberCountResult));
        }

        public void close() throws Exception {
        }
    }

    @Deprecated
    private static class SpeculativeSinkDeprecated
    implements TwoPhaseCommittingSink<Long, Tuple3<Integer, Integer, Map<Long, Long>>>,
    SupportsConcurrentExecutionAttempts {
        private SpeculativeSinkDeprecated() {
        }

        public TwoPhaseCommittingSink.PrecommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, Long>>> createWriter(Sink.InitContext context) {
            return new DummyPrecommittingSinkWriter(context.getTaskInfo().getIndexOfThisSubtask(), context.getTaskInfo().getAttemptNumber());
        }

        public Committer<Tuple3<Integer, Integer, Map<Long, Long>>> createCommitter(CommitterInitContext context) {
            return new DummyCommitter();
        }

        public SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, Long>>> getCommittableSerializer() {
            return new SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, Long>>>(){

                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(Tuple3<Integer, Integer, Map<Long, Long>> obj) throws IOException {
                    return InstantiationUtil.serializeObject(obj);
                }

                public Tuple3<Integer, Integer, Map<Long, Long>> deserialize(int version, byte[] serialized) throws IOException {
                    try {
                        return (Tuple3)InstantiationUtil.deserializeObject((byte[])serialized, (ClassLoader)Thread.currentThread().getContextClassLoader());
                    }
                    catch (ClassNotFoundException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }
    }

    private static class DummyCommittingSinkWriter
    implements CommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, Long>>> {
        private final int subTaskIndex;
        private final int attemptNumber;
        private final Map<Long, Long> numberCountResult = new HashMap<Long, Long>();

        public DummyCommittingSinkWriter(int subTaskIndex, int attemptNumber) {
            this.subTaskIndex = subTaskIndex;
            this.attemptNumber = attemptNumber;
        }

        public void write(Long value, SinkWriter.Context context) throws IOException, InterruptedException {
            this.numberCountResult.merge(value, 1L, Long::sum);
            SpeculativeExecutionITCase.maybeSleep();
        }

        public void flush(boolean endOfInput) {
        }

        public Collection<Tuple3<Integer, Integer, Map<Long, Long>>> prepareCommit() {
            return Collections.singleton(Tuple3.of((Object)this.subTaskIndex, (Object)this.attemptNumber, this.numberCountResult));
        }

        public void close() throws Exception {
        }
    }

    private static class SpeculativeSink
    implements Sink<Long>,
    SupportsCommitter<Tuple3<Integer, Integer, Map<Long, Long>>>,
    SupportsConcurrentExecutionAttempts {
        private SpeculativeSink() {
        }

        public SinkWriter<Long> createWriter(Sink.InitContext context) {
            throw new UnsupportedOperationException("Not supported");
        }

        public CommittingSinkWriter<Long, Tuple3<Integer, Integer, Map<Long, Long>>> createWriter(WriterInitContext context) {
            return new DummyCommittingSinkWriter(context.getTaskInfo().getIndexOfThisSubtask(), context.getTaskInfo().getAttemptNumber());
        }

        public Committer<Tuple3<Integer, Integer, Map<Long, Long>>> createCommitter(CommitterInitContext context) {
            return new DummyCommitter();
        }

        public SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, Long>>> getCommittableSerializer() {
            return new SimpleVersionedSerializer<Tuple3<Integer, Integer, Map<Long, Long>>>(){

                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(Tuple3<Integer, Integer, Map<Long, Long>> obj) throws IOException {
                    return InstantiationUtil.serializeObject(obj);
                }

                public Tuple3<Integer, Integer, Map<Long, Long>> deserialize(int version, byte[] serialized) throws IOException {
                    try {
                        return (Tuple3)InstantiationUtil.deserializeObject((byte[])serialized, (ClassLoader)Thread.currentThread().getContextClassLoader());
                    }
                    catch (ClassNotFoundException e) {
                        throw new RuntimeException(e);
                    }
                }
            };
        }
    }

    private static class NumberCounterSink
    extends RichSinkFunction<Long> {
        private final Map<Long, Long> numberCountResult = new HashMap<Long, Long>();

        private NumberCounterSink() {
        }

        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            this.numberCountResult.merge(value, 1L, Long::sum);
        }

        public void finish() {
            numberCountResults.put(this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(), this.numberCountResult);
        }
    }

    private static class TestingIteratorSourceReader<E, IT extends Iterator<E>, SplitT extends IteratorSourceSplit<E, IT>>
    extends IteratorSourceReader<E, IT, SplitT> {
        private TestingIteratorSourceReader(SourceReaderContext context) {
            super(context);
        }

        public InputStatus pollNext(ReaderOutput<E> output) {
            SpeculativeExecutionITCase.maybeSleep();
            return super.pollNext(output);
        }

        public void close() throws Exception {
            if (TestingNumberSequenceSource.forceFailureCounter.get() > 0) {
                TestingNumberSequenceSource.forceFailureCounter.decrementAndGet();
                throw new RuntimeException("Forced failure for testing");
            }
        }
    }

    private static class TestingNumberSequenceSource
    extends NumberSequenceSource {
        private final boolean forceFailureFlag;
        public static AtomicInteger forceFailureCounter = new AtomicInteger(0);

        private TestingNumberSequenceSource(boolean forceFailureFlag) {
            super(0L, 9999L);
            this.forceFailureFlag = forceFailureFlag;
            if (forceFailureFlag) {
                forceFailureCounter = new AtomicInteger(1);
            }
        }

        public SourceReader<Long, NumberSequenceSource.NumberSequenceSplit> createReader(SourceReaderContext readerContext) {
            return new TestingIteratorSourceReader(readerContext);
        }

        public SplitEnumerator<NumberSequenceSource.NumberSequenceSplit, Collection<NumberSequenceSource.NumberSequenceSplit>> createEnumerator(SplitEnumeratorContext<NumberSequenceSource.NumberSequenceSplit> enumContext) {
            int splitSize = enumContext.currentParallelism();
            if (this.forceFailureFlag) {
                splitSize = 1;
            }
            List splits = this.splitNumberRange(0L, 9999L, splitSize);
            return new IteratorSourceEnumerator(enumContext, (Collection)splits);
        }
    }

    private static class TestingInputFormat
    extends GenericInputFormat<Long> {
        private static final int NUM_SPLITS = 100;
        private long nextNumberToEmit;
        private long maxNumberToEmit;
        private boolean end;

        private TestingInputFormat() {
        }

        public GenericInputSplit[] createInputSplits(int minNumSplits) {
            GenericInputSplit[] splits = new GenericInputSplit[100];
            for (int i = 0; i < 100; ++i) {
                splits[i] = new GenericInputSplit(i, 100);
            }
            return splits;
        }

        public boolean reachedEnd() {
            return this.end;
        }

        public Long nextRecord(Long reuse) {
            SpeculativeExecutionITCase.maybeSleep();
            if (this.nextNumberToEmit <= this.maxNumberToEmit) {
                return this.nextNumberToEmit++;
            }
            this.end = true;
            return null;
        }

        public void open(GenericInputSplit split) throws IOException {
            super.open(split);
            this.nextNumberToEmit = this.partitionNumber * 10000 / 100;
            this.maxNumberToEmit = (this.partitionNumber + 1) * 10000 / 100 - 1;
            this.end = false;
        }
    }

    private static class TestingSourceFunc
    extends RichParallelSourceFunction<Long> {
        private TestingSourceFunc() {
        }

        public void run(SourceFunction.SourceContext<Long> ctx) throws Exception {
            SpeculativeExecutionITCase.maybeSleep();
            int subtaskIndex = this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
            int numSubtasks = this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks();
            long start = subtaskIndex * 10000 / numSubtasks;
            long end = (subtaskIndex + 1) * 10000 / numSubtasks;
            for (long i = start; i < end; ++i) {
                ctx.collect((Object)i);
            }
        }

        public void cancel() {
        }
    }

    private static class NumberCounterMap
    extends RichMapFunction<Long, Long> {
        private static final AtomicInteger toFailCounter = new AtomicInteger(0);

        private NumberCounterMap() {
        }

        public Long map(Long value) throws Exception {
            if (toFailCounter.decrementAndGet() >= 0) {
                throw new Exception("Forced failure for testing");
            }
            SpeculativeExecutionITCase.maybeSleep();
            return value;
        }

        private static void reset() {
            toFailCounter.set(0);
        }
    }
}

