/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ResumeCheckpointManuallyITCase
extends TestLogger {
    private static final int PARALLELISM = 2;
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int SLOTS_PER_TASK_MANAGER = 2;
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        this.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createRocksDBStateBackend(checkpointDir, true), false);
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        this.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createRocksDBStateBackend(checkpointDir, false), false);
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        this.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createRocksDBStateBackend(checkpointDir, true), true);
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        this.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createRocksDBStateBackend(checkpointDir, false), true);
    }

    @Test
    public void testExternalizedFSCheckpointsStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        this.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createFsStateBackend(checkpointDir), false);
    }

    @Test
    public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        this.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createFsStateBackend(checkpointDir), true);
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
        TestingServer zkServer = new TestingServer();
        zkServer.start();
        try {
            File checkpointDir = temporaryFolder.newFolder();
            this.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createRocksDBStateBackend(checkpointDir, true), false);
        }
        finally {
            zkServer.stop();
        }
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
        TestingServer zkServer = new TestingServer();
        zkServer.start();
        try {
            File checkpointDir = temporaryFolder.newFolder();
            this.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createRocksDBStateBackend(checkpointDir, false), false);
        }
        finally {
            zkServer.stop();
        }
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        TestingServer zkServer = new TestingServer();
        zkServer.start();
        try {
            File checkpointDir = temporaryFolder.newFolder();
            this.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createRocksDBStateBackend(checkpointDir, true), true);
        }
        finally {
            zkServer.stop();
        }
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        TestingServer zkServer = new TestingServer();
        zkServer.start();
        try {
            File checkpointDir = temporaryFolder.newFolder();
            this.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createRocksDBStateBackend(checkpointDir, false), true);
        }
        finally {
            zkServer.stop();
        }
    }

    @Test
    public void testExternalizedFSCheckpointsZookeeper() throws Exception {
        TestingServer zkServer = new TestingServer();
        zkServer.start();
        try {
            File checkpointDir = temporaryFolder.newFolder();
            this.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createFsStateBackend(checkpointDir), false);
        }
        finally {
            zkServer.stop();
        }
    }

    @Test
    public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        TestingServer zkServer = new TestingServer();
        zkServer.start();
        try {
            File checkpointDir = temporaryFolder.newFolder();
            this.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createFsStateBackend(checkpointDir), true);
        }
        finally {
            zkServer.stop();
        }
    }

    private FsStateBackend createFsStateBackend(File checkpointDir) throws IOException {
        return new FsStateBackend(checkpointDir.toURI().toString(), true);
    }

    private RocksDBStateBackend createRocksDBStateBackend(File checkpointDir, boolean incrementalCheckpointing) throws IOException {
        return new RocksDBStateBackend(checkpointDir.toURI().toString(), incrementalCheckpointing);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testExternalizedCheckpoints(File checkpointDir, String zooKeeperQuorum, StateBackend backend, boolean localRecovery) throws Exception {
        Configuration config = new Configuration();
        File savepointDir = temporaryFolder.newFolder();
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
        config.setBoolean(CheckpointingOptions.LOCAL_RECOVERY, localRecovery);
        if (zooKeeperQuorum != null) {
            File haDir = temporaryFolder.newFolder();
            config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER");
            config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zooKeeperQuorum);
            config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString());
        }
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        client.setDetached(true);
        try {
            String firstExternalCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(backend, checkpointDir, null, client);
            Assert.assertNotNull((Object)firstExternalCheckpoint);
            String secondExternalCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(backend, checkpointDir, firstExternalCheckpoint, client);
            Assert.assertNotNull((Object)secondExternalCheckpoint);
            String thirdExternalCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(backend, checkpointDir, secondExternalCheckpoint, client);
            Assert.assertNotNull((Object)thirdExternalCheckpoint);
        }
        finally {
            cluster.after();
        }
    }

    private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, File checkpointDir, @Nullable String externalCheckpoint, ClusterClient<?> client) throws Exception {
        JobGraph initialJobGraph = ResumeCheckpointManuallyITCase.getJobGraph(backend, externalCheckpoint);
        NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(2);
        client.submitJob(initialJobGraph, ResumeCheckpointManuallyITCase.class.getClassLoader());
        NotifyingInfiniteTupleSource.countDownLatch.await();
        ResumeCheckpointManuallyITCase.waitUntilExternalizedCheckpointCreated(checkpointDir, initialJobGraph.getJobID());
        client.cancel(initialJobGraph.getJobID());
        ResumeCheckpointManuallyITCase.waitUntilCanceled(initialJobGraph.getJobID(), client);
        return ResumeCheckpointManuallyITCase.getExternalizedCheckpointCheckpointPath(checkpointDir, initialJobGraph.getJobID());
    }

    private static String getExternalizedCheckpointCheckpointPath(File checkpointDir, JobID jobId) throws IOException {
        Optional<Path> checkpoint = ResumeCheckpointManuallyITCase.findExternalizedCheckpoint(checkpointDir, jobId);
        if (!checkpoint.isPresent()) {
            throw new AssertionError((Object)"No complete checkpoint could be found.");
        }
        return checkpoint.get().toString();
    }

    private static void waitUntilExternalizedCheckpointCreated(File checkpointDir, JobID jobId) throws InterruptedException, IOException {
        Optional<Path> externalizedCheckpoint;
        do {
            Thread.sleep(50L);
        } while (!(externalizedCheckpoint = ResumeCheckpointManuallyITCase.findExternalizedCheckpoint(checkpointDir, jobId)).isPresent());
    }

    private static Optional<Path> findExternalizedCheckpoint(File checkpointDir, JobID jobId) throws IOException {
        try (Stream<Path> checkpoints = Files.list(checkpointDir.toPath().resolve(jobId.toString()));){
            Optional<Path> optional = checkpoints.filter(path -> path.getFileName().toString().startsWith("chk-")).filter(path -> {
                try (Stream<Path> checkpointFiles = Files.list(path);){
                    boolean bl = checkpointFiles.anyMatch(child -> child.getFileName().toString().contains("meta"));
                    return bl;
                }
                catch (IOException ignored) {
                    return false;
                }
            }).findAny();
            return optional;
        }
    }

    private static void waitUntilCanceled(JobID jobId, ClusterClient<?> client) throws ExecutionException, InterruptedException {
        while (client.getJobStatus(jobId).get() != JobStatus.CANCELED) {
            Thread.sleep(50L);
        }
    }

    private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L);
        env.setStateBackend(backend);
        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        env.setParallelism(2);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.addSource((SourceFunction)new NotifyingInfiniteTupleSource(10000)).keyBy(new int[]{0}).timeWindow(Time.seconds((long)3L)).reduce((ReduceFunction & Serializable)(value1, value2) -> Tuple2.of((Object)value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1))).filter((FilterFunction & Serializable)value -> ((String)value.f0).startsWith("Tuple 0"));
        StreamGraph streamGraph = env.getStreamGraph("Test");
        JobGraph jobGraph = streamGraph.getJobGraph();
        if (externalCheckpoint != null) {
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)externalCheckpoint));
        }
        return jobGraph;
    }

    public static class NotifyingInfiniteTupleSource
    extends ManualWindowSpeedITCase.InfiniteTupleSource {
        private static final long serialVersionUID = 8120981235081181746L;
        private static CountDownLatch countDownLatch;

        public NotifyingInfiniteTupleSource(int numKeys) {
            super(numKeys);
        }

        @Override
        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> out) throws Exception {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            super.run(out);
        }
    }
}

