/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.rest.RestClient;
import org.apache.flink.runtime.rest.RestClientConfiguration;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SavepointITCase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
    @Rule
    public final TemporaryFolder folder = new TemporaryFolder();
    private File checkpointDir;
    private File savepointDir;
    private static OneShotLatch failingPipelineLatch;
    private static OneShotLatch succeedingPipelineLatch;
    private static final int ITER_TEST_PARALLELISM = 1;
    private static OneShotLatch[] iterTestSnapshotWait;
    private static OneShotLatch[] iterTestRestoreWait;
    private static int[] iterTestCheckpointVerify;

    @Before
    public void setUp() throws Exception {
        File testRoot = this.folder.newFolder();
        this.checkpointDir = new File(testRoot, "checkpoints");
        this.savepointDir = new File(testRoot, "savepoints");
        if (!this.checkpointDir.mkdir() || !this.savepointDir.mkdirs()) {
            Assert.fail((String)"Test setup failed: failed to create temporary directories.");
        }
    }

    @Test
    public void testStopWithSavepointForFlip27SourceWithDrain() throws Exception {
        this.testStopWithSavepointForFlip27Source(true);
    }

    @Test
    public void testStopWithSavepointForFlip27SourceWithoutDrain() throws Exception {
        this.testStopWithSavepointForFlip27Source(false);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testStopWithSavepointForFlip27Source(boolean drain) throws Exception {
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, this.getFileBasedCheckpointsConfig());
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        BoundedPassThroughOperator operator = new BoundedPassThroughOperator(ChainingStrategy.ALWAYS);
        SingleOutputStreamOperator stream = env.fromSequence(0L, Long.MAX_VALUE).transform("pass-through", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO, operator);
        stream.addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobID jobId = jobGraph.getJobID();
        MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            BoundedPassThroughOperator.resetForTest(1, true);
            client.submitJob(jobGraph).get();
            BoundedPassThroughOperator.getProgressLatch().await();
            client.stopWithSavepoint(jobId, drain, null).get();
            if (drain) {
                Assert.assertTrue((boolean)BoundedPassThroughOperator.inputEnded);
            } else {
                Assert.assertFalse((boolean)BoundedPassThroughOperator.inputEnded);
            }
        }
        finally {
            cluster.after();
        }
    }

    @Test
    public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 4;
        MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, this.getFileBasedCheckpointsConfig());
        String savepointPath = this.submitJobAndTakeSavepoint(clusterFactory, 4);
        this.verifySavepoint(4, savepointPath);
        this.restoreJobAndVerifyState(savepointPath, clusterFactory, 4);
    }

    @Test
    public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath() throws Exception {
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 4;
        MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, this.getFileBasedCheckpointsConfig());
        String savepointPath = this.submitJobAndTakeSavepoint(clusterFactory, 4);
        Path oldPath = new Path(savepointPath);
        Path newPath = new Path(this.folder.newFolder().toURI().toString());
        new Path(savepointPath).getFileSystem().rename(oldPath, newPath);
        this.verifySavepoint(4, newPath.toUri().toString());
        this.restoreJobAndVerifyState(newPath.toUri().toString(), clusterFactory, 4);
    }

    @Test
    public void testShouldAddEntropyToSavepointPath() throws Exception {
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 4;
        MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, this.getCheckpointingWithEntropyConfig());
        String savepointPath = this.submitJobAndTakeSavepoint(clusterFactory, 4);
        Assert.assertThat((Object)this.savepointDir, SavepointITCase.hasEntropyInFileStateHandlePaths());
        this.restoreJobAndVerifyState(savepointPath, clusterFactory, 4);
    }

    private Configuration getCheckpointingWithEntropyConfig() {
        String savepointPathWithEntropyPlaceholder = new File(this.savepointDir, "_entropy_").getPath();
        Configuration config = this.getFileBasedCheckpointsConfig("test-entropy://" + savepointPathWithEntropyPlaceholder);
        config.setString("s3.entropy.key", "_entropy_");
        return config;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String submitJobAndTakeSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
        JobGraph jobGraph = this.createJobGraph(parallelism, 0, 1000L);
        JobID jobId = jobGraph.getJobID();
        StatefulCounter.resetForTest(parallelism);
        MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            client.submitJob(jobGraph).get();
            StatefulCounter.getProgressLatch().await();
            String string = (String)client.cancelWithSavepoint(jobId, null).get();
            return string;
        }
        finally {
            cluster.after();
            StatefulCounter.resetForTest(parallelism);
        }
    }

    private void verifySavepoint(int parallelism, String savepointPath) throws URISyntaxException {
        File savepointDir = new File(new URI(savepointPath));
        Assert.assertTrue((String)"Savepoint directory does not exist.", (boolean)savepointDir.exists());
        Assert.assertTrue((String)"Savepoint did not create self-contained directory.", (boolean)savepointDir.isDirectory());
        Object[] savepointFiles = savepointDir.listFiles();
        if (savepointFiles != null) {
            String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: " + Arrays.toString(savepointFiles);
            Assert.assertEquals((String)errMsg, (long)(1 + parallelism), (long)savepointFiles.length);
        } else {
            Assert.fail((String)String.format("Returned savepoint path (%s) is not valid.", savepointPath));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
        JobGraph jobGraph = this.createJobGraph(parallelism, 0, 1000L);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath, (boolean)false));
        JobID jobId = jobGraph.getJobID();
        StatefulCounter.resetForTest(parallelism);
        MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            client.submitJob(jobGraph).get();
            StatefulCounter.getRestoreLatch().await();
            StatefulCounter.getProgressLatch().await();
            client.cancel(jobId).get();
            FutureUtils.retrySuccessfulWithDelay(() -> client.getJobStatus(jobId), (Time)Time.milliseconds((long)50L), (Deadline)Deadline.now().plus(Duration.ofSeconds(30L)), status -> status == JobStatus.CANCELED, (ScheduledExecutor)TestingUtils.defaultScheduledExecutor());
            client.disposeSavepoint(savepointPath).get();
            Assert.assertFalse((String)"Savepoint not properly cleaned up.", (boolean)new File(savepointPath).exists());
        }
        finally {
            cluster.after();
            StatefulCounter.resetForTest(parallelism);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerSavepointForNonExistingJob() throws Exception {
        boolean numTaskManagers = true;
        boolean numSlotsPerTaskManager = true;
        Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(1).build());
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        JobID jobID = new JobID();
        try {
            client.triggerSavepoint(jobID, null).get();
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)jobID.toString()).isPresent());
        }
        finally {
            cluster.after();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
        boolean numTaskManagers = true;
        boolean numSlotsPerTaskManager = true;
        Configuration config = new Configuration();
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(1).build());
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        JobVertex vertex = new JobVertex("Blocking vertex");
        vertex.setInvokableClass(BlockingNoOpInvokable.class);
        vertex.setParallelism(1);
        JobGraph graph = JobGraphTestUtils.streamingJobGraph((JobVertex[])new JobVertex[]{vertex});
        try {
            client.submitJob(graph).get();
            TestUtils.waitUntilJobInitializationFinished(graph.getJobID(), cluster, ClassLoader.getSystemClassLoader());
            client.triggerSavepoint(graph.getJobID(), null).get();
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, IllegalStateException.class).isPresent());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)graph.getJobID().toString()).isPresent());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"is not a streaming job").isPresent());
        }
        finally {
            cluster.after();
        }
    }

    private static boolean ischeckpointcoordinatorshutdownError(Throwable throwable) {
        return ExceptionUtils.findThrowable((Throwable)throwable, CheckpointException.class).filter(e -> e.getCheckpointFailureReason() == CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN).isPresent();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStopSavepointWithBoundedInput() throws Exception {
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        for (ChainingStrategy chainingStrategy : ChainingStrategy.values()) {
            MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, this.getFileBasedCheckpointsConfig());
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            BoundedPassThroughOperator operator = new BoundedPassThroughOperator(chainingStrategy);
            SingleOutputStreamOperator stream = env.addSource((SourceFunction)new InfiniteTestSource()).transform("pass-through", (TypeInformation)BasicTypeInfo.INT_TYPE_INFO, operator);
            stream.addSink((SinkFunction)new DiscardingSink());
            JobGraph jobGraph = env.getStreamGraph().getJobGraph();
            JobID jobId = jobGraph.getJobID();
            MiniClusterWithClientResource cluster = clusterFactory.get();
            cluster.before();
            ClusterClient client = cluster.getClusterClient();
            try {
                BoundedPassThroughOperator.resetForTest(1, true);
                client.submitJob(jobGraph).get();
                BoundedPassThroughOperator.getProgressLatch().await();
                client.stopWithSavepoint(jobId, false, null).get();
                Assert.assertFalse((String)("input ended with chainingStrategy " + chainingStrategy), (boolean)BoundedPassThroughOperator.inputEnded);
            }
            finally {
                cluster.after();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubmitWithUnknownSavepointPath() throws Exception {
        int numTaskManagers = 1;
        int numSlotsPerTaskManager = 1;
        int parallelism = numTaskManagers * numSlotsPerTaskManager;
        Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(numTaskManagers).setNumberSlotsPerTaskManager(numSlotsPerTaskManager).build());
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            int numberOfRetries = 1000;
            JobGraph jobGraph = this.createJobGraph(parallelism, numberOfRetries, 3600000L);
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)"unknown path"));
            Assert.assertEquals((Object)"unknown path", (Object)jobGraph.getSavepointRestoreSettings().getRestorePath());
            LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
            try {
                TestUtils.submitJobAndWaitForResult(client, jobGraph, ((Object)((Object)this)).getClass().getClassLoader());
            }
            catch (Exception e) {
                Optional expectedJobExecutionException = ExceptionUtils.findThrowable((Throwable)e, JobExecutionException.class);
                Optional expectedFileNotFoundException = ExceptionUtils.findThrowable((Throwable)e, FileNotFoundException.class);
                if (!expectedJobExecutionException.isPresent() || !expectedFileNotFoundException.isPresent()) {
                    throw e;
                }
            }
        }
        finally {
            cluster.after();
        }
    }

    @Test
    public void testStopWithSavepointFailingInSnapshotCreation() throws Exception {
        SavepointITCase.testStopWithFailingSourceInOnePipeline(new SnapshotFailingInfiniteTestSource(), this.folder.newFolder(), 2, SavepointITCase.assertInSnapshotCreationFailure());
    }

    @Test
    public void testStopWithSavepointFailingAfterSnapshotCreation() throws Exception {
        CancelFailingInfiniteTestSource.cancelTriggered = false;
        SavepointITCase.testStopWithFailingSourceInOnePipeline(new CancelFailingInfiniteTestSource(), this.folder.newFolder(), 2, SavepointITCase.assertAfterSnapshotCreationFailure());
    }

    private static BiConsumer<JobID, ExecutionException> assertAfterSnapshotCreationFailure() {
        return (jobId, actualException) -> {
            if (ClusterOptions.isAdaptiveSchedulerEnabled((Configuration)new Configuration())) {
                Assert.assertThat((Object)actualException, (Matcher)FlinkMatchers.containsMessage((String)"Stop with savepoint operation could not be completed"));
            } else {
                Optional actualFlinkException = ExceptionUtils.findThrowable((Throwable)actualException, FlinkException.class);
                Assert.assertTrue((boolean)actualFlinkException.isPresent());
                Assert.assertThat(actualFlinkException.get(), (Matcher)FlinkMatchers.containsMessage((String)String.format("A global fail-over is triggered to recover the job %s.", jobId)));
            }
        };
    }

    private static BiConsumer<JobID, ExecutionException> assertInSnapshotCreationFailure() {
        return (ignored, actualException) -> {
            if (ClusterOptions.isAdaptiveSchedulerEnabled((Configuration)new Configuration())) {
                Assert.assertThat((Object)actualException, (Matcher)FlinkMatchers.containsCause(FlinkException.class));
            } else {
                Optional actualFailureCause = ExceptionUtils.findThrowable((Throwable)actualException, CheckpointException.class);
                Assert.assertTrue((boolean)actualFailureCause.isPresent());
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void testStopWithFailingSourceInOnePipeline(InfiniteTestSource failingSource, File savepointDir, int expectedMaximumNumberOfRestarts, BiConsumer<JobID, ExecutionException> exceptionAssertion) throws Exception {
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().build());
        failingPipelineLatch = new OneShotLatch();
        succeedingPipelineLatch = new OneShotLatch();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)expectedMaximumNumberOfRestarts, (long)0L));
        env.addSource((SourceFunction)failingSource).name("Failing Source").map((MapFunction & Serializable)value -> {
            failingPipelineLatch.trigger();
            return value;
        }).addSink((SinkFunction)new DiscardingSink());
        env.addSource((SourceFunction)new InfiniteTestSource()).name("Succeeding Source").map((MapFunction & Serializable)value -> {
            succeedingPipelineLatch.trigger();
            return value;
        }).addSink((SinkFunction)new DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        cluster.before();
        try {
            ClusterClient client = cluster.getClusterClient();
            client.submitJob(jobGraph).get();
            failingPipelineLatch.await();
            succeedingPipelineLatch.await();
            try {
                client.stopWithSavepoint(jobGraph.getJobID(), false, savepointDir.getAbsolutePath()).get();
                Assert.fail((String)"The future should fail exceptionally.");
            }
            catch (ExecutionException e) {
                exceptionAssertion.accept(jobGraph.getJobID(), e);
            }
            SavepointITCase.waitUntilAllTasksAreRunning(cluster.getRestAddres(), jobGraph.getJobID());
        }
        finally {
            cluster.after();
        }
    }

    public static void waitUntilAllTasksAreRunning(URI restAddress, JobID jobId) throws Exception {
        RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration((Configuration)new UnmodifiableConfiguration(new Configuration())), (Executor)TestingUtils.defaultExecutor());
        JobDetailsHeaders detailsHeaders = JobDetailsHeaders.getInstance();
        JobMessageParameters params = detailsHeaders.getUnresolvedMessageParameters();
        params.jobPathParameter.resolve((Object)jobId);
        CommonTestUtils.waitUntilCondition(() -> {
            JobDetailsInfo detailsInfo = (JobDetailsInfo)restClient.sendRequest(restAddress.getHost(), restAddress.getPort(), (MessageHeaders)detailsHeaders, (MessageParameters)params, (RequestBody)EmptyRequestBody.getInstance()).get();
            return SavepointITCase.allVerticesRunning(detailsInfo.getJobVerticesPerState());
        }, (Deadline)Deadline.fromNow((Duration)Duration.ofSeconds(10L)));
    }

    private static boolean allVerticesRunning(Map<ExecutionState, Integer> states) {
        return states.entrySet().stream().allMatch(entry -> {
            if (entry.getKey() == ExecutionState.RUNNING) {
                return (Integer)entry.getValue() > 0;
            }
            return (Integer)entry.getValue() == 0;
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
        String savepointPath;
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 2;
        Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5L));
        Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        LOG.info("Flink configuration: " + config + ".");
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(numTaskManagers).setNumberSlotsPerTaskManager(numSlotsPerTaskManager).build());
        LOG.info("Shutting down Flink cluster.");
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            StatefulCounter statefulCounter = new StatefulCounter();
            StatefulCounter.resetForTest(parallelism);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.addSource((SourceFunction)new InfiniteTestSource()).shuffle().map((MapFunction & Serializable)value -> 4 * value).shuffle().map((MapFunction)statefulCounter).uid("statefulCounter").shuffle().map((MapFunction & Serializable)value -> 2 * value).addSink((SinkFunction)new DiscardingSink());
            JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
            JobID jobID = (JobID)client.submitJob(originalJobGraph).get();
            Assert.assertTrue((boolean)StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            savepointPath = (String)client.triggerSavepoint(jobID, null).get();
            LOG.info("Retrieved savepoint: " + savepointPath + ".");
        }
        finally {
            LOG.info("Shutting down Flink cluster.");
            cluster.after();
        }
        cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(numTaskManagers).setNumberSlotsPerTaskManager(numSlotsPerTaskManager).build());
        LOG.info("Restarting Flink cluster.");
        cluster.before();
        client = cluster.getClusterClient();
        try {
            StatefulCounter.resetForTest(parallelism);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.addSource((SourceFunction)new InfiniteTestSource()).shuffle().map((MapFunction)new StatefulCounter()).uid("statefulCounter").shuffle().map((MapFunction & Serializable)value -> value).addSink((SinkFunction)new DiscardingSink());
            JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph();
            modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath));
            LOG.info("Resubmitting job " + modifiedJobGraph.getJobID() + " with savepoint path " + savepointPath + " in detached mode.");
            client.submitJob(modifiedJobGraph).get();
            Assert.assertTrue((boolean)StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            Assert.assertTrue((boolean)StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
        }
        finally {
            cluster.after();
        }
    }

    private JobGraph createJobGraph(int parallelism, int numberOfRetries, long restartDelay) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.disableOperatorChaining();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)numberOfRetries, (long)restartDelay));
        SingleOutputStreamOperator stream = env.addSource((SourceFunction)new InfiniteTestSource()).shuffle().map((MapFunction)new StatefulCounter());
        stream.addSink((SinkFunction)new DiscardingSink());
        return env.getStreamGraph().getJobGraph();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Test
    public void testSavepointForJobWithIteration() throws Exception {
        MiniClusterWithClientResource cluster;
        block8: {
            for (int i = 0; i < 1; ++i) {
                SavepointITCase.iterTestSnapshotWait[i] = new OneShotLatch();
                SavepointITCase.iterTestRestoreWait[i] = new OneShotLatch();
                SavepointITCase.iterTestCheckpointVerify[i] = 0;
            }
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            IntegerStreamSource source = new IntegerStreamSource();
            IterativeStream iteration = env.addSource((SourceFunction)source).flatMap((FlatMapFunction)new RichFlatMapFunction<Integer, Integer>(){
                private static final long serialVersionUID = 1L;

                public void flatMap(Integer in, Collector<Integer> clctr) throws Exception {
                    clctr.collect((Object)in);
                }
            }).setParallelism(1).keyBy((KeySelector)new KeySelector<Integer, Object>(){
                private static final long serialVersionUID = 1L;

                public Object getKey(Integer value) throws Exception {
                    return value;
                }
            }).flatMap((FlatMapFunction)new DuplicateFilter()).setParallelism(1).iterate();
            SingleOutputStreamOperator iterationBody = iteration.map((MapFunction)new MapFunction<Integer, Integer>(){
                private static final long serialVersionUID = 1L;

                public Integer map(Integer value) throws Exception {
                    return value;
                }
            }).setParallelism(1);
            iteration.closeWith((DataStream)iterationBody);
            StreamGraph streamGraph = env.getStreamGraph("Test");
            JobGraph jobGraph = streamGraph.getJobGraph();
            Configuration config = this.getFileBasedCheckpointsConfig();
            config.addAll(jobGraph.getJobConfiguration());
            config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.ZERO);
            cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism()).build());
            cluster.before();
            ClusterClient client = cluster.getClusterClient();
            String savepointPath = null;
            try {
                client.submitJob(jobGraph).get();
                for (OneShotLatch latch : iterTestSnapshotWait) {
                    latch.await();
                }
                savepointPath = (String)client.triggerSavepoint(jobGraph.getJobID(), null).get();
                client.cancel(jobGraph.getJobID()).get();
                while (!((JobStatus)client.getJobStatus(jobGraph.getJobID()).get()).isGloballyTerminalState()) {
                    Thread.sleep(100L);
                }
                jobGraph = streamGraph.getJobGraph();
                jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath));
                client.submitJob(jobGraph).get();
                for (OneShotLatch latch : iterTestRestoreWait) {
                    latch.await();
                }
                client.cancel(jobGraph.getJobID()).get();
                while (!((JobStatus)client.getJobStatus(jobGraph.getJobID()).get()).isGloballyTerminalState()) {
                    Thread.sleep(100L);
                }
                if (null == savepointPath) break block8;
            }
            catch (Throwable throwable) {
                if (null != savepointPath) {
                    client.disposeSavepoint(savepointPath);
                }
                cluster.after();
                throw throwable;
            }
            client.disposeSavepoint(savepointPath);
        }
        cluster.after();
    }

    private Configuration getFileBasedCheckpointsConfig(String savepointDir) {
        Configuration config = new Configuration();
        config.setString(StateBackendOptions.STATE_BACKEND, "filesystem");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, this.checkpointDir.toURI().toString());
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)MemorySize.ZERO);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        return config;
    }

    private Configuration getFileBasedCheckpointsConfig() {
        return this.getFileBasedCheckpointsConfig(this.savepointDir.toURI().toString());
    }

    private static Matcher<File> hasEntropyInFileStateHandlePaths() {
        return new TypeSafeDiagnosingMatcher<File>(){

            protected boolean matchesSafely(File savepointDir, Description mismatchDescription) {
                if (savepointDir == null) {
                    mismatchDescription.appendText("savepoint dir must not be null");
                    return false;
                }
                List filesWithoutEntropy = SavepointITCase.listRecursively(savepointDir.toPath().resolve("_entropy_"));
                java.nio.file.Path savepointDirWithEntropy = savepointDir.toPath().resolve("_resolved_");
                List filesWithEntropy = SavepointITCase.listRecursively(savepointDirWithEntropy);
                if (!filesWithoutEntropy.isEmpty()) {
                    mismatchDescription.appendText("there are savepoint files with unresolved entropy placeholders");
                    return false;
                }
                if (!Files.exists(savepointDirWithEntropy, new LinkOption[0]) || filesWithEntropy.isEmpty()) {
                    mismatchDescription.appendText("there are no savepoint files with added entropy");
                    return false;
                }
                return true;
            }

            public void describeTo(Description description) {
                description.appendText("all savepoint files should have added entropy");
            }
        };
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static List<java.nio.file.Path> listRecursively(java.nio.file.Path dir) {
        try {
            if (!Files.exists(dir, new LinkOption[0])) {
                return Collections.emptyList();
            }
            try (Stream<java.nio.file.Path> files = Files.walk(dir, FileVisitOption.FOLLOW_LINKS);){
                List<java.nio.file.Path> list = files.filter(x$0 -> Files.isRegularFile(x$0, new LinkOption[0])).collect(Collectors.toList());
                return list;
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    static {
        iterTestSnapshotWait = new OneShotLatch[1];
        iterTestRestoreWait = new OneShotLatch[1];
        iterTestCheckpointVerify = new int[1];
    }

    private static class MiniClusterResourceFactory {
        private final int numTaskManagers;
        private final int numSlotsPerTaskManager;
        private final Configuration config;

        private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManager, Configuration config) {
            this.numTaskManagers = numTaskManagers;
            this.numSlotsPerTaskManager = numSlotsPerTaskManager;
            this.config = config;
        }

        MiniClusterWithClientResource get() {
            return new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(this.config).setNumberTaskManagers(this.numTaskManagers).setNumberSlotsPerTaskManager(this.numSlotsPerTaskManager).build());
        }
    }

    private static class DuplicateFilter
    extends RichFlatMapFunction<Integer, Integer> {
        static final ValueStateDescriptor<Boolean> DESCRIPTOR = new ValueStateDescriptor("seen", Boolean.class, (Object)false);
        private static final long serialVersionUID = 1L;
        private ValueState<Boolean> operatorState;

        private DuplicateFilter() {
        }

        public void open(Configuration configuration) {
            this.operatorState = this.getRuntimeContext().getState(DESCRIPTOR);
        }

        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (!((Boolean)this.operatorState.value()).booleanValue()) {
                out.collect((Object)value);
                this.operatorState.update((Object)true);
            }
            if (30 == value) {
                iterTestSnapshotWait[this.getRuntimeContext().getIndexOfThisSubtask()].trigger();
            }
        }
    }

    private static final class IntegerStreamSource
    extends RichSourceFunction<Integer>
    implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;
        private volatile boolean isRestored = false;
        private int emittedCount = 0;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.emittedCount);
                }
                this.emittedCount = this.emittedCount < 100 ? ++this.emittedCount : 0;
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            iterTestCheckpointVerify[this.getRuntimeContext().getIndexOfThisSubtask()] = this.emittedCount;
            return Collections.singletonList(this.emittedCount);
        }

        public void restoreState(List<Integer> state) throws Exception {
            if (!state.isEmpty()) {
                this.emittedCount = state.get(0);
            }
            Assert.assertEquals((long)iterTestCheckpointVerify[this.getRuntimeContext().getIndexOfThisSubtask()], (long)this.emittedCount);
            iterTestRestoreWait[this.getRuntimeContext().getIndexOfThisSubtask()].trigger();
        }
    }

    private static class StatefulCounter
    extends RichMapFunction<Integer, Integer>
    implements ListCheckpointed<byte[]> {
        private static volatile CountDownLatch progressLatch = new CountDownLatch(0);
        private static volatile CountDownLatch restoreLatch = new CountDownLatch(0);
        private int numCollectedElements = 0;
        private static final long serialVersionUID = 7317800376639115920L;
        private byte[] data;

        private StatefulCounter() {
        }

        public void open(Configuration parameters) throws Exception {
            if (this.data == null) {
                Random rand = new Random(this.getRuntimeContext().getIndexOfThisSubtask());
                this.data = new byte[(int)((MemorySize)CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue()).getBytes() + 1];
                rand.nextBytes(this.data);
            }
        }

        public Integer map(Integer value) throws Exception {
            int i = 0;
            while (i < this.data.length) {
                int n = i++;
                this.data[n] = (byte)(this.data[n] + 1);
            }
            if (this.numCollectedElements++ > 10) {
                progressLatch.countDown();
            }
            return value;
        }

        public List<byte[]> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.data);
        }

        public void restoreState(List<byte[]> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.data = state.get(0);
            restoreLatch.countDown();
        }

        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        static CountDownLatch getRestoreLatch() {
            return restoreLatch;
        }

        static void resetForTest(int parallelism) {
            progressLatch = new CountDownLatch(parallelism);
            restoreLatch = new CountDownLatch(parallelism);
        }
    }

    private static class SnapshotFailingInfiniteTestSource
    extends InfiniteTestSource
    implements CheckpointedFunction {
        private SnapshotFailingInfiniteTestSource() {
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            throw new Exception("Expected Exception happened during snapshot creation within test source");
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
        }
    }

    private static class CancelFailingInfiniteTestSource
    extends InfiniteTestSource {
        private static volatile boolean cancelTriggered = false;

        private CancelFailingInfiniteTestSource() {
        }

        @Override
        public void cancel() {
            if (!cancelTriggered) {
                cancelTriggered = true;
                throw new RuntimeException("Expected RuntimeException after snapshot creation.");
            }
            super.cancel();
        }
    }

    private static class InfiniteTestSource
    implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;
        private volatile boolean suspended = false;
        private static final Collection<InfiniteTestSource> createdSources = new CopyOnWriteArrayList<InfiniteTestSource>();
        private volatile transient CompletableFuture<Void> completeFuture;

        private InfiniteTestSource() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            this.completeFuture = new CompletableFuture();
            createdSources.add(this);
            try {
                while (this.running) {
                    if (!this.suspended) {
                        Object object = ctx.getCheckpointLock();
                        synchronized (object) {
                            ctx.collect((Object)1);
                        }
                    }
                    Thread.sleep(1L);
                }
                this.completeFuture.complete(null);
            }
            catch (Exception e) {
                this.completeFuture.completeExceptionally(e);
                throw e;
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void suspend() {
            this.suspended = true;
        }

        public static void resetForTest() {
            createdSources.clear();
        }

        public CompletableFuture<Void> getCompleteFuture() {
            return this.completeFuture;
        }

        public static void cancelAllAndAwait() throws ExecutionException, InterruptedException {
            createdSources.forEach(InfiniteTestSource::cancel);
            CompletableFuture.allOf((CompletableFuture[])createdSources.stream().map(InfiniteTestSource::getCompleteFuture).toArray(CompletableFuture[]::new)).get();
        }

        public static void suspendAll() {
            createdSources.forEach(InfiniteTestSource::suspend);
        }
    }

    static class BoundedPassThroughOperator<T>
    extends AbstractStreamOperator<T>
    implements OneInputStreamOperator<T, T>,
    BoundedOneInput {
        static volatile CountDownLatch progressLatch;
        static volatile CountDownLatch snapshotAllowedLatch;
        static volatile CountDownLatch snapshotStartedLatch;
        static volatile boolean inputEnded;
        private transient boolean processed;

        BoundedPassThroughOperator(ChainingStrategy chainingStrategy) {
            this.chainingStrategy = chainingStrategy;
        }

        private static void allowSnapshots() {
            snapshotAllowedLatch.countDown();
        }

        public static void awaitSnapshotStarted() throws InterruptedException {
            snapshotStartedLatch.await();
        }

        public void endInput() throws Exception {
            inputEnded = true;
        }

        public void processElement(StreamRecord<T> element) throws Exception {
            this.output.collect(element);
            if (!this.processed) {
                this.processed = true;
                progressLatch.countDown();
            }
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            snapshotStartedLatch.countDown();
            snapshotAllowedLatch.await();
            super.snapshotState(context);
        }

        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        static void resetForTest(int parallelism, boolean allowSnapshots) {
            progressLatch = new CountDownLatch(parallelism);
            snapshotAllowedLatch = new CountDownLatch(allowSnapshots ? 0 : 1);
            snapshotStartedLatch = new CountDownLatch(parallelism);
            inputEnded = false;
        }
    }
}

