/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.Serializable;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeDiagnosingMatcher;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SavepointITCase
extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointITCase.class);
    @Rule
    public final TemporaryFolder folder = new TemporaryFolder();
    private File checkpointDir;
    private File savepointDir;
    private static final int ITER_TEST_PARALLELISM = 1;
    private static OneShotLatch[] iterTestSnapshotWait = new OneShotLatch[1];
    private static OneShotLatch[] iterTestRestoreWait = new OneShotLatch[1];
    private static int[] iterTestCheckpointVerify = new int[1];

    @Before
    public void setUp() throws Exception {
        File testRoot = this.folder.newFolder();
        this.checkpointDir = new File(testRoot, "checkpoints");
        this.savepointDir = new File(testRoot, "savepoints");
        if (!this.checkpointDir.mkdir() || !this.savepointDir.mkdirs()) {
            Assert.fail((String)"Test setup failed: failed to create temporary directories.");
        }
    }

    @Test
    public void testTriggerSavepointAndResumeWithFileBasedCheckpoints() throws Exception {
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 4;
        MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, this.getFileBasedCheckpointsConfig());
        String savepointPath = this.submitJobAndTakeSavepoint(clusterFactory, 4);
        this.verifySavepoint(4, savepointPath);
        this.restoreJobAndVerifyState(savepointPath, clusterFactory, 4);
    }

    @Test
    public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath() throws Exception {
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 4;
        MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, this.getFileBasedCheckpointsConfig());
        String savepointPath = this.submitJobAndTakeSavepoint(clusterFactory, 4);
        Path oldPath = new Path(savepointPath);
        Path newPath = new Path(this.folder.newFolder().toURI().toString());
        new Path(savepointPath).getFileSystem().rename(oldPath, newPath);
        this.verifySavepoint(4, newPath.toUri().toString());
        this.restoreJobAndVerifyState(newPath.toUri().toString(), clusterFactory, 4);
    }

    @Test
    public void testShouldAddEntropyToSavepointPath() throws Exception {
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 4;
        MiniClusterResourceFactory clusterFactory = new MiniClusterResourceFactory(2, 2, this.getCheckpointingWithEntropyConfig());
        String savepointPath = this.submitJobAndTakeSavepoint(clusterFactory, 4);
        Assert.assertThat((Object)this.savepointDir, SavepointITCase.hasEntropyInFileStateHandlePaths());
        this.restoreJobAndVerifyState(savepointPath, clusterFactory, 4);
    }

    private Configuration getCheckpointingWithEntropyConfig() {
        String savepointPathWithEntropyPlaceholder = new File(this.savepointDir, "_entropy_").getPath();
        Configuration config = this.getFileBasedCheckpointsConfig("test-entropy://" + savepointPathWithEntropyPlaceholder);
        config.setString("s3.entropy.key", "_entropy_");
        return config;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private String submitJobAndTakeSavepoint(MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
        JobGraph jobGraph = this.createJobGraph(parallelism, 0, 1000L);
        JobID jobId = jobGraph.getJobID();
        StatefulCounter.resetForTest(parallelism);
        MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            ClientUtils.submitJob((ClusterClient)client, (JobGraph)jobGraph);
            StatefulCounter.getProgressLatch().await();
            String string = (String)client.cancelWithSavepoint(jobId, null).get();
            return string;
        }
        finally {
            cluster.after();
            StatefulCounter.resetForTest(parallelism);
        }
    }

    private void verifySavepoint(int parallelism, String savepointPath) throws URISyntaxException {
        File savepointDir = new File(new URI(savepointPath));
        Assert.assertTrue((String)"Savepoint directory does not exist.", (boolean)savepointDir.exists());
        Assert.assertTrue((String)"Savepoint did not create self-contained directory.", (boolean)savepointDir.isDirectory());
        Object[] savepointFiles = savepointDir.listFiles();
        if (savepointFiles != null) {
            String errMsg = "Did not write expected number of savepoint/checkpoint files to directory: " + Arrays.toString(savepointFiles);
            Assert.assertEquals((String)errMsg, (long)(1 + parallelism), (long)savepointFiles.length);
        } else {
            Assert.fail((String)String.format("Returned savepoint path (%s) is not valid.", savepointPath));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void restoreJobAndVerifyState(String savepointPath, MiniClusterResourceFactory clusterFactory, int parallelism) throws Exception {
        JobGraph jobGraph = this.createJobGraph(parallelism, 0, 1000L);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath, (boolean)false));
        JobID jobId = jobGraph.getJobID();
        StatefulCounter.resetForTest(parallelism);
        MiniClusterWithClientResource cluster = clusterFactory.get();
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            ClientUtils.submitJob((ClusterClient)client, (JobGraph)jobGraph);
            StatefulCounter.getRestoreLatch().await();
            StatefulCounter.getProgressLatch().await();
            client.cancel(jobId).get();
            FutureUtils.retrySuccessfulWithDelay(() -> client.getJobStatus(jobId), (Time)Time.milliseconds((long)50L), (Deadline)Deadline.now().plus(Duration.ofSeconds(30L)), status -> status == JobStatus.CANCELED, (ScheduledExecutor)TestingUtils.defaultScheduledExecutor());
            client.disposeSavepoint(savepointPath).get();
            Assert.assertFalse((String)"Savepoint not properly cleaned up.", (boolean)new File(savepointPath).exists());
        }
        finally {
            cluster.after();
            StatefulCounter.resetForTest(parallelism);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerSavepointForNonExistingJob() throws Exception {
        boolean numTaskManagers = true;
        boolean numSlotsPerTaskManager = true;
        Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(1).build());
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        JobID jobID = new JobID();
        try {
            client.triggerSavepoint(jobID, null).get();
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, FlinkJobNotFoundException.class).isPresent());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)jobID.toString()).isPresent());
        }
        finally {
            cluster.after();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTriggerSavepointWithCheckpointingDisabled() throws Exception {
        boolean numTaskManagers = true;
        boolean numSlotsPerTaskManager = true;
        Configuration config = new Configuration();
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(1).build());
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        JobVertex vertex = new JobVertex("Blocking vertex");
        vertex.setInvokableClass(BlockingNoOpInvokable.class);
        vertex.setParallelism(1);
        JobGraph graph = new JobGraph(new JobVertex[]{vertex});
        try {
            ClientUtils.submitJob((ClusterClient)client, (JobGraph)graph);
            client.triggerSavepoint(graph.getJobID(), null).get();
            Assert.fail();
        }
        catch (ExecutionException e) {
            Assert.assertTrue((boolean)ExceptionUtils.findThrowable((Throwable)e, IllegalStateException.class).isPresent());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)graph.getJobID().toString()).isPresent());
            Assert.assertTrue((boolean)ExceptionUtils.findThrowableWithMessage((Throwable)e, (String)"is not a streaming job").isPresent());
        }
        finally {
            cluster.after();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testSubmitWithUnknownSavepointPath() throws Exception {
        int numTaskManagers = 1;
        int numSlotsPerTaskManager = 1;
        int parallelism = numTaskManagers * numSlotsPerTaskManager;
        Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(numTaskManagers).setNumberSlotsPerTaskManager(numSlotsPerTaskManager).build());
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            int numberOfRetries = 1000;
            JobGraph jobGraph = this.createJobGraph(parallelism, numberOfRetries, 3600000L);
            jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)"unknown path"));
            Assert.assertEquals((Object)"unknown path", (Object)jobGraph.getSavepointRestoreSettings().getRestorePath());
            LOG.info("Submitting job " + jobGraph.getJobID() + " in detached mode.");
            try {
                ClientUtils.submitJobAndWaitForResult((ClusterClient)client, (JobGraph)jobGraph, (ClassLoader)SavepointITCase.class.getClassLoader());
            }
            catch (Exception e) {
                Optional expectedJobExecutionException = ExceptionUtils.findThrowable((Throwable)e, JobExecutionException.class);
                Optional expectedFileNotFoundException = ExceptionUtils.findThrowable((Throwable)e, FileNotFoundException.class);
                if (!expectedJobExecutionException.isPresent() || !expectedFileNotFoundException.isPresent()) {
                    throw e;
                }
            }
        }
        finally {
            cluster.after();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCanRestoreWithModifiedStatelessOperators() throws Exception {
        String savepointPath;
        int numTaskManagers = 2;
        int numSlotsPerTaskManager = 2;
        int parallelism = 2;
        Deadline deadline = Deadline.now().plus(Duration.ofMinutes(5L));
        Configuration config = new Configuration();
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, this.savepointDir.toURI().toString());
        LOG.info("Flink configuration: " + config + ".");
        MiniClusterWithClientResource cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(numTaskManagers).setNumberSlotsPerTaskManager(numSlotsPerTaskManager).build());
        LOG.info("Shutting down Flink cluster.");
        cluster.before();
        ClusterClient client = cluster.getClusterClient();
        try {
            StatefulCounter statefulCounter = new StatefulCounter();
            StatefulCounter.resetForTest(parallelism);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.addSource((SourceFunction)new InfiniteTestSource()).shuffle().map((MapFunction & Serializable)value -> 4 * value).shuffle().map((MapFunction)statefulCounter).uid("statefulCounter").shuffle().map((MapFunction & Serializable)value -> 2 * value).addSink((SinkFunction)new DiscardingSink());
            JobGraph originalJobGraph = env.getStreamGraph().getJobGraph();
            JobExecutionResult submissionResult = ClientUtils.submitJob((ClusterClient)client, (JobGraph)originalJobGraph);
            JobID jobID = submissionResult.getJobID();
            Assert.assertTrue((boolean)StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            savepointPath = (String)client.triggerSavepoint(jobID, null).get();
            LOG.info("Retrieved savepoint: " + savepointPath + ".");
        }
        finally {
            LOG.info("Shutting down Flink cluster.");
            cluster.after();
        }
        cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(numTaskManagers).setNumberSlotsPerTaskManager(numSlotsPerTaskManager).build());
        LOG.info("Restarting Flink cluster.");
        cluster.before();
        client = cluster.getClusterClient();
        try {
            StatefulCounter.resetForTest(parallelism);
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(parallelism);
            env.addSource((SourceFunction)new InfiniteTestSource()).shuffle().map((MapFunction)new StatefulCounter()).uid("statefulCounter").shuffle().map((MapFunction & Serializable)value -> value).addSink((SinkFunction)new DiscardingSink());
            JobGraph modifiedJobGraph = env.getStreamGraph().getJobGraph();
            modifiedJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath));
            LOG.info("Resubmitting job " + modifiedJobGraph.getJobID() + " with savepoint path " + savepointPath + " in detached mode.");
            ClientUtils.submitJob((ClusterClient)client, (JobGraph)modifiedJobGraph);
            Assert.assertTrue((boolean)StatefulCounter.getRestoreLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
            Assert.assertTrue((boolean)StatefulCounter.getProgressLatch().await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
        }
        finally {
            cluster.after();
        }
    }

    private JobGraph createJobGraph(int parallelism, int numberOfRetries, long restartDelay) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.disableOperatorChaining();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart((int)numberOfRetries, (long)restartDelay));
        SingleOutputStreamOperator stream = env.addSource((SourceFunction)new InfiniteTestSource()).shuffle().map((MapFunction)new StatefulCounter());
        stream.addSink((SinkFunction)new DiscardingSink());
        return env.getStreamGraph().getJobGraph();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    @Test
    public void testSavepointForJobWithIteration() throws Exception {
        MiniClusterWithClientResource cluster;
        block8: {
            for (int i = 0; i < 1; ++i) {
                SavepointITCase.iterTestSnapshotWait[i] = new OneShotLatch();
                SavepointITCase.iterTestRestoreWait[i] = new OneShotLatch();
                SavepointITCase.iterTestCheckpointVerify[i] = 0;
            }
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            IntegerStreamSource source = new IntegerStreamSource();
            IterativeStream iteration = env.addSource((SourceFunction)source).flatMap((FlatMapFunction)new RichFlatMapFunction<Integer, Integer>(){
                private static final long serialVersionUID = 1L;

                public void flatMap(Integer in, Collector<Integer> clctr) throws Exception {
                    clctr.collect((Object)in);
                }
            }).setParallelism(1).keyBy((KeySelector)new KeySelector<Integer, Object>(){
                private static final long serialVersionUID = 1L;

                public Object getKey(Integer value) throws Exception {
                    return value;
                }
            }).flatMap((FlatMapFunction)new DuplicateFilter()).setParallelism(1).iterate();
            SingleOutputStreamOperator iterationBody = iteration.map((MapFunction)new MapFunction<Integer, Integer>(){
                private static final long serialVersionUID = 1L;

                public Integer map(Integer value) throws Exception {
                    return value;
                }
            }).setParallelism(1);
            iteration.closeWith((DataStream)iterationBody);
            StreamGraph streamGraph = env.getStreamGraph("Test");
            JobGraph jobGraph = streamGraph.getJobGraph();
            Configuration config = this.getFileBasedCheckpointsConfig();
            config.addAll(jobGraph.getJobConfiguration());
            config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.ZERO);
            cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(2 * jobGraph.getMaximumParallelism()).build());
            cluster.before();
            ClusterClient client = cluster.getClusterClient();
            String savepointPath = null;
            try {
                ClientUtils.submitJob((ClusterClient)client, (JobGraph)jobGraph);
                for (OneShotLatch latch : iterTestSnapshotWait) {
                    latch.await();
                }
                savepointPath = (String)client.triggerSavepoint(jobGraph.getJobID(), null).get();
                client.cancel(jobGraph.getJobID()).get();
                while (!((JobStatus)client.getJobStatus(jobGraph.getJobID()).get()).isGloballyTerminalState()) {
                    Thread.sleep(100L);
                }
                jobGraph = streamGraph.getJobGraph();
                jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath));
                ClientUtils.submitJob((ClusterClient)client, (JobGraph)jobGraph);
                for (OneShotLatch latch : iterTestRestoreWait) {
                    latch.await();
                }
                client.cancel(jobGraph.getJobID()).get();
                while (!((JobStatus)client.getJobStatus(jobGraph.getJobID()).get()).isGloballyTerminalState()) {
                    Thread.sleep(100L);
                }
                if (null == savepointPath) break block8;
            }
            catch (Throwable throwable) {
                if (null != savepointPath) {
                    client.disposeSavepoint(savepointPath);
                }
                cluster.after();
                throw throwable;
            }
            client.disposeSavepoint(savepointPath);
        }
        cluster.after();
    }

    private Configuration getFileBasedCheckpointsConfig(String savepointDir) {
        Configuration config = new Configuration();
        config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, this.checkpointDir.toURI().toString());
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, (Object)MemorySize.ZERO);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
        return config;
    }

    private Configuration getFileBasedCheckpointsConfig() {
        return this.getFileBasedCheckpointsConfig(this.savepointDir.toURI().toString());
    }

    private static Matcher<File> hasEntropyInFileStateHandlePaths() {
        return new TypeSafeDiagnosingMatcher<File>(){

            protected boolean matchesSafely(File savepointDir, Description mismatchDescription) {
                if (savepointDir == null) {
                    mismatchDescription.appendText("savepoint dir must not be null");
                    return false;
                }
                List filesWithoutEntropy = SavepointITCase.listRecursively(savepointDir.toPath().resolve("_entropy_"));
                java.nio.file.Path savepointDirWithEntropy = savepointDir.toPath().resolve("_resolved_");
                List filesWithEntropy = SavepointITCase.listRecursively(savepointDirWithEntropy);
                if (!filesWithoutEntropy.isEmpty()) {
                    mismatchDescription.appendText("there are savepoint files with unresolved entropy placeholders");
                    return false;
                }
                if (!Files.exists(savepointDirWithEntropy, new LinkOption[0]) || filesWithEntropy.isEmpty()) {
                    mismatchDescription.appendText("there are no savepoint files with added entropy");
                    return false;
                }
                return true;
            }

            public void describeTo(Description description) {
                description.appendText("all savepoint files should have added entropy");
            }
        };
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private static List<java.nio.file.Path> listRecursively(java.nio.file.Path dir) {
        try {
            if (!Files.exists(dir, new LinkOption[0])) {
                return Collections.emptyList();
            }
            try (Stream<java.nio.file.Path> files = Files.walk(dir, FileVisitOption.FOLLOW_LINKS);){
                List<java.nio.file.Path> list = files.filter(x$0 -> Files.isRegularFile(x$0, new LinkOption[0])).collect(Collectors.toList());
                return list;
            }
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    private static class MiniClusterResourceFactory {
        private final int numTaskManagers;
        private final int numSlotsPerTaskManager;
        private final Configuration config;

        private MiniClusterResourceFactory(int numTaskManagers, int numSlotsPerTaskManager, Configuration config) {
            this.numTaskManagers = numTaskManagers;
            this.numSlotsPerTaskManager = numSlotsPerTaskManager;
            this.config = config;
        }

        MiniClusterWithClientResource get() {
            return new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(this.config).setNumberTaskManagers(this.numTaskManagers).setNumberSlotsPerTaskManager(this.numSlotsPerTaskManager).build());
        }
    }

    private static class DuplicateFilter
    extends RichFlatMapFunction<Integer, Integer> {
        static final ValueStateDescriptor<Boolean> DESCRIPTOR = new ValueStateDescriptor("seen", Boolean.class, (Object)false);
        private static final long serialVersionUID = 1L;
        private ValueState<Boolean> operatorState;

        private DuplicateFilter() {
        }

        public void open(Configuration configuration) {
            this.operatorState = this.getRuntimeContext().getState(DESCRIPTOR);
        }

        public void flatMap(Integer value, Collector<Integer> out) throws Exception {
            if (!((Boolean)this.operatorState.value()).booleanValue()) {
                out.collect((Object)value);
                this.operatorState.update((Object)true);
            }
            if (30 == value) {
                iterTestSnapshotWait[this.getRuntimeContext().getIndexOfThisSubtask()].trigger();
            }
        }
    }

    private static final class IntegerStreamSource
    extends RichSourceFunction<Integer>
    implements ListCheckpointed<Integer> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;
        private volatile boolean isRestored = false;
        private int emittedCount = 0;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.emittedCount);
                }
                this.emittedCount = this.emittedCount < 100 ? ++this.emittedCount : 0;
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }

        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            iterTestCheckpointVerify[this.getRuntimeContext().getIndexOfThisSubtask()] = this.emittedCount;
            return Collections.singletonList(this.emittedCount);
        }

        public void restoreState(List<Integer> state) throws Exception {
            if (!state.isEmpty()) {
                this.emittedCount = state.get(0);
            }
            Assert.assertEquals((long)iterTestCheckpointVerify[this.getRuntimeContext().getIndexOfThisSubtask()], (long)this.emittedCount);
            iterTestRestoreWait[this.getRuntimeContext().getIndexOfThisSubtask()].trigger();
        }
    }

    private static class StatefulCounter
    extends RichMapFunction<Integer, Integer>
    implements ListCheckpointed<byte[]> {
        private static volatile CountDownLatch progressLatch = new CountDownLatch(0);
        private static volatile CountDownLatch restoreLatch = new CountDownLatch(0);
        private int numCollectedElements = 0;
        private static final long serialVersionUID = 7317800376639115920L;
        private byte[] data;

        private StatefulCounter() {
        }

        public void open(Configuration parameters) throws Exception {
            if (this.data == null) {
                Random rand = new Random(this.getRuntimeContext().getIndexOfThisSubtask());
                this.data = new byte[(int)((MemorySize)CheckpointingOptions.FS_SMALL_FILE_THRESHOLD.defaultValue()).getBytes() + 1];
                rand.nextBytes(this.data);
            }
        }

        public Integer map(Integer value) throws Exception {
            int i = 0;
            while (i < this.data.length) {
                int n = i++;
                this.data[n] = (byte)(this.data[n] + 1);
            }
            if (this.numCollectedElements++ > 10) {
                progressLatch.countDown();
            }
            return value;
        }

        public List<byte[]> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(this.data);
        }

        public void restoreState(List<byte[]> state) throws Exception {
            if (state.isEmpty() || state.size() > 1) {
                throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
            }
            this.data = state.get(0);
            restoreLatch.countDown();
        }

        static CountDownLatch getProgressLatch() {
            return progressLatch;
        }

        static CountDownLatch getRestoreLatch() {
            return restoreLatch;
        }

        static void resetForTest(int parallelism) {
            progressLatch = new CountDownLatch(parallelism);
            restoreLatch = new CountDownLatch(parallelism);
        }
    }

    private static class InfiniteTestSource
    implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;
        private volatile boolean running = true;

        private InfiniteTestSource() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)1);
                }
                Thread.sleep(1L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }
}

