/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink;

import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.connector.file.sink.FileSinkITBase;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

class StreamingExecutionFileSinkITCase
extends FileSinkITBase {
    private static final Map<String, CountDownLatch> LATCH_MAP = new ConcurrentHashMap<String, CountDownLatch>();
    private String latchId;

    StreamingExecutionFileSinkITCase() {
    }

    @BeforeEach
    void setup() {
        this.latchId = UUID.randomUUID().toString();
        LATCH_MAP.put(this.latchId, new CountDownLatch(8));
    }

    @AfterEach
    void teardown() {
        LATCH_MAP.remove(this.latchId);
    }

    @Override
    protected JobGraph createJobGraph(boolean triggerFailover, String path) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration config = new Configuration();
        config.set(ExecutionOptions.RUNTIME_MODE, (Object)RuntimeExecutionMode.STREAMING);
        env.configure((ReadableConfig)config, this.getClass().getClassLoader());
        env.enableCheckpointing(10L, CheckpointingMode.EXACTLY_ONCE);
        if (triggerFailover) {
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (Time)Time.milliseconds((long)100L)));
        } else {
            env.setRestartStrategy(RestartStrategies.noRestart());
        }
        DataStreamSink sink = env.addSource((SourceFunction)new StreamingExecutionTestSource(this.latchId, 10000, triggerFailover)).setParallelism(4).sinkTo(this.createFileSink(path)).setParallelism(3);
        this.configureSink((DataStreamSink<Integer>)sink);
        StreamGraph streamGraph = env.getStreamGraph();
        return streamGraph.getJobGraph();
    }

    protected void configureSink(DataStreamSink<Integer> sink) {
    }

    private static class StreamingExecutionTestSource
    extends RichParallelSourceFunction<Integer>
    implements CheckpointListener,
    CheckpointedFunction {
        private final String latchId;
        private final int numberOfRecords;
        private final boolean isFailoverScenario;
        private ListState<Integer> nextValueState;
        private int nextValue;
        private volatile boolean isCanceled;
        private volatile boolean snapshottedAfterAllRecordsOutput;
        private volatile boolean isWaitingCheckpointComplete;
        private volatile boolean hasCompletedCheckpoint;

        public StreamingExecutionTestSource(String latchId, int numberOfRecords, boolean isFailoverScenario) {
            this.latchId = latchId;
            this.numberOfRecords = numberOfRecords;
            this.isFailoverScenario = isFailoverScenario;
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.nextValueState = context.getOperatorStateStore().getListState(new ListStateDescriptor("nextValue", Integer.class));
            if (this.nextValueState.get() != null && ((Iterable)this.nextValueState.get()).iterator().hasNext()) {
                this.nextValue = (Integer)((Iterable)this.nextValueState.get()).iterator().next();
            }
        }

        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            if (this.isFailoverScenario && this.getRuntimeContext().getAttemptNumber() == 0) {
                this.sendRecordsUntil((int)((double)this.numberOfRecords * 0.4 * 0.5), ctx);
                while (!this.hasCompletedCheckpoint) {
                    Thread.sleep(50L);
                }
                this.sendRecordsUntil((int)((double)this.numberOfRecords * 0.4), ctx);
                if (this.getRuntimeContext().getIndexOfThisSubtask() == 0) {
                    throw new RuntimeException("Designated Exception");
                }
                while (true) {
                    Thread.sleep(50L);
                }
            }
            this.sendRecordsUntil(this.numberOfRecords, ctx);
            this.isWaitingCheckpointComplete = true;
            CountDownLatch latch = (CountDownLatch)LATCH_MAP.get(this.latchId);
            latch.await();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void sendRecordsUntil(int targetNumber, SourceFunction.SourceContext<Integer> ctx) {
            while (!this.isCanceled && this.nextValue < targetNumber) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.nextValue++);
                }
            }
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            this.nextValueState.update(Collections.singletonList(this.nextValue));
            if (this.isWaitingCheckpointComplete) {
                this.snapshottedAfterAllRecordsOutput = true;
            }
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (this.isWaitingCheckpointComplete && this.snapshottedAfterAllRecordsOutput) {
                CountDownLatch latch = (CountDownLatch)LATCH_MAP.get(this.latchId);
                latch.countDown();
            }
            this.hasCompletedCheckpoint = true;
        }

        public void cancel() {
            this.isCanceled = true;
        }
    }
}

