/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nullable;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.AscendingTimestampsWatermarks;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.changelog.fs.FsStateChangelogStorageFactory;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.RestoreMode;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.test.state.ManualWindowSpeedITCase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ResumeCheckpointManuallyITCase
extends TestLogger {
    private static final int PARALLELISM = 2;
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int SLOTS_PER_TASK_MANAGER = 2;
    @Parameterized.Parameter
    public RestoreMode restoreMode;
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Parameterized.Parameters(name="RestoreMode = {0}")
    public static Object[] parameters() {
        return RestoreMode.values();
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createRocksDBStateBackend(checkpointDir, true), false, this.restoreMode);
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createRocksDBStateBackend(checkpointDir, false), false, this.restoreMode);
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createRocksDBStateBackend(checkpointDir, true), true, this.restoreMode);
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createRocksDBStateBackend(checkpointDir, false), true, this.restoreMode);
    }

    @Test
    public void testExternalizedFSCheckpointsStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createFsStateBackend(checkpointDir), false, this.restoreMode);
    }

    @Test
    public void testExternalizedFSCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)this.createFsStateBackend(checkpointDir), true, this.restoreMode);
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsZookeeper() throws Exception {
        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();){
            File checkpointDir = temporaryFolder.newFolder();
            ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createRocksDBStateBackend(checkpointDir, true), false, this.restoreMode);
        }
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsZookeeper() throws Exception {
        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();){
            File checkpointDir = temporaryFolder.newFolder();
            ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createRocksDBStateBackend(checkpointDir, false), false, this.restoreMode);
        }
    }

    @Test
    public void testExternalizedIncrementalRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();){
            File checkpointDir = temporaryFolder.newFolder();
            ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createRocksDBStateBackend(checkpointDir, true), true, this.restoreMode);
        }
    }

    @Test
    public void testExternalizedFullRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();){
            File checkpointDir = temporaryFolder.newFolder();
            ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createRocksDBStateBackend(checkpointDir, false), true, this.restoreMode);
        }
    }

    @Test
    public void testExternalizedFSCheckpointsZookeeper() throws Exception {
        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();){
            File checkpointDir = temporaryFolder.newFolder();
            ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createFsStateBackend(checkpointDir), false, this.restoreMode);
        }
    }

    @Test
    public void testExternalizedFSCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();){
            File checkpointDir = temporaryFolder.newFolder();
            ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)this.createFsStateBackend(checkpointDir), true, this.restoreMode);
        }
    }

    @Test
    public void testExternalizedSwitchRocksDBCheckpointsStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        RocksDBStateBackend previousStateBackend = this.createRocksDBStateBackend(checkpointDir, false);
        RocksDBStateBackend newStateBackend = this.createRocksDBStateBackend(checkpointDir, true);
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)previousStateBackend, (StateBackend)newStateBackend, (StateBackend)previousStateBackend, false, this.restoreMode);
    }

    @Test
    public void testExternalizedSwitchRocksDBCheckpointsWithLocalRecoveryStandalone() throws Exception {
        File checkpointDir = temporaryFolder.newFolder();
        RocksDBStateBackend previousStateBackend = this.createRocksDBStateBackend(checkpointDir, false);
        RocksDBStateBackend newStateBackend = this.createRocksDBStateBackend(checkpointDir, true);
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, null, (StateBackend)previousStateBackend, (StateBackend)newStateBackend, (StateBackend)previousStateBackend, true, this.restoreMode);
    }

    @Test
    public void testExternalizedSwitchRocksDBCheckpointsZookeeper() throws Exception {
        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();){
            File checkpointDir = temporaryFolder.newFolder();
            RocksDBStateBackend previousStateBackend = this.createRocksDBStateBackend(checkpointDir, false);
            RocksDBStateBackend newStateBackend = this.createRocksDBStateBackend(checkpointDir, true);
            ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)previousStateBackend, (StateBackend)newStateBackend, (StateBackend)previousStateBackend, false, this.restoreMode);
        }
    }

    @Test
    public void testExternalizedSwitchRocksDBCheckpointsWithLocalRecoveryZookeeper() throws Exception {
        try (TestingServer zkServer = ZooKeeperTestUtils.createAndStartZookeeperTestingServer();){
            File checkpointDir = temporaryFolder.newFolder();
            RocksDBStateBackend previousStateBackend = this.createRocksDBStateBackend(checkpointDir, false);
            RocksDBStateBackend newStateBackend = this.createRocksDBStateBackend(checkpointDir, true);
            ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zkServer.getConnectString(), (StateBackend)previousStateBackend, (StateBackend)newStateBackend, (StateBackend)previousStateBackend, true, this.restoreMode);
        }
    }

    private FsStateBackend createFsStateBackend(File checkpointDir) throws IOException {
        return new FsStateBackend(checkpointDir.toURI().toString(), true);
    }

    private RocksDBStateBackend createRocksDBStateBackend(File checkpointDir, boolean incrementalCheckpointing) throws IOException {
        return new RocksDBStateBackend(checkpointDir.toURI().toString(), incrementalCheckpointing);
    }

    private static void testExternalizedCheckpoints(File checkpointDir, String zooKeeperQuorum, StateBackend backend, boolean localRecovery, RestoreMode restoreMode) throws Exception {
        ResumeCheckpointManuallyITCase.testExternalizedCheckpoints(checkpointDir, zooKeeperQuorum, backend, backend, backend, localRecovery, restoreMode);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void testExternalizedCheckpoints(File checkpointDir, String zooKeeperQuorum, StateBackend backend1, StateBackend backend2, StateBackend backend3, boolean localRecovery, RestoreMode restoreMode) throws Exception {
        Configuration config = new Configuration();
        File savepointDir = temporaryFolder.newFolder();
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointDir.toURI().toString());
        config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)savepointDir.toURI().toString());
        config.set(CheckpointingOptions.LOCAL_RECOVERY, (Object)localRecovery);
        FsStateChangelogStorageFactory.configure((Configuration)config, (File)temporaryFolder.newFolder(), (Duration)Duration.ofMinutes(1L), (int)10);
        if (zooKeeperQuorum != null) {
            File haDir = temporaryFolder.newFolder();
            config.set(HighAvailabilityOptions.HA_MODE, (Object)"ZOOKEEPER");
            config.set(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, (Object)zooKeeperQuorum);
            config.set(HighAvailabilityOptions.HA_STORAGE_PATH, (Object)haDir.toURI().toString());
        }
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
        cluster.before();
        try {
            String firstExternalCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(backend1, null, cluster, restoreMode);
            Assert.assertNotNull((Object)firstExternalCheckpoint);
            String secondExternalCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(backend2, firstExternalCheckpoint, cluster, restoreMode);
            Assert.assertNotNull((Object)secondExternalCheckpoint);
            String thirdExternalCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint(backend3, restoreMode == RestoreMode.CLAIM ? secondExternalCheckpoint : firstExternalCheckpoint, cluster, restoreMode);
            Assert.assertNotNull((Object)thirdExternalCheckpoint);
        }
        finally {
            cluster.after();
        }
    }

    private static String runJobAndGetExternalizedCheckpoint(StateBackend backend, @Nullable String externalCheckpoint, MiniClusterWithClientResource cluster, RestoreMode restoreMode) throws Exception {
        JobGraph initialJobGraph = ResumeCheckpointManuallyITCase.getJobGraph(backend, externalCheckpoint, restoreMode);
        NotifyingInfiniteTupleSource.countDownLatch = new CountDownLatch(2);
        cluster.getClusterClient().submitJob(initialJobGraph).get();
        NotifyingInfiniteTupleSource.countDownLatch.await();
        CommonTestUtils.waitForCheckpoint((JobID)initialJobGraph.getJobID(), (MiniCluster)cluster.getMiniCluster(), (int)2);
        cluster.getClusterClient().cancel(initialJobGraph.getJobID()).get();
        TestUtils.waitUntilJobCanceled((JobID)initialJobGraph.getJobID(), (ClusterClient)cluster.getClusterClient());
        return (String)CommonTestUtils.getLatestCompletedCheckpointPath((JobID)initialJobGraph.getJobID(), (MiniCluster)cluster.getMiniCluster()).orElseThrow(() -> {
            throw new IllegalStateException("Checkpoint not generated");
        });
    }

    private static JobGraph getJobGraph(StateBackend backend, @Nullable String externalCheckpoint, RestoreMode restoreMode) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(500L);
        env.setStateBackend(backend);
        env.setParallelism(2);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.noRestart());
        env.addSource((SourceFunction)new NotifyingInfiniteTupleSource(10000)).assignTimestampsAndWatermarks(IngestionTimeWatermarkStrategy.create()).keyBy(new int[]{0}).window((WindowAssigner)TumblingEventTimeWindows.of((Time)Time.seconds((long)3L))).reduce((ReduceFunction & Serializable)(value1, value2) -> Tuple2.of((Object)value1.f0, (Object)((Integer)value1.f1 + (Integer)value2.f1))).filter((FilterFunction & Serializable)value -> ((String)value.f0).startsWith("Tuple 0"));
        StreamGraph streamGraph = env.getStreamGraph();
        JobGraph jobGraph = streamGraph.getJobGraph();
        if (externalCheckpoint != null) {
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)externalCheckpoint, (boolean)false, (RestoreMode)restoreMode));
        }
        return jobGraph;
    }

    private static class IngestionTimeWatermarkStrategy<T>
    implements WatermarkStrategy<T> {
        private IngestionTimeWatermarkStrategy() {
        }

        public static <T> IngestionTimeWatermarkStrategy<T> create() {
            return new IngestionTimeWatermarkStrategy<T>();
        }

        public WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new AscendingTimestampsWatermarks();
        }

        public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (event, timestamp) -> System.currentTimeMillis();
        }
    }

    public static class NotifyingInfiniteTupleSource
    extends ManualWindowSpeedITCase.InfiniteTupleSource {
        private static final long serialVersionUID = 8120981235081181746L;
        private static CountDownLatch countDownLatch;

        public NotifyingInfiniteTupleSource(int numKeys) {
            super(numKeys);
        }

        @Override
        public void run(SourceFunction.SourceContext<Tuple2<String, Integer>> out) throws Exception {
            if (countDownLatch != null) {
                countDownLatch.countDown();
            }
            super.run(out);
        }
    }
}

