/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.checkpointing.utils.AccumulatingIntegerSink;
import org.apache.flink.test.checkpointing.utils.CancellingIntegerSource;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class UnalignedCheckpointCompatibilityITCase {
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final int TOTAL_ELEMENTS = 20;
    private static final int FIRST_RUN_EL_COUNT = 10;
    private static final int FIRST_RUN_BACKPRESSURE_MS = 100;
    private static final int PARALLELISM = 1;
    private final boolean startAligned;
    private final CheckpointType type;

    @Parameterized.Parameters(name="type: {0}, startAligned: {1}")
    public static Object[][] parameters() {
        return new Object[][]{{CheckpointType.CHECKPOINT, true}, {CheckpointType.CHECKPOINT, false}, {CheckpointType.SAVEPOINT, true}, {CheckpointType.SAVEPOINT, false}};
    }

    public UnalignedCheckpointCompatibilityITCase(CheckpointType type, boolean startAligned) {
        this.startAligned = startAligned;
        this.type = type;
    }

    @Test
    public void test() throws Exception {
        Tuple2<String, Map<String, Object>> pathAndAccumulators = this.type.isSavepoint() ? this.runAndTakeSavepoint() : this.runAndTakeExternalCheckpoint();
        String savepointPath = (String)pathAndAccumulators.f0;
        Map accumulatorsBeforeBarrier = (Map)pathAndAccumulators.f1;
        Map<String, Object> accumulatorsAfterBarrier = this.runFromSavepoint(savepointPath, !this.startAligned, 20);
        if (this.type.isSavepoint()) {
            Assert.assertEquals(UnalignedCheckpointCompatibilityITCase.intRange(0, 20), UnalignedCheckpointCompatibilityITCase.extractAndConcat(accumulatorsBeforeBarrier, accumulatorsAfterBarrier));
        }
    }

    private Tuple2<String, Map<String, Object>> runAndTakeSavepoint() throws Exception {
        JobClient jobClient = UnalignedCheckpointCompatibilityITCase.submitJobInitially(this.env(this.startAligned, 0, Collections.emptyMap()));
        Thread.sleep(1000L);
        CompletableFuture accFuture = jobClient.getAccumulators(this.getClass().getClassLoader());
        CompletableFuture savepointFuture = jobClient.stopWithSavepoint(false, this.tempFolder().toURI().toString());
        return new Tuple2(savepointFuture.get(), accFuture.get());
    }

    private Tuple2<String, Map<String, Object>> runAndTakeExternalCheckpoint() throws Exception {
        File folder = this.tempFolder();
        JobClient jobClient = UnalignedCheckpointCompatibilityITCase.submitJobInitially(this.externalCheckpointEnv(this.startAligned, folder, 100));
        File metadata = UnalignedCheckpointCompatibilityITCase.waitForChild(UnalignedCheckpointCompatibilityITCase.waitForChild(UnalignedCheckpointCompatibilityITCase.waitForChild(folder)));
        this.cancelJob(jobClient);
        return new Tuple2((Object)metadata.getParentFile().toString(), Collections.emptyMap());
    }

    private static JobClient submitJobInitially(StreamExecutionEnvironment env) throws Exception {
        return env.executeAsync(UnalignedCheckpointCompatibilityITCase.dag(10, true, 100, env));
    }

    private Map<String, Object> runFromSavepoint(String path, boolean isAligned, int totalCount) throws Exception {
        StreamExecutionEnvironment env = this.env(isAligned, 50, Collections.singletonMap(SavepointConfigOptions.SAVEPOINT_PATH, path));
        return env.execute(UnalignedCheckpointCompatibilityITCase.dag(totalCount, false, 0, env)).getJobExecutionResult().getAllAccumulatorResults();
    }

    private static File waitForChild(File dir) throws InterruptedException {
        while (dir.listFiles().length == 0) {
            Thread.sleep(50L);
        }
        return Arrays.stream(dir.listFiles()).max(Comparator.naturalOrder()).get();
    }

    private void cancelJob(JobClient jobClient) throws InterruptedException, ExecutionException {
        jobClient.cancel().get();
        try {
            jobClient.getJobExecutionResult(this.getClass().getClassLoader());
        }
        catch (Exception exception) {
            // empty catch block
        }
    }

    private StreamExecutionEnvironment externalCheckpointEnv(boolean isAligned, File dir, int checkpointingInterval) {
        HashMap cfg = new HashMap();
        cfg.put(CheckpointingOptions.CHECKPOINTS_DIRECTORY, dir.toURI().toString());
        cfg.put(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, Integer.toString(Integer.MAX_VALUE));
        StreamExecutionEnvironment env = this.env(isAligned, checkpointingInterval, cfg);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        return env;
    }

    private StreamExecutionEnvironment env(boolean isAligned, int checkpointingInterval, Map<ConfigOption<?>, String> cfg) {
        Configuration configuration = new Configuration();
        for (Map.Entry<ConfigOption<?>, String> e : cfg.entrySet()) {
            configuration.setString(e.getKey(), e.getValue());
        }
        LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment((int)1, (Configuration)configuration);
        env.setRestartStrategy((RestartStrategies.RestartStrategyConfiguration)new RestartStrategies.NoRestartStrategyConfiguration());
        env.getCheckpointConfig().enableUnalignedCheckpoints(!isAligned);
        if (checkpointingInterval > 0) {
            env.enableCheckpointing((long)checkpointingInterval);
        }
        return env;
    }

    private static StreamGraph dag(int numElements, boolean continueAfterNumElementsReached, int sinkDelayMillis, StreamExecutionEnvironment env) {
        env.addSource((SourceFunction)CancellingIntegerSource.upTo(numElements, continueAfterNumElementsReached)).addSink((SinkFunction)new AccumulatingIntegerSink(sinkDelayMillis));
        return env.getStreamGraph();
    }

    private static List<Integer> intRange(int from, int to) {
        return IntStream.range(from, to).boxed().collect(Collectors.toList());
    }

    private static List<Integer> extractAndConcat(Map<String, Object> ... accumulators) {
        return Stream.of(accumulators).map(AccumulatingIntegerSink::getOutput).peek(l -> Preconditions.checkState((!l.isEmpty() ? 1 : 0) != 0)).flatMap(Collection::stream).collect(Collectors.toList());
    }

    private File tempFolder() throws IOException {
        return this.temporaryFolder.newFolder();
    }
}

