/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.File;
import java.io.IOException;
import java.lang.invoke.CallSite;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.execution.RecoveryClaimMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.testutils.ManuallyTriggeredScheduledExecutorService;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureManager;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointProperties;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.DefaultCheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.EmbeddedCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
import org.apache.flink.runtime.checkpoint.RoundRobinOperatorStateRepartitioner;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.StateAssignmentOperation;
import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.checkpoint.TestingCompletedCheckpointStore;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.DiscardRecordedStateObject;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.PhysicalStateHandleID;
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingStreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.util.function.TriFunction;
import org.apache.flink.util.function.TriFunctionWithException;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.CompletableFutureAssert;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

class CheckpointCoordinatorTest {
    // JUnit 5 extension supplying the ScheduledExecutorService that the tests pass to the
    // execution-graph / coordinator builders.
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_RESOURCE = TestingUtils.defaultExecutorExtension();
    // Location string handed to the coordinator together with acknowledge/decline messages.
    private static final String TASK_MANAGER_LOCATION_INFO = "Unknown location";
    // Manually driven executor installed as the coordinator timer; re-created in setUp() before each test.
    private ManuallyTriggeredScheduledExecutor manuallyTriggeredScheduledExecutor;
    // Temporary directory injected by JUnit via @TempDir.
    @TempDir
    private java.nio.file.Path tmpFolder;

    /** Package-private no-arg constructor; it performs no initialisation of its own. */
    CheckpointCoordinatorTest() {
        // intentionally empty
    }

    /**
     * Checks how state handles are disposed when a checkpoint is aborted: after one vertex
     * acknowledges checkpoint 1 and the other declines it, the metadata and private handles must
     * be disposed while the shared handle is kept. The shared handle is expected to be disposed
     * only after checkpoint 2 has been acknowledged by both vertices and has completed.
     */
    @Test
    void testSharedStateNotDiscaredOnAbort() throws Exception {
        final JobVertexID firstVertex = new JobVertexID();
        final JobVertexID secondVertex = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertex)
                        .addJobVertex(secondVertex)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(graph);
        coordinator.startCheckpointScheduler();

        // Checkpoint 1: acknowledged by the first vertex, declined by the second.
        final CompletableFuture<?> abortedAttempt = coordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        abortedAttempt.getNow(null);

        final TestingStreamStateHandle metaState = handle();
        final TestingStreamStateHandle privateState = handle();
        final TestingStreamStateHandle sharedState = handle();
        ackCheckpoint(1L, coordinator, firstVertex, graph, metaState, privateState, sharedState);
        declineCheckpoint(1L, coordinator, secondVertex, graph);

        Assertions.assertThat(privateState.isDisposed()).isTrue();
        Assertions.assertThat(metaState.isDisposed()).isTrue();
        Assertions.assertThat(sharedState.isDisposed()).isFalse();

        // Checkpoint 2: acknowledged by both vertices, so it is expected to complete.
        final CompletableFuture<?> completedAttempt = coordinator.triggerCheckpoint(true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        completedAttempt.getNow(null);
        ackCheckpoint(2L, coordinator, firstVertex, graph, handle(), handle(), handle());
        ackCheckpoint(2L, coordinator, secondVertex, graph, handle(), handle(), handle());
        completedAttempt.get();

        Assertions.assertThat(sharedState.isDisposed()).isTrue();
    }

    /**
     * Runs the shared late-report scenario with the metrics delivered through
     * {@code reportCheckpointMetrics} for checkpoint 1.
     */
    @Test
    void testAbortedCheckpointStatsUpdatedAfterFailure() throws Exception {
        final long checkpointId = 1L;
        testReportStatsAfterFailure(
                checkpointId,
                (coord, attempt, reported) -> {
                    coord.reportCheckpointMetrics(checkpointId, attempt.getAttemptId(), reported);
                    return null;
                });
    }

    /**
     * Runs the shared late-report scenario with the metrics delivered inside an
     * {@link AcknowledgeCheckpoint} message for checkpoint 1.
     */
    @Test
    void testCheckpointStatsUpdatedAfterFailure() throws Exception {
        final long checkpointId = 1L;
        testReportStatsAfterFailure(
                checkpointId,
                (coord, attempt, reported) -> {
                    final AcknowledgeCheckpoint lateAck =
                            new AcknowledgeCheckpoint(
                                    attempt.getVertex().getJobId(),
                                    attempt.getAttemptId(),
                                    checkpointId,
                                    reported,
                                    new TaskStateSnapshot());
                    return coord.receiveAcknowledgeMessage(lateAck, TASK_MANAGER_LOCATION_INFO);
                });
    }

    /**
     * Shared scenario for the "stats updated after failure" tests: triggers a checkpoint on a
     * two-vertex graph, lets one vertex decline it, then reports metrics for the other vertex via
     * {@code reportFn} and asserts that the stats recorded for the checkpoint match those metrics.
     *
     * @param checkpointId id of the checkpoint that is declined and later reported on
     * @param reportFn callback that delivers the late metrics to the coordinator for the given
     *     execution
     * @throws Exception if triggering the checkpoint or reporting the metrics fails
     */
    private void testReportStatsAfterFailure(long checkpointId, TriFunctionWithException<CheckpointCoordinator, Execution, CheckpointMetrics, ?, CheckpointException> reportFn) throws Exception {
        JobVertexID decliningVertexID = new JobVertexID();
        JobVertexID lateReportVertexID = new JobVertexID();
        ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(decliningVertexID)
                        .addJobVertex(lateReportVertexID)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex decliningVertex = executionGraph.getJobVertex(decliningVertexID).getTaskVertices()[0];
        ExecutionVertex lateReportVertex = executionGraph.getJobVertex(lateReportVertexID).getTaskVertices()[0];
        DefaultCheckpointStatsTracker statsTracker =
                new DefaultCheckpointStatsTracker(
                        Integer.MAX_VALUE,
                        UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(statsTracker)
                        .build(executionGraph);

        CompletableFuture<?> result = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Preconditions.checkState(
                coordinator.getNumberOfPendingCheckpoints() == 1,
                "wrong number of pending checkpoints: %s",
                coordinator.getNumberOfPendingCheckpoints());
        if (result.isDone()) {
            // Surfaces the failure cause if the trigger already completed exceptionally.
            result.get();
        }

        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        executionGraph.getJobID(),
                        decliningVertex.getCurrentExecutionAttempt().getAttemptId(),
                        checkpointId,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                "test");

        CheckpointMetrics lateReportedMetrics =
                new CheckpointMetricsBuilder()
                        .setTotalBytesPersisted(18L)
                        .setBytesPersistedOfThisCheckpoint(18L)
                        .setBytesProcessedDuringAlignment(19L)
                        .setAsyncDurationMillis(20L)
                        .setAlignmentDurationNanos(123000000L)
                        .setCheckpointStartDelayNanos(567000000L)
                        .build();
        // Arguments are passed with their static types; the decompiler's (Object) casts do not
        // type-check against the generic signature of reportFn.
        reportFn.apply(coordinator, lateReportVertex.getCurrentExecutionAttempt(), lateReportedMetrics);

        this.assertStatsEqual(
                checkpointId,
                lateReportVertex.getJobvertexId(),
                0,
                lateReportedMetrics,
                statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId));
    }

    /** Returns {@code true} when the given operator state reports zero collected states. */
    private boolean hasNoSubState(OperatorState s) {
        final int collectedStates = s.getNumberCollectedStates();
        return collectedStates == 0;
    }

    /**
     * Asserts that {@code actual} describes the failed checkpoint {@code checkpointId} with no
     * acknowledged subtasks, then delegates the per-subtask metric comparison to
     * {@link #assertStatsMetrics}.
     */
    private void assertStatsEqual(long checkpointId, JobVertexID jobVertexID, int subtasIdx, CheckpointMetrics expected, AbstractCheckpointStats actual) {
        Assertions.assertThat((long) actual.getCheckpointId()).isEqualTo(checkpointId);

        final Comparable reportedStatus = actual.getStatus();
        Assertions.assertThat(reportedStatus).isEqualTo((Object) CheckpointStatsStatus.FAILED);

        final int acknowledged = actual.getNumberOfAcknowledgedSubtasks();
        Assertions.assertThat(acknowledged).isZero();

        assertStatsMetrics(jobVertexID, subtasIdx, expected, actual);
    }

    /**
     * Asserts that the stats recorded in {@code actual} for one subtask match the reported
     * metrics.
     *
     * <p>Nanosecond values in {@code expected} are divided by 1,000,000 before being compared
     * with the alignment duration and checkpoint start delay of the subtask stats.
     *
     * @param jobVertexID vertex whose task stats are inspected
     * @param subtasIdx index of the subtask within that vertex
     * @param expected metrics that were reported for the subtask
     * @param actual checkpoint stats to verify
     * @throws AssertionError if no task stats exist for {@code jobVertexID} or a value differs
     */
    public static void assertStatsMetrics(JobVertexID jobVertexID, int subtasIdx, CheckpointMetrics expected, AbstractCheckpointStats actual) {
        Assertions.assertThat((long)actual.getStateSize()).isEqualTo(expected.getTotalBytesPersisted());
        // Fail with a descriptive message instead of a bare NoSuchElementException when the
        // vertex has no stats at all.
        SubtaskStateStats taskStats = actual.getAllTaskStateStats().stream()
                .filter(s -> s.getJobVertexId().equals((Object)jobVertexID))
                .findAny()
                .orElseThrow(() -> new AssertionError("No task state stats found for job vertex " + jobVertexID))
                .getSubtaskStats()[subtasIdx];
        // The alignment duration is checked once here; the original repeated this assertion.
        Assertions.assertThat((long)taskStats.getAlignmentDuration()).isEqualTo(expected.getAlignmentDurationNanos() / 1000000L);
        Assertions.assertThat((boolean)taskStats.getUnalignedCheckpoint()).isEqualTo(expected.getUnalignedCheckpoint());
        Assertions.assertThat((long)taskStats.getAsyncCheckpointDuration()).isEqualTo(expected.getAsyncDurationMillis());
        Assertions.assertThat((long)taskStats.getCheckpointStartDelay()).isEqualTo(expected.getCheckpointStartDelayNanos() / 1000000L);
    }

    /** Installs a fresh manually triggered executor before every test. */
    @BeforeEach
    void setUp() {
        final ManuallyTriggeredScheduledExecutor freshExecutor = new ManuallyTriggeredScheduledExecutor();
        manuallyTriggeredScheduledExecutor = freshExecutor;
    }

    /**
     * Calls {@code scheduleTriggerRequest()} after both the coordinator and its timer executor
     * have been shut down; the test passes if no exception escapes.
     *
     * <p>The executor is additionally shut down in a {@code finally} block so that it is released
     * even when creating or shutting down the coordinator throws.
     */
    @Test
    void testScheduleTriggerRequestDuringShutdown() throws Exception {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            CheckpointCoordinator coordinator = this.getCheckpointCoordinator((ScheduledExecutor)new ScheduledExecutorServiceAdapter(executor));
            coordinator.shutdown();
            // The executor must already be terminated when the trigger request is scheduled.
            executor.shutdownNow();
            coordinator.scheduleTriggerRequest();
        }
        finally {
            // Repeating shutdownNow() on an already terminated executor has no further effect.
            executor.shutdownNow();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Configures a checkpoint interval and minimum pause of 1000 ms with at most one concurrent
     * checkpoint, triggers two checkpoints and acknowledges checkpoint 1. Half a pause later no
     * checkpoint may be pending; the test then waits until a pending checkpoint appears.
     */
    @Test
    void testMinCheckpointPause() throws Exception {
        final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
        CheckpointCoordinator coordinator = null;
        try {
            final long pauseMillis = 1000L;
            final JobVertexID vertexId = new JobVertexID();
            final ExecutionGraph graph =
                    new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                            .addJobVertex(vertexId)
                            .setMainThreadExecutor(
                                    ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
                                            new DirectScheduledExecutorService()))
                            .build(EXECUTOR_RESOURCE.getExecutor());
            final ExecutionAttemptID attemptId =
                    graph.getJobVertex(vertexId).getTaskVertices()[0]
                            .getCurrentExecutionAttempt()
                            .getAttemptId();

            final CheckpointCoordinatorConfiguration configuration =
                    CheckpointCoordinatorConfiguration.builder()
                            .setCheckpointInterval(pauseMillis)
                            .setCheckpointTimeout(Long.MAX_VALUE)
                            .setMaxConcurrentCheckpoints(1)
                            .setMinPauseBetweenCheckpoints(pauseMillis)
                            .build();
            coordinator =
                    new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                            .setTimer(new ScheduledExecutorServiceAdapter(timerService))
                            .setCheckpointCoordinatorConfiguration(configuration)
                            .build(graph);
            coordinator.startCheckpointScheduler();
            coordinator.triggerCheckpoint(true);
            coordinator.triggerCheckpoint(true);

            // Poll until some pending checkpoint has a storage location assigned.
            while (coordinator.getPendingCheckpoints().values().stream()
                    .allMatch(pending -> pending.getCheckpointStorageLocation() == null)) {
                Thread.sleep(10L);
            }

            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(graph.getJobID(), attemptId, 1L),
                    TASK_MANAGER_LOCATION_INFO);

            Thread.sleep(pauseMillis / 2);
            Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();

            // Poll until a pending checkpoint shows up again.
            while (coordinator.getNumberOfPendingCheckpoints() == 0) {
                Thread.sleep(1L);
            }
        } finally {
            if (coordinator != null) {
                coordinator.shutdown();
            }
            timerService.shutdownNow();
        }
    }

    /**
     * Builds a graph with {@code setTransitToRunning(false)} and checks that a triggered
     * checkpoint fails without leaving a pending or retained checkpoint behind.
     */
    @Test
    void testCheckpointAbortsIfTriggerTasksAreNotExecuted() throws Exception {
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID(), false)
                        .setTransitToRunning(false)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(graph);

        // Nothing is pending or retained before the trigger.
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assertions.assertThat(triggerResult).isCompletedExceptionally();
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        coordinator.shutdown();
    }

    /**
     * Marks every task of the first vertex as finished and checks that a triggered checkpoint
     * fails without leaving a pending or retained checkpoint behind.
     */
    @Test
    void testCheckpointAbortsIfTriggerTasksAreFinished() throws Exception {
        final JobVertexID finishedVertexId = new JobVertexID();
        final JobVertexID otherVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(finishedVertexId)
                        .addJobVertex(otherVertexId, false)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(graph);

        for (ExecutionVertex task : graph.getJobVertex(finishedVertexId).getTaskVertices()) {
            task.getCurrentExecutionAttempt().markFinished();
        }

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assertions.assertThat(triggerResult).isCompletedExceptionally();
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        coordinator.shutdown();
    }

    /**
     * With {@code setAllowCheckpointsAfterTasksFinished(true)}, finishes three of the six
     * subtasks before triggering and checks that the checkpoint stays pending, that the stats
     * count three acknowledged subtasks, and that subtask stats exist for each finished task.
     */
    @Test
    void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exception {
        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId, 3, 256)
                        .addJobVertex(secondVertexId, 3, 256)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionJobVertex firstVertex = graph.getJobVertex(firstVertexId);
        final ExecutionJobVertex secondVertex = graph.getJobVertex(secondVertexId);

        // The same three tasks are finished here and inspected in the stats further down.
        final List<ExecutionVertex> finishedTasks =
                Arrays.asList(
                        firstVertex.getTaskVertices()[0],
                        firstVertex.getTaskVertices()[1],
                        secondVertex.getTaskVertices()[1]);
        for (ExecutionVertex finished : finishedTasks) {
            finished.getCurrentExecutionAttempt().markFinished();
        }

        final DefaultCheckpointStatsTracker statsTracker =
                new DefaultCheckpointStatsTracker(
                        Integer.MAX_VALUE,
                        UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .setCheckpointStatsTracker(statsTracker)
                        .build(graph);
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(triggerResult.isDone()).isFalse();
        Assertions.assertThat(triggerResult.isCompletedExceptionally()).isFalse();
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isOne();

        final PendingCheckpoint pending =
                (PendingCheckpoint) coordinator.getPendingCheckpoints().values().iterator().next();
        final AbstractCheckpointStats stats =
                statsTracker.createSnapshot().getHistory().getCheckpointById(pending.getCheckpointID());
        Assertions.assertThat(stats.getNumberOfAcknowledgedSubtasks()).isEqualTo(3);
        for (ExecutionVertex finished : finishedTasks) {
            final Object subtaskStats =
                    stats.getTaskStateStats(finished.getJobvertexId())
                            .getSubtaskStats()[finished.getParallelSubtaskIndex()];
            Assertions.assertThat(subtaskStats).isNotNull();
        }
    }

    /**
     * Simulates a task that finishes while it is being triggered: the first task's gateway marks
     * the task finished and fails the trigger call, while the second task's gateway records
     * abort notifications. The checkpoint future must fail and the second task must have been
     * notified of the abort.
     */
    @Test
    void testTasksFinishDuringTriggering() throws Exception {
        final JobVertexID finishingVertexId = new JobVertexID();
        final JobVertexID runningVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .setTransitToRunning(false)
                        .addJobVertex(finishingVertexId, 1, 256)
                        .addJobVertex(runningVertexId, 1, 256)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionVertex finishingTask = graph.getJobVertex(finishingVertexId).getTaskVertices()[0];
        final ExecutionVertex runningTask = graph.getJobVertex(runningVertexId).getTaskVertices()[0];
        final AtomicBoolean abortNotified = new AtomicBoolean(false);

        // Gateway of the first task: finishes the task and rejects the trigger call.
        final TestingLogicalSlot finishingSlot =
                new TestingLogicalSlotBuilder()
                        .setTaskManagerGateway(
                                new SimpleAckingTaskManagerGateway() {
                                    @Override
                                    public CompletableFuture<Acknowledge> triggerCheckpoint(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
                                        finishingTask.getCurrentExecutionAttempt().markFinished();
                                        return FutureUtils.completedExceptionally(new RpcException(""));
                                    }
                                })
                        .createTestingLogicalSlot();
        // Gateway of the second task: remembers that an abort notification arrived.
        final TestingLogicalSlot runningSlot =
                new TestingLogicalSlotBuilder()
                        .setTaskManagerGateway(
                                new SimpleAckingTaskManagerGateway() {
                                    @Override
                                    public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long latestCompletedCheckpointId, long timestamp) {
                                        abortNotified.set(true);
                                    }
                                })
                        .createTestingLogicalSlot();

        ExecutionGraphTestUtils.setVertexResource(finishingTask, finishingSlot);
        finishingTask.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
        ExecutionGraphTestUtils.setVertexResource(runningTask, runningSlot);
        runningTask.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);

        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setAllowCheckpointsAfterTasksFinished(true)
                        .build(graph);
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assertions.assertThat(triggerResult).isCompletedExceptionally();
        Assertions.assertThat(abortNotified.get()).isTrue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Triggers a checkpoint, acknowledges it for the second vertex and declines it for the first,
     * expecting the decline to surface an exception that carries the failure manager's message.
     */
    @Test
    void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException() throws Exception {
        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionAttemptID decliningAttempt =
                graph.getJobVertex(firstVertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID ackingAttempt =
                graph.getJobVertex(secondVertexId).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();

        final String errorMsg = "Exceeded checkpoint failure tolerance number!";
        final CheckpointFailureManager failureManager = getCheckpointFailureManager(errorMsg);
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(graph, failureManager);
        try {
            final CompletableFuture<?> triggerResult = coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            FutureUtils.throwIfCompletedExceptionally(triggerResult);

            final long checkpointId = (Long) coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
            final PendingCheckpoint pending = (PendingCheckpoint) coordinator.getPendingCheckpoints().get(checkpointId);

            coordinator.receiveAcknowledgeMessage(
                    new AcknowledgeCheckpoint(graph.getJobID(), ackingAttempt, checkpointId),
                    TASK_MANAGER_LOCATION_INFO);
            Assertions.assertThat(pending.isDisposed()).isFalse();
            Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();

            coordinator.receiveDeclineMessage(
                    new DeclineCheckpoint(
                            graph.getJobID(),
                            decliningAttempt,
                            checkpointId,
                            new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                    TASK_MANAGER_LOCATION_INFO);
            // Reached only if the decline did not throw.
            Assertions.fail("Test failed.");
        } catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage(e, errorMsg);
        } finally {
            coordinator.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Aborts the pending checkpoints with an {@code IO_EXCEPTION} reason and expects an exception
     * that carries the failure manager's message.
     */
    @Test
    void testIOExceptionCheckpointExceedsTolerableFailureNumber() throws Exception {
        final ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID())
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final String expectedErrorMessage = "Expected Error Message";
        final CheckpointFailureManager failureManager = getCheckpointFailureManager(expectedErrorMessage);
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(graph, failureManager);
        try {
            coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            final CheckpointException ioFailure = new CheckpointException(CheckpointFailureReason.IO_EXCEPTION);
            coordinator.abortPendingCheckpoints(ioFailure);
            // Reached only if aborting did not throw.
            Assertions.fail("Test failed.");
        } catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage(e, expectedErrorMessage);
        } finally {
            coordinator.shutdown();
        }
    }

    /**
     * Triggers a checkpoint on a coordinator backed by {@code IOExceptionCheckpointStorage} and
     * expects the returned future to fail with a {@link CheckpointException} whose reason is
     * {@code IO_EXCEPTION}; any other exception is rethrown.
     */
    @Test
    void testIOExceptionForPeriodicSchedulingWithInactiveTasks() throws Exception {
        final CheckpointCoordinator coordinator =
                setupCheckpointCoordinatorWithInactiveTasks(new IOExceptionCheckpointStorage());
        final CompletableFuture<?> onCompletionPromise =
                coordinator.triggerCheckpoint(
                        CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        null,
                        true);
        manuallyTriggeredScheduledExecutor.triggerAll();
        try {
            onCompletionPromise.get();
            Assertions.fail("should not trigger periodic checkpoint after IOException occurred.");
        } catch (Exception e) {
            final Optional<CheckpointException> cause = ExceptionUtils.findThrowable(e, CheckpointException.class);
            final boolean isExpectedIoFailure =
                    cause.isPresent()
                            && cause.get().getCheckpointFailureReason() == CheckpointFailureReason.IO_EXCEPTION;
            if (!isExpectedIoFailure) {
                throw e;
            }
        }
    }

    /**
     * Runs the shared trigger scenario against {@code IOExceptionCheckpointStorage} with a
     * failure manager constructed with 0 as its first argument, then checks that the fail-job
     * callback was invoked exactly once and that pending stats for checkpoint 1 are present.
     */
    @Test
    void testTriggerCheckpointAfterCheckpointStorageIOException() throws Exception {
        final TestFailJobCallback failJobCallback = new TestFailJobCallback();
        final DefaultCheckpointStatsTracker statsTracker =
                new DefaultCheckpointStatsTracker(
                        Integer.MAX_VALUE,
                        UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        final CheckpointFailureManager failureManager = new CheckpointFailureManager(0, failJobCallback);
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointStatsTracker(statsTracker)
                        .setFailureManager(failureManager)
                        .setCheckpointStorage(new IOExceptionCheckpointStorage())
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(EXECUTOR_RESOURCE.getExecutor());

        testTriggerCheckpoint(coordinator, CheckpointFailureReason.IO_EXCEPTION);

        Assertions.assertThat(failJobCallback.getInvokeCounter()).isOne();
        final Object pendingStats = statsTracker.getPendingCheckpointStats(1L);
        Assertions.assertThat(pendingStats).isNotNull();
    }

    /**
     * Marks every task of the first job vertex as finished, triggers a checkpoint on a coordinator
     * backed by {@code IOExceptionCheckpointStorage}, and asserts that the returned future completed
     * exceptionally while no pending or retained checkpoint is left behind.
     */
    @Test
    void testCheckpointAbortsIfTriggerTasksAreFinishedAndIOException() throws Exception {
        JobVertexID finishedVertexId = new JobVertexID();
        JobVertexID otherVertexId = new JobVertexID();

        ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(finishedVertexId)
                        .addJobVertex(otherVertexId, false)
                        .build(EXECUTOR_RESOURCE.getExecutor());

        CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointStorage(new IOExceptionCheckpointStorage())
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(executionGraph);

        // Finish all subtasks of the first vertex before any checkpoint is triggered.
        for (ExecutionVertex task : executionGraph.getJobVertex(finishedVertexId).getTaskVertices()) {
            task.getCurrentExecutionAttempt().markFinished();
        }

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        coordinator.startCheckpointScheduler();
        CompletableFuture<CompletedCheckpoint> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        Assertions.assertThat(triggerResult).isCompletedExceptionally();
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        coordinator.shutdown();
    }

    /**
     * Triggers a checkpoint, aborts all pending checkpoints with {@code CHECKPOINT_EXPIRED} and
     * expects that abort to throw an exception whose chain carries {@code errorMsg}.
     *
     * <p>The failure manager is obtained from {@code getCheckpointFailureManager(errorMsg)};
     * presumably it fails the job with that message once the tolerance is exceeded (helper is
     * defined elsewhere in this class - confirm there). The coordinator is always shut down, also
     * when the assertion fails.
     */
    @Test
    void testExpiredCheckpointExceedsTolerableFailureNumber() throws Exception {
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(new JobVertexID())
                        .addJobVertex(new JobVertexID())
                        .build(EXECUTOR_RESOURCE.getExecutor());

        // Single source of truth for the message handed to the failure manager and asserted below.
        final String errorMsg = "Exceeded checkpoint failure tolerance number!";
        CheckpointFailureManager checkpointFailureManager = getCheckpointFailureManager(errorMsg);
        CheckpointCoordinator checkpointCoordinator =
                getCheckpointCoordinator(graph, checkpointFailureManager);

        try {
            checkpointCoordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();

            checkpointCoordinator.abortPendingCheckpoints(
                    new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));

            // Reaching this line means the abort did not throw. AssertJ's fail raises an
            // AssertionError, which the catch (Exception) below does not intercept.
            Assertions.fail("Test failed.");
        } catch (Exception e) {
            ExceptionUtils.assertThrowableWithMessage(e, errorMsg);
        } finally {
            checkpointCoordinator.shutdown();
        }
    }

    /** Runs the simple trigger-and-decline scenario with {@code CHECKPOINT_DECLINED} as reason. */
    @Test
    void testTriggerAndDeclineSyncCheckpointFailureSimple() throws Exception {
        final CheckpointFailureReason reason = CheckpointFailureReason.CHECKPOINT_DECLINED;
        testTriggerAndDeclineCheckpointSimple(reason);
    }

    /**
     * Runs the simple trigger-and-decline scenario with {@code CHECKPOINT_ASYNC_EXCEPTION} as
     * reason.
     */
    @Test
    void testTriggerAndDeclineAsyncCheckpointFailureSimple() throws Exception {
        final CheckpointFailureReason reason = CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION;
        testTriggerAndDeclineCheckpointSimple(reason);
    }

    /**
     * Shared scenario: trigger one checkpoint over a two-vertex graph, acknowledge it (twice) for
     * the second vertex, then decline it for the first vertex with the given failure reason.
     *
     * <p>Asserts that the pending checkpoint is disposed by the decline, that no pending or retained
     * checkpoints and no scheduled tasks remain, that repeated decline messages leave it disposed,
     * and that the fail-job callback was invoked exactly once.
     *
     * @param checkpointFailureReason reason wrapped into the {@link CheckpointException} that is
     *     sent with every decline message
     */
    private void testTriggerAndDeclineCheckpointSimple(CheckpointFailureReason checkpointFailureReason) throws Exception {
        final CheckpointException declineCause = new CheckpointException(checkpointFailureReason);

        // --- set up a two-vertex graph whose gateway records checkpoint RPCs ---
        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .setTaskManagerGateway(recorder)
                        .build(EXECUTOR_RESOURCE.getExecutor());

        final ExecutionVertex firstVertex = executionGraph.getJobVertex(firstVertexId).getTaskVertices()[0];
        final ExecutionVertex secondVertex = executionGraph.getJobVertex(secondVertexId).getTaskVertices()[0];
        final ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();

        final TestFailJobCallback jobFailures = new TestFailJobCallback();
        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .setCheckpointFailureManager(new CheckpointFailureManager(0, jobFailures))
                        .build(executionGraph);

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        // --- trigger the checkpoint ---
        final CompletableFuture<CompletedCheckpoint> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerResult);

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isOne();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);

        final long pendingId = coordinator.getPendingCheckpoints().keySet().iterator().next();
        final PendingCheckpoint pending = coordinator.getPendingCheckpoints().get(pendingId);

        Assertions.assertThat(pending).isNotNull();
        Assertions.assertThat(pending.getCheckpointID()).isEqualTo(pendingId);
        Assertions.assertThat(pending.getJobId()).isEqualTo(executionGraph.getJobID());
        Assertions.assertThat(pending.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
        Assertions.assertThat(pending.getNumberOfAcknowledgedTasks()).isZero();
        Assertions.assertThat(pending.getOperatorStates().size()).isZero();
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();

        // Both vertices must have received exactly this checkpoint's trigger message.
        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
            CheckpointCoordinatorTestingUtils.TriggeredCheckpoint trigger =
                    recorder.getOnlyTriggeredCheckpoint(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assertions.assertThat(trigger.checkpointId).isEqualTo(pendingId);
            Assertions.assertThat(trigger.timestamp).isEqualTo(pending.getCheckpointTimestamp());
            Assertions.assertThat(trigger.checkpointOptions)
                    .isEqualTo(CheckpointOptions.forCheckpointWithDefaultLocation());
        }

        // --- acknowledge from the second task ---
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, pendingId),
                TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.getNumberOfAcknowledgedTasks()).isOne();
        Assertions.assertThat(pending.getNumberOfNonAcknowledgedTasks()).isOne();
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();

        // --- the same acknowledgement a second time ---
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, pendingId),
                TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();

        // --- decline from the first task ---
        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(executionGraph.getJobID(), firstAttempt, pendingId, declineCause),
                TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.isDisposed()).isTrue();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size()).isZero();
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        // --- further declines for the already-declined checkpoint ---
        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(executionGraph.getJobID(), firstAttempt, pendingId, declineCause),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveDeclineMessage(
                new DeclineCheckpoint(executionGraph.getJobID(), secondAttempt, pendingId, declineCause),
                TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.isDisposed()).isTrue();
        Assertions.assertThat(jobFailures.getInvokeCounter()).isOne();

        coordinator.shutdown();
    }

    /**
     * Triggers two checkpoints over a two-vertex graph, declines the first one and asserts that
     * only the first is disposed and aborted while the second stays pending and untouched.
     *
     * <p>Also asserts that further decline messages for the first checkpoint leave each task with
     * exactly one recorded abort notification.
     */
    @Test
    void testTriggerAndDeclineCheckpointComplex() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1)
                        .addJobVertex(jobVertexID2)
                        .setTaskManagerGateway(gateway)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator checkpointCoordinator = getCheckpointCoordinator(graph);

        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();

        // Trigger the first checkpoint.
        CompletableFuture<CompletedCheckpoint> checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);

        // Trigger the second checkpoint.
        CompletableFuture<CompletedCheckpoint> checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);

        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(2);

        // Typed iterator over the pending ids: a raw Iterator would make next() return Object,
        // on which getKey() cannot be called.
        Iterator<Long> pendingIds = checkpointCoordinator.getPendingCheckpoints().keySet().iterator();
        long checkpoint1Id = pendingIds.next();
        long checkpoint2Id = pendingIds.next();
        PendingCheckpoint checkpoint1 = checkpointCoordinator.getPendingCheckpoints().get(checkpoint1Id);
        PendingCheckpoint checkpoint2 = checkpointCoordinator.getPendingCheckpoints().get(checkpoint2Id);

        Assertions.assertThat(checkpoint1).isNotNull();
        Assertions.assertThat(checkpoint1.getCheckpointID()).isEqualTo(checkpoint1Id);
        Assertions.assertThat(checkpoint1.getJobId()).isEqualTo(graph.getJobID());
        Assertions.assertThat(checkpoint1.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
        Assertions.assertThat(checkpoint1.getNumberOfAcknowledgedTasks()).isZero();
        Assertions.assertThat(checkpoint1.getOperatorStates()).isEmpty();
        Assertions.assertThat(checkpoint1.isDisposed()).isFalse();
        Assertions.assertThat(checkpoint1.areTasksFullyAcknowledged()).isFalse();

        Assertions.assertThat(checkpoint2).isNotNull();
        Assertions.assertThat(checkpoint2.getCheckpointID()).isEqualTo(checkpoint2Id);
        Assertions.assertThat(checkpoint2.getJobId()).isEqualTo(graph.getJobID());
        Assertions.assertThat(checkpoint2.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
        Assertions.assertThat(checkpoint2.getNumberOfAcknowledgedTasks()).isZero();
        Assertions.assertThat(checkpoint2.getOperatorStates()).isEmpty();
        Assertions.assertThat(checkpoint2.isDisposed()).isFalse();
        Assertions.assertThat(checkpoint2.areTasksFullyAcknowledged()).isFalse();

        // Each task must have been triggered for both checkpoints, in trigger order.
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            List<CheckpointCoordinatorTestingUtils.TriggeredCheckpoint> triggeredCheckpoints =
                    gateway.getTriggeredCheckpoints(vertex.getCurrentExecutionAttempt().getAttemptId());
            Assertions.assertThat(triggeredCheckpoints).hasSize(2);
            Assertions.assertThat(triggeredCheckpoints.get(0).checkpointId).isEqualTo(checkpoint1Id);
            Assertions.assertThat(triggeredCheckpoints.get(1).checkpointId).isEqualTo(checkpoint2Id);
        }

        // Decline the first checkpoint from the first task.
        checkpointCoordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpoint1Id,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                TASK_MANAGER_LOCATION_INFO);
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assertions.assertThat(
                            gateway.getOnlyNotifiedAbortedCheckpoint(
                                            vertex.getCurrentExecutionAttempt().getAttemptId())
                                    .checkpointId)
                    .isEqualTo(checkpoint1Id);
        }

        Assertions.assertThat(checkpoint1.isDisposed()).isTrue();
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);

        // The only remaining pending checkpoint must be the second one, still untouched.
        long checkpointIdNew = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        PendingCheckpoint checkpointNew = checkpointCoordinator.getPendingCheckpoints().get(checkpointIdNew);
        Assertions.assertThat(checkpointIdNew).isEqualTo(checkpoint2Id);
        Assertions.assertThat(checkpointNew).isNotNull();
        Assertions.assertThat(checkpointNew.getCheckpointID()).isEqualTo(checkpointIdNew);
        Assertions.assertThat(checkpointNew.getJobId()).isEqualTo(graph.getJobID());
        Assertions.assertThat(checkpointNew.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
        Assertions.assertThat(checkpointNew.getNumberOfAcknowledgedTasks()).isZero();
        Assertions.assertThat(checkpointNew.getOperatorStates()).isEmpty();
        Assertions.assertThat(checkpointNew.isDisposed()).isFalse();
        Assertions.assertThat(checkpointNew.areTasksFullyAcknowledged()).isFalse();
        Assertions.assertThat(checkpointNew.getCheckpointID()).isNotEqualTo(checkpoint1.getCheckpointID());

        // Further declines for the already-declined first checkpoint.
        checkpointCoordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpoint1Id,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        attemptID2,
                        checkpoint1Id,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(checkpoint1.isDisposed()).isTrue();

        // Still exactly one abort notification per task.
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            Assertions.assertThat(
                            gateway.getNotifiedAbortedCheckpoints(
                                    vertex.getCurrentExecutionAttempt().getAttemptId()))
                    .hasSize(1);
        }

        checkpointCoordinator.shutdown();
    }

    /**
     * Triggers a checkpoint over a two-vertex graph, acknowledges it from both tasks (the second
     * task's acknowledgement is delivered twice) and asserts that it becomes the single retained
     * completed checkpoint and that both tasks are notified of completion.
     *
     * <p>A second checkpoint, acknowledged without any state, is then run through the same cycle and
     * must replace the first one as the only retained checkpoint.
     */
    @Test
    void testTriggerAndConfirmSimpleCheckpoint() throws Exception {
        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(firstVertexId)
                        .addJobVertex(secondVertexId)
                        .setTaskManagerGateway(recorder)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionVertex firstVertex = executionGraph.getJobVertex(firstVertexId).getTaskVertices()[0];
        final ExecutionVertex secondVertex = executionGraph.getJobVertex(secondVertexId).getTaskVertices()[0];
        final ExecutionAttemptID firstAttempt = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttempt = secondVertex.getCurrentExecutionAttempt().getAttemptId();
        final CheckpointCoordinator coordinator = getCheckpointCoordinator(executionGraph);

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();

        // --- trigger the first checkpoint ---
        final CompletableFuture<CompletedCheckpoint> triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(triggerResult);

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isOne();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).hasSize(1);

        final long pendingId = coordinator.getPendingCheckpoints().keySet().iterator().next();
        final PendingCheckpoint pending = coordinator.getPendingCheckpoints().get(pendingId);

        Assertions.assertThat(pending).isNotNull();
        Assertions.assertThat(pending.getCheckpointID()).isEqualTo(pendingId);
        Assertions.assertThat(pending.getJobId()).isEqualTo(executionGraph.getJobID());
        Assertions.assertThat(pending.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
        Assertions.assertThat(pending.getNumberOfAcknowledgedTasks()).isZero();
        Assertions.assertThat(pending.getOperatorStates()).isEmpty();
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();

        // Both tasks received exactly one trigger, carrying this checkpoint id.
        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(recorder.getOnlyTriggeredCheckpoint(attempt).checkpointId).isEqualTo(pendingId);
        }

        // --- build mocked subtask state for both tasks ---
        final OperatorID firstOperatorId = firstVertex.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        final OperatorID secondOperatorId = secondVertex.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        final OperatorSubtaskState firstState = Mockito.mock(OperatorSubtaskState.class);
        final OperatorSubtaskState secondState = Mockito.mock(OperatorSubtaskState.class);
        final TaskStateSnapshot firstSnapshot =
                new TaskStateSnapshot(Collections.singletonMap(firstOperatorId, firstState));
        final TaskStateSnapshot secondSnapshot =
                new TaskStateSnapshot(Collections.singletonMap(secondOperatorId, secondState));

        // --- acknowledge from the second task ---
        final AcknowledgeCheckpoint secondTaskAck =
                new AcknowledgeCheckpoint(
                        executionGraph.getJobID(), secondAttempt, pendingId, new CheckpointMetrics(), secondSnapshot);
        coordinator.receiveAcknowledgeMessage(secondTaskAck, TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.getNumberOfAcknowledgedTasks()).isOne();
        Assertions.assertThat(pending.getNumberOfNonAcknowledgedTasks()).isOne();
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();
        Mockito.verify(secondState, Mockito.times(1))
                .registerSharedStates(ArgumentMatchers.any(SharedStateRegistry.class), Mockito.eq(pendingId));

        // --- the very same acknowledgement again ---
        coordinator.receiveAcknowledgeMessage(secondTaskAck, TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();
        Mockito.verify(secondState, Mockito.times(2))
                .registerSharedStates(ArgumentMatchers.any(SharedStateRegistry.class), Mockito.eq(pendingId));

        // --- acknowledge from the first task: the checkpoint completes ---
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        executionGraph.getJobID(), firstAttempt, pendingId, new CheckpointMetrics(), firstSnapshot),
                TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.isDisposed()).isTrue();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
        Mockito.verify(firstState, Mockito.times(1))
                .registerSharedStates(ArgumentMatchers.any(SharedStateRegistry.class), Mockito.eq(pendingId));
        Mockito.verify(secondState, Mockito.times(2))
                .registerSharedStates(ArgumentMatchers.any(SharedStateRegistry.class), Mockito.eq(pendingId));

        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(recorder.getOnlyNotifiedCompletedCheckpoint(attempt).checkpointId)
                    .isEqualTo(pendingId);
        }

        final CompletedCheckpoint completed = coordinator.getSuccessfulCheckpoints().get(0);
        Assertions.assertThat(completed.getJobId()).isEqualTo(executionGraph.getJobID());
        Assertions.assertThat(completed.getCheckpointID()).isEqualTo(pending.getCheckpointID());
        Assertions.assertThat(completed.getOperatorStates()).hasSize(2);

        // --- second checkpoint, acknowledged without state ---
        recorder.resetCount();
        coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();

        final long nextId = coordinator.getPendingCheckpoints().keySet().iterator().next();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), firstAttempt, nextId),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), secondAttempt, nextId),
                TASK_MANAGER_LOCATION_INFO);

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
        Assertions.assertThat(manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();

        final CompletedCheckpoint nextCompleted = coordinator.getSuccessfulCheckpoints().get(0);
        Assertions.assertThat(nextCompleted.getJobId()).isEqualTo(executionGraph.getJobID());
        Assertions.assertThat(nextCompleted.getCheckpointID()).isEqualTo(nextId);
        Assertions.assertThat(nextCompleted.getOperatorStates()).hasSize(2);
        Assertions.assertThat(nextCompleted.getOperatorStates().values().stream().allMatch(this::hasNoSubState))
                .isTrue();

        for (ExecutionVertex vertex : Arrays.asList(firstVertex, secondVertex)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(recorder.getOnlyTriggeredCheckpoint(attempt).checkpointId).isEqualTo(nextId);
            Assertions.assertThat(recorder.getOnlyNotifiedCompletedCheckpoint(attempt).checkpointId)
                    .isEqualTo(nextId);
        }

        coordinator.shutdown();
    }

    /**
     * Runs two overlapping checkpoints over a three-vertex graph (the third vertex is added with
     * the {@code false} flag) and interleaves their acknowledgements.
     *
     * <p>Asserts that the first checkpoint completes while the second is still pending, that the
     * second completes once its last acknowledgement arrives, and that both end up retained, in
     * order, in a store created with capacity 2.
     */
    @Test
    void testMultipleConcurrentCheckpoints() throws Exception {
        final JobVertexID vertexIdA = new JobVertexID();
        final JobVertexID vertexIdB = new JobVertexID();
        final JobVertexID vertexIdC = new JobVertexID();
        final CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway recorder =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph executionGraph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(vertexIdA)
                        .addJobVertex(vertexIdB)
                        .addJobVertex(vertexIdC, false)
                        .setTaskManagerGateway(recorder)
                        .build(EXECUTOR_RESOURCE.getExecutor());
        final ExecutionVertex vertexA = executionGraph.getJobVertex(vertexIdA).getTaskVertices()[0];
        final ExecutionVertex vertexB = executionGraph.getJobVertex(vertexIdB).getTaskVertices()[0];
        final ExecutionVertex vertexC = executionGraph.getJobVertex(vertexIdC).getTaskVertices()[0];
        final ExecutionAttemptID attemptA = vertexA.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID attemptB = vertexB.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID attemptC = vertexC.getCurrentExecutionAttempt().getAttemptId();

        final CheckpointCoordinator coordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer(manuallyTriggeredScheduledExecutor)
                        .build(executionGraph);

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        // --- first checkpoint ---
        final CompletableFuture<CompletedCheckpoint> firstTrigger = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(firstTrigger);
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isOne();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        final PendingCheckpoint firstPending = coordinator.getPendingCheckpoints().values().iterator().next();
        final long firstId = firstPending.getCheckpointID();

        // Only the first two vertices are checked for a trigger message.
        for (ExecutionVertex vertex : Arrays.asList(vertexA, vertexB)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(recorder.getOnlyTriggeredCheckpoint(attempt).checkpointId).isEqualTo(firstId);
        }

        // One acknowledgement for the first checkpoint before the second one starts.
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptB, firstId),
                TASK_MANAGER_LOCATION_INFO);

        // --- second checkpoint, concurrent with the first ---
        recorder.resetCount();
        final CompletableFuture<CompletedCheckpoint> secondTrigger = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(secondTrigger);
        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        // The second pending checkpoint is whichever of the two is not the first one.
        final Iterator<PendingCheckpoint> pendingIterator = coordinator.getPendingCheckpoints().values().iterator();
        final PendingCheckpoint candidateOne = pendingIterator.next();
        final PendingCheckpoint candidateTwo = pendingIterator.next();
        final PendingCheckpoint secondPending;
        if (firstPending == candidateOne) {
            secondPending = candidateTwo;
        } else {
            secondPending = candidateOne;
        }
        final long secondId = secondPending.getCheckpointID();

        for (ExecutionVertex vertex : Arrays.asList(vertexA, vertexB)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(recorder.getOnlyTriggeredCheckpoint(attempt).checkpointId).isEqualTo(secondId);
        }

        // --- interleaved acknowledgements: completes the first, leaves the second pending ---
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptC, firstId),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptA, secondId),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptA, firstId),
                TASK_MANAGER_LOCATION_INFO);
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptB, secondId),
                TASK_MANAGER_LOCATION_INFO);

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isOne();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
        Assertions.assertThat(firstPending.isDisposed()).isTrue();

        for (ExecutionVertex vertex : Arrays.asList(vertexA, vertexB, vertexC)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(recorder.getOnlyNotifiedCompletedCheckpoint(attempt).checkpointId)
                    .isEqualTo(firstId);
        }

        // --- last acknowledgement completes the second checkpoint ---
        recorder.resetCount();
        coordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptC, secondId),
                TASK_MANAGER_LOCATION_INFO);

        Assertions.assertThat(coordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(coordinator.getNumberOfRetainedSuccessfulCheckpoints()).isEqualTo(2);
        Assertions.assertThat(secondPending.isDisposed()).isTrue();

        for (ExecutionVertex vertex : Arrays.asList(vertexA, vertexB, vertexC)) {
            ExecutionAttemptID attempt = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(recorder.getOnlyNotifiedCompletedCheckpoint(attempt).checkpointId)
                    .isEqualTo(secondId);
        }

        // --- both completed checkpoints are retained, oldest first ---
        final List<CompletedCheckpoint> retained = coordinator.getSuccessfulCheckpoints();

        final CompletedCheckpoint firstCompleted = retained.get(0);
        Assertions.assertThat(firstCompleted.getCheckpointID()).isEqualTo(firstId);
        Assertions.assertThat(firstCompleted.getJobId()).isEqualTo(executionGraph.getJobID());
        Assertions.assertThat(firstCompleted.getOperatorStates()).hasSize(3);
        Assertions.assertThat(firstCompleted.getOperatorStates().values()).allMatch(this::hasNoSubState);

        final CompletedCheckpoint secondCompleted = retained.get(1);
        Assertions.assertThat(secondCompleted.getCheckpointID()).isEqualTo(secondId);
        Assertions.assertThat(secondCompleted.getJobId()).isEqualTo(executionGraph.getJobID());
        Assertions.assertThat(secondCompleted.getOperatorStates()).hasSize(3);
        Assertions.assertThat(secondCompleted.getOperatorStates().values()).allMatch(this::hasNoSubState);

        coordinator.shutdown();
    }

    /**
     * Checks that a newer checkpoint which becomes fully acknowledged subsumes an older checkpoint
     * that is still pending.
     *
     * <p>Checkpoint 1 is acknowledged by attempts 1 and 2 only. Checkpoint 2 is acknowledged by all
     * three attempts. The test expects both pending checkpoints to end up disposed, exactly one
     * checkpoint (checkpoint 2) to be retained, the state reported for checkpoint 1 to be discarded
     * (including state that arrives in a late acknowledgement), and the state of checkpoint 2 to be
     * kept until the completed checkpoint store is shut down.
     */
    @Test
    void testSuccessfulCheckpointSubsumesUnsuccessful() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobVertexID jobVertexID3 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        // NOTE(review): the meaning of the `false` flag on the third vertex is defined by the
        // builder; this test only expects trigger messages at vertex1 and vertex2.
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1)
                        .addJobVertex(jobVertexID2)
                        .addJobVertex(jobVertexID3, false)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionVertex vertex3 = graph.getJobVertex(jobVertexID3).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID3 = vertex3.getCurrentExecutionAttempt().getAttemptId();

        StandaloneCompletedCheckpointStore completedCheckpointStore =
                new StandaloneCompletedCheckpointStore(10);
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setCompletedCheckpointStore(completedCheckpointStore)
                        .setTimer((ScheduledExecutor) this.manuallyTriggeredScheduledExecutor)
                        .build(graph);
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        // Trigger the first checkpoint.
        CompletableFuture<CompletedCheckpoint> checkpointFuture1 =
                checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture1);
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        PendingCheckpoint pending1 =
                checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        long checkpointId1 = pending1.getCheckpointID();
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
                    .isEqualTo(checkpointId1);
        }

        // State handles reported for checkpoint 1, one per operator.
        OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        OperatorID opID2 = vertex2.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        OperatorID opID3 = vertex3.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        TaskStateSnapshot taskOperatorSubtaskStates11 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot taskOperatorSubtaskStates12 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot taskOperatorSubtaskStates13 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskStateMock subtaskState11mock = new OperatorSubtaskStateMock();
        OperatorSubtaskStateMock subtaskState12mock = new OperatorSubtaskStateMock();
        OperatorSubtaskStateMock subtaskState13mock = new OperatorSubtaskStateMock();
        OperatorSubtaskState subtaskState11 = subtaskState11mock.getSubtaskState();
        OperatorSubtaskState subtaskState12 = subtaskState12mock.getSubtaskState();
        OperatorSubtaskState subtaskState13 = subtaskState13mock.getSubtaskState();
        taskOperatorSubtaskStates11.putSubtaskStateByOperatorID(opID1, subtaskState11);
        taskOperatorSubtaskStates12.putSubtaskStateByOperatorID(opID2, subtaskState12);
        taskOperatorSubtaskStates13.putSubtaskStateByOperatorID(opID3, subtaskState13);

        // Acknowledge checkpoint 1 from attempt 2 only, then start the second checkpoint.
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID2,
                        checkpointId1,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates12),
                TASK_MANAGER_LOCATION_INFO);
        gateway.resetCount();
        CompletableFuture<CompletedCheckpoint> checkpointFuture2 =
                checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture2);
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        // Pick whichever of the two pending checkpoints is not the first one.
        Iterator<PendingCheckpoint> all =
                checkpointCoordinator.getPendingCheckpoints().values().iterator();
        PendingCheckpoint cc1 = all.next();
        PendingCheckpoint cc2 = all.next();
        PendingCheckpoint pending2 = pending1 == cc1 ? cc2 : cc1;
        long checkpointId2 = pending2.getCheckpointID();

        // State handles reported for checkpoint 2, one per operator.
        TaskStateSnapshot taskOperatorSubtaskStates21 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot taskOperatorSubtaskStates22 = Mockito.spy(new TaskStateSnapshot());
        TaskStateSnapshot taskOperatorSubtaskStates23 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskStateMock subtaskState21mock = new OperatorSubtaskStateMock();
        OperatorSubtaskStateMock subtaskState22mock = new OperatorSubtaskStateMock();
        OperatorSubtaskStateMock subtaskState23mock = new OperatorSubtaskStateMock();
        OperatorSubtaskState subtaskState21 = subtaskState21mock.getSubtaskState();
        OperatorSubtaskState subtaskState22 = subtaskState22mock.getSubtaskState();
        OperatorSubtaskState subtaskState23 = subtaskState23mock.getSubtaskState();
        taskOperatorSubtaskStates21.putSubtaskStateByOperatorID(opID1, subtaskState21);
        taskOperatorSubtaskStates22.putSubtaskStateByOperatorID(opID2, subtaskState22);
        taskOperatorSubtaskStates23.putSubtaskStateByOperatorID(opID3, subtaskState23);
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
                    .isEqualTo(checkpointId2);
        }

        // Interleave acknowledgements: checkpoint 2 gets all three, checkpoint 1 only a second one.
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID3,
                        checkpointId2,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates23),
                TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId2,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates21),
                TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId1,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates11),
                TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID2,
                        checkpointId2,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates22),
                TASK_MANAGER_LOCATION_INFO);

        // Both pending checkpoints are gone; only one completed checkpoint is retained.
        Assertions.assertThat(pending1.isDisposed()).isTrue();
        Assertions.assertThat(pending2.isDisposed()).isTrue();
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();

        // State of the subsumed checkpoint is discarded, state of the completed one is kept.
        subtaskState11mock.verifyDiscard();
        subtaskState12mock.verifyDiscard();
        subtaskState21mock.verifyNotDiscard();
        subtaskState22mock.verifyNotDiscard();
        subtaskState23mock.verifyNotDiscard();

        List<CompletedCheckpoint> scs = checkpointCoordinator.getSuccessfulCheckpoints();
        CompletedCheckpoint success = scs.get(0);
        Assertions.assertThat(success.getCheckpointID()).isEqualTo(checkpointId2);
        Assertions.assertThat(success.getJobId()).isEqualTo(graph.getJobID());
        Assertions.assertThat(success.getOperatorStates()).hasSize(3);
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2, vertex3)) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(gateway.getOnlyNotifiedCompletedCheckpoint(attemptId).checkpointId)
                    .isEqualTo(checkpointId2);
        }

        // A late acknowledgement for the subsumed checkpoint must have its state discarded.
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID3,
                        checkpointId1,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates13),
                TASK_MANAGER_LOCATION_INFO);
        subtaskState13mock.verifyDiscard();

        checkpointCoordinator.shutdown();
        completedCheckpointStore.shutdown(JobStatus.FINISHED, new CheckpointsCleaner());

        // After the store shutdown the retained checkpoint's state is expected to be discarded too.
        subtaskState21mock.verifyDiscard();
        subtaskState22mock.verifyDiscard();
        subtaskState23mock.verifyDiscard();
    }

    /**
     * Checks that a pending checkpoint which is only partially acknowledged is cancelled once the
     * tasks scheduled on the manually triggered executor are run.
     *
     * <p>After that point the test expects the checkpoint to be disposed, no pending or retained
     * checkpoints to remain, the already reported state to be discarded, and no "checkpoint
     * completed" notification to have reached any task.
     */
    @Test
    void testCheckpointTimeoutIsolated() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1)
                        .addJobVertex(jobVertexID2, false)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer((ScheduledExecutor) this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        // Trigger a checkpoint and grab the single pending instance.
        CompletableFuture<CompletedCheckpoint> checkpointFuture =
                checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        PendingCheckpoint checkpoint =
                checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        Assertions.assertThat(checkpoint.isDisposed()).isFalse();

        // Only the first attempt acknowledges, so the checkpoint cannot complete.
        OperatorID opID1 = vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        TaskStateSnapshot taskOperatorSubtaskStates1 = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskStateMock operatorSubtaskStateMock = new OperatorSubtaskStateMock();
        OperatorSubtaskState subtaskState1 = operatorSubtaskStateMock.getSubtaskState();
        taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpoint.getCheckpointID(),
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates1),
                TASK_MANAGER_LOCATION_INFO);

        // Run the scheduled (delayed) tasks; the test expects this to cancel the checkpoint.
        this.manuallyTriggeredScheduledExecutor.triggerScheduledTasks();
        Assertions.assertThat(checkpoint.isDisposed())
                .as("Checkpoint was not canceled by the timeout")
                .isTrue();
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        operatorSubtaskStateMock.verifyDiscard();

        // No task may have been told that a checkpoint completed.
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
        }
        checkpointCoordinator.shutdown();
    }

    /**
     * Sends acknowledgements that do not match the pending checkpoint: one for a different job, one
     * for a checkpoint id that is not pending, and one from an execution attempt that is not part of
     * the graph.
     *
     * <p>The test makes no assertions; it passes when none of the calls throws.
     */
    @Test
    void testHandleMessagesForNonExistingCheckpoints() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1)
                        .addJobVertex(jobVertexID2, false)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCompletedCheckpointStore(new StandaloneCompletedCheckpointStore(2))
                        .setTimer((ScheduledExecutor) this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        CompletableFuture<CompletedCheckpoint> checkpointFuture =
                checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
        long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();

        // Wrong job id.
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(new JobID(), attemptID1, checkpointId),
                TASK_MANAGER_LOCATION_INFO);

        // Unknown checkpoint id. Derived from the pending id so that it can never coincide with
        // the one checkpoint that is actually pending (a fixed literal could match the id the
        // counter handed out).
        long unknownCheckpointId = checkpointId + 1;
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, unknownCheckpointId),
                TASK_MANAGER_LOCATION_INFO);

        // Execution attempt that does not belong to the graph.
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId),
                TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.shutdown();
    }

    /**
     * Checks which reported state snapshots get discarded when acknowledgements arrive that are
     * unknown, belong to a different job, are duplicates, or arrive after the checkpoint was
     * declined.
     *
     * <p>Expectations encoded below: state from an unknown execution attempt of this job is
     * discarded exactly once; state in a message for a different job is never discarded; state in a
     * repeated acknowledgement from an already acknowledged attempt is not discarded; after a
     * decline, the previously accepted state and any late acknowledgement's state are discarded.
     */
    @Test
    void testStateCleanupForLateOrUnknownMessages() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1)
                        .addJobVertex(jobVertexID2, false)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinatorConfiguration chkConfig =
                new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                        .setMaxConcurrentCheckpoints(1)
                        .build();
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(chkConfig)
                        .setTimer((ScheduledExecutor) this.manuallyTriggeredScheduledExecutor)
                        .build(graph);

        CompletableFuture<CompletedCheckpoint> checkpointFuture =
                checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally(checkpointFuture);
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        PendingCheckpoint pendingCheckpoint =
                checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        long checkpointId = pendingCheckpoint.getCheckpointID();

        // Regular acknowledgement from attempt 1: its state must be kept.
        OperatorID opIDtrigger =
                vertex1.getJobVertex().getOperatorIDs().get(0).getGeneratedOperatorID();
        TaskStateSnapshot taskOperatorSubtaskStatesTrigger = Mockito.spy(new TaskStateSnapshot());
        OperatorSubtaskStateMock subtaskStateMock = new OperatorSubtaskStateMock();
        OperatorSubtaskState subtaskStateTrigger = subtaskStateMock.getSubtaskState();
        taskOperatorSubtaskStatesTrigger.putSubtaskStateByOperatorID(opIDtrigger, subtaskStateTrigger);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStatesTrigger),
                TASK_MANAGER_LOCATION_INFO);
        subtaskStateMock.verifyNotDiscard();

        // Acknowledgement from an unknown attempt of this job: state is discarded once.
        TaskStateSnapshot unknownSubtaskState = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        unknownSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(unknownSubtaskState, Mockito.times(1)).discardState();

        // Acknowledgement addressed to a different job: state is left untouched.
        TaskStateSnapshot differentJobSubtaskState = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        differentJobSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(differentJobSubtaskState, Mockito.never()).discardState();

        // Repeated acknowledgement from attempt 1: the newly reported state is not discarded.
        TaskStateSnapshot triggerSubtaskState = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId,
                        new CheckpointMetrics(),
                        triggerSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(triggerSubtaskState, Mockito.never()).discardState();

        // Decline the checkpoint: it is disposed and the accepted state is discarded.
        subtaskStateMock.reset();
        checkpointCoordinator.receiveDeclineMessage(
                new DeclineCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId,
                        new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)),
                TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pendingCheckpoint.isDisposed()).isTrue();
        subtaskStateMock.verifyDiscard();

        // Late acknowledgement for the declined checkpoint: state is discarded once.
        TaskStateSnapshot ackSubtaskState = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID2,
                        checkpointId,
                        new CheckpointMetrics(),
                        ackSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(ackSubtaskState, Mockito.times(1)).discardState();

        // Different job again, now after the decline: still never discarded.
        Mockito.reset(differentJobSubtaskState);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        new JobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        differentJobSubtaskState),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(differentJobSubtaskState, Mockito.never()).discardState();

        // Unknown attempt again, now after the decline: state is discarded once.
        TaskStateSnapshot unknownSubtaskState2 = Mockito.mock(TaskStateSnapshot.class);
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        ExecutionGraphTestUtils.createExecutionAttemptId(),
                        checkpointId,
                        new CheckpointMetrics(),
                        unknownSubtaskState2),
                TASK_MANAGER_LOCATION_INFO);
        Mockito.verify(unknownSubtaskState2, Mockito.times(1)).discardState();

        // Shut the coordinator down like the neighbouring tests do.
        checkpointCoordinator.shutdown();
    }

    /**
     * Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with the argument {@code 1}.
     * The helper is defined elsewhere in this class.
     */
    @Test
    void testMaxConcurrentAttempts1() {
        final int attempts = 1;
        testMaxConcurrentAttempts(attempts);
    }

    /**
     * Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with the argument {@code 2}.
     * The helper is defined elsewhere in this class.
     */
    @Test
    void testMaxConcurrentAttempts2() {
        final int attempts = 2;
        testMaxConcurrentAttempts(attempts);
    }

    /**
     * Runs the shared {@code testMaxConcurrentAttempts(int)} scenario with the argument {@code 5}.
     * The helper is defined elsewhere in this class.
     */
    @Test
    void testMaxConcurrentAttempts5() {
        final int attempts = 5;
        testMaxConcurrentAttempts(attempts);
    }

    /**
     * Triggers a single savepoint and acknowledges it from both tasks.
     *
     * <p>The test checks the bookkeeping of the pending savepoint after each acknowledgement
     * (including a duplicate one), that the savepoint future completes only after the last task
     * acknowledged, that the savepoint is not counted among the retained checkpoints, that no
     * "checkpoint completed" notification was sent to the tasks, and that the stats tracker reports
     * the checkpoint as {@code COMPLETED}.
     */
    @Test
    void testTriggerAndConfirmSimpleSavepoint() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway =
                new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        ExecutionGraph graph =
                new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                        .addJobVertex(jobVertexID1)
                        .addJobVertex(jobVertexID2)
                        .setTaskManagerGateway(gateway)
                        .build((ScheduledExecutorService) EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        DefaultCheckpointStatsTracker statsTracker =
                new DefaultCheckpointStatsTracker(
                        Integer.MAX_VALUE,
                        UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        CheckpointCoordinator checkpointCoordinator =
                new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                        .setCheckpointCoordinatorConfiguration(
                                CheckpointCoordinatorConfiguration.builder()
                                        .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                                        .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                                        .build())
                        .setTimer((ScheduledExecutor) this.manuallyTriggeredScheduledExecutor)
                        .setCheckpointStatsTracker(statsTracker)
                        .build(graph);
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();

        // Trigger the savepoint; it must stay pending until all tasks acknowledged.
        String savepointDir =
                TempDirUtils.newFolder((java.nio.file.Path) this.tmpFolder).getAbsolutePath();
        CompletableFuture<CompletedCheckpoint> savepointFuture =
                checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat(savepointFuture).isNotDone();
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        long checkpointId = checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next();
        PendingCheckpoint pending = checkpointCoordinator.getPendingCheckpoints().get(checkpointId);

        // Initial state of the pending savepoint.
        Assertions.assertThat(pending).isNotNull();
        Assertions.assertThat(pending.getCheckpointID()).isEqualTo(checkpointId);
        Assertions.assertThat(pending.getJobId()).isEqualTo(graph.getJobID());
        Assertions.assertThat(pending.getNumberOfNonAcknowledgedTasks()).isEqualTo(2);
        Assertions.assertThat(pending.getNumberOfAcknowledgedTasks()).isZero();
        Assertions.assertThat(pending.getOperatorStates()).isEmpty();
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();
        Assertions.assertThat(pending.canBeSubsumed()).isFalse();

        OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
        OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
        OperatorSubtaskState subtaskState1 = Mockito.mock(OperatorSubtaskState.class);
        OperatorSubtaskState subtaskState2 = Mockito.mock(OperatorSubtaskState.class);
        TaskStateSnapshot taskOperatorSubtaskStates1 =
                new TaskStateSnapshot(Collections.singletonMap(opID1, subtaskState1));
        TaskStateSnapshot taskOperatorSubtaskStates2 =
                new TaskStateSnapshot(Collections.singletonMap(opID2, subtaskState2));

        // First acknowledgement (from attempt 2).
        AcknowledgeCheckpoint acknowledgeCheckpoint2 =
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID2,
                        checkpointId,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates2);
        checkpointCoordinator.receiveAcknowledgeMessage(
                acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.getNumberOfAcknowledgedTasks()).isOne();
        Assertions.assertThat(pending.getNumberOfNonAcknowledgedTasks()).isOne();
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();
        Assertions.assertThat(savepointFuture).isNotDone();

        // The same acknowledgement again must not change anything.
        checkpointCoordinator.receiveAcknowledgeMessage(
                acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.isDisposed()).isFalse();
        Assertions.assertThat(pending.areTasksFullyAcknowledged()).isFalse();
        Assertions.assertThat(savepointFuture).isNotDone();

        // Second task acknowledges: the savepoint completes.
        checkpointCoordinator.receiveAcknowledgeMessage(
                new AcknowledgeCheckpoint(
                        graph.getJobID(),
                        attemptID1,
                        checkpointId,
                        new CheckpointMetrics(),
                        taskOperatorSubtaskStates1),
                TASK_MANAGER_LOCATION_INFO);
        Assertions.assertThat(pending.isDisposed()).isTrue();
        Assertions.assertThat(savepointFuture.get()).isNotNull();
        Assertions.assertThat(checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        Assertions.assertThat(checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();

        // Each task saw exactly this trigger and no completion notification.
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat(gateway.getOnlyTriggeredCheckpoint(attemptId).checkpointId)
                    .isEqualTo(checkpointId);
            Assertions.assertThat(gateway.getNotifiedCompletedCheckpoints(attemptId)).isEmpty();
        }

        CompletedCheckpoint success = savepointFuture.get();
        Assertions.assertThat(success.getJobId()).isEqualTo(graph.getJobID());
        Assertions.assertThat(success.getCheckpointID()).isEqualTo(pending.getCheckpointID());
        Assertions.assertThat(success.getOperatorStates()).hasSize(2);

        // Fail with an assertion error, not an NPE, if the tracker has no entry for the id.
        AbstractCheckpointStats actualStats =
                statsTracker.createSnapshot().getHistory().getCheckpointById(checkpointId);
        Assertions.assertThat(actualStats).isNotNull();
        Assertions.assertThat(actualStats.getCheckpointId()).isEqualTo(checkpointId);
        Assertions.assertThat(actualStats.getStatus()).isEqualTo(CheckpointStatsStatus.COMPLETED);
        checkpointCoordinator.shutdown();
    }

    @Test
    void testSavepointsAreNotSubsumed() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1).addJobVertex(jobVertexID2).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        StandaloneCheckpointIDCounter counter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinator checkpointCoordinator = (CheckpointCoordinator)Mockito.spy((Object)new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setCheckpointIDCounter((CheckpointIDCounter)counter).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(1)).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(graph));
        String savepointDir = TempDirUtils.newFolder((java.nio.file.Path)this.tmpFolder).getAbsolutePath();
        CompletableFuture savepointFuture1 = checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long savepointId1 = counter.getLast();
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        CompletableFuture checkpointFuture1 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture1);
        CompletableFuture checkpointFuture2 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture2);
        long checkpointId2 = counter.getLast();
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(3);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, checkpointId2), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId2), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator)Mockito.verify((Object)checkpointCoordinator, (VerificationMode)Mockito.times((int)1))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq((long)checkpointId2), ArgumentMatchers.anyLong(), Mockito.eq((long)-1L));
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
        Assertions.assertThat((boolean)((PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(savepointId1)).isDisposed()).isFalse();
        Assertions.assertThat((CompletableFuture)savepointFuture1).isNotDone();
        CompletableFuture checkpointFuture3 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture3);
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
        CompletableFuture savepointFuture2 = checkpointCoordinator.triggerSavepoint(savepointDir, SavepointFormatType.CANONICAL);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long savepointId2 = counter.getLast();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)savepointFuture2);
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(3);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, savepointId2), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, savepointId2), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator)Mockito.verify((Object)checkpointCoordinator, (VerificationMode)Mockito.times((int)0))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq((long)savepointId2), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isEqualTo(2);
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
        Assertions.assertThat((boolean)((PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().get(savepointId1)).isDisposed()).isFalse();
        Assertions.assertThat((CompletableFuture)savepointFuture1).isNotDone();
        Assertions.assertThat((CompletableFuture)savepointFuture2).isCompletedWithValueMatching(Objects::nonNull);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, savepointId1), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, savepointId1), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator)Mockito.verify((Object)checkpointCoordinator, (VerificationMode)Mockito.times((int)0))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq((long)savepointId1), ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isOne();
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
        Assertions.assertThat((CompletableFuture)savepointFuture1).isCompletedWithValueMatching(Objects::nonNull);
        CompletableFuture checkpointFuture4 = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture4);
        long checkpointId4 = counter.getLast();
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, checkpointId4), TASK_MANAGER_LOCATION_INFO);
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId4), TASK_MANAGER_LOCATION_INFO);
        ((CheckpointCoordinator)Mockito.verify((Object)checkpointCoordinator, (VerificationMode)Mockito.times((int)1))).sendAcknowledgeMessages(ArgumentMatchers.anyList(), Mockito.eq((long)checkpointId4), ArgumentMatchers.anyLong(), Mockito.eq((long)checkpointId2));
    }

    /**
     * Shared scenario for the "max concurrent checkpoints" tests.
     *
     * <p>Starts the periodic scheduler on a single-vertex graph, fires the scheduled trigger
     * {@code maxConcurrentAttempts} times and checks that exactly that many checkpoints were sent
     * to the task. After acknowledging checkpoint 1 it expects exactly one pending
     * {@code ScheduledTrigger} task, that firing it starts one more checkpoint, and that firing the
     * trigger again does not start another one.
     *
     * @param maxConcurrentAttempts value passed to {@code setMaxConcurrentCheckpoints} and the
     *     number of checkpoints expected to be in flight before the first acknowledgement
     */
    private void testMaxConcurrentAttempts(int maxConcurrentAttempts) {
        try {
            JobVertexID jobVertexID1 = new JobVertexID();
            CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
            ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1).setTaskManagerGateway(gateway).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
            ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
            ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
            CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder().setCheckpointInterval(10L).setCheckpointTimeout(200000L).setMinPauseBetweenCheckpoints(0L).setMaxConcurrentCheckpoints(maxConcurrentAttempts).build();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(chkConfig).setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2)).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).build(graph);
            checkpointCoordinator.startCheckpointScheduler();

            // Fire the scheduled trigger once per allowed concurrent checkpoint.
            for (int i = 0; i < maxConcurrentAttempts; ++i) {
                this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
                this.manuallyTriggeredScheduledExecutor.triggerAll();
            }
            Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID1)).hasSize(maxConcurrentAttempts);
            Assertions.assertThat((int)gateway.getNotifiedCompletedCheckpoints(attemptID1).size()).isZero();

            // Acknowledge the first checkpoint.
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, 1L), TASK_MANAGER_LOCATION_INFO);

            // Exactly one ScheduledTrigger task must be waiting in the manual executor.
            Predicate<ScheduledFuture> checkpointTriggerPredicate = scheduledFuture -> {
                ScheduledTask scheduledTask = (ScheduledTask)scheduledFuture;
                return scheduledTask.getCallable() instanceof ManuallyTriggeredScheduledExecutorService.RunnableCaller && ((ManuallyTriggeredScheduledExecutorService.RunnableCaller)scheduledTask.getCallable()).command.getClass().equals(CheckpointCoordinator.ScheduledTrigger.class);
            };
            long numCheckpointTriggerTasks = this.manuallyTriggeredScheduledExecutor.getActiveNonPeriodicScheduledTask().stream().filter(checkpointTriggerPredicate).count();
            Assertions.assertThat((long)numCheckpointTriggerTasks).isOne();

            // Firing it starts one additional checkpoint ...
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID1)).hasSize(maxConcurrentAttempts + 1);

            // ... and firing it again must not start another one.
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assertions.assertThat(gateway.getTriggeredCheckpoints(attemptID1)).hasSize(maxConcurrentAttempts + 1);
            checkpointCoordinator.shutdown();
        }
        catch (Exception e) {
            // Keep the original exception as the cause so the test report shows the full stack
            // trace; e.getMessage() alone may be null and loses where the failure happened.
            Assertions.fail("testMaxConcurrentAttempts(" + maxConcurrentAttempts + ") failed with " + e, e);
        }
    }

    /**
     * Runs the periodic scheduler with a limit of two concurrent checkpoints, waits until
     * checkpoints 1 and 2 are pending, acknowledges checkpoint 2, and then expects the scheduler
     * to fill the pending set again with checkpoints 3 and 4.
     */
    @Test
    void testMaxConcurrentAttemptsWithSubsumption() throws Exception {
        final int concurrencyLimit = 2;

        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph executionGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(vertexId)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = executionGraph.getJobVertex(vertexId)
                .getTaskVertices()[0]
                .getCurrentExecutionAttempt()
                .getAttemptId();

        CheckpointCoordinatorConfiguration configuration = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                .setCheckpointInterval(10L)
                .setCheckpointTimeout(200000L)
                .setMinPauseBetweenCheckpoints(0L)
                .setMaxConcurrentCheckpoints(concurrencyLimit)
                .build();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(configuration)
                .setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2))
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .build(executionGraph);
        coordinator.startCheckpointScheduler();

        // Keep firing the scheduled trigger until the limit is reached.
        boolean limitReached;
        do {
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            limitReached = coordinator.getNumberOfPendingCheckpoints() >= concurrencyLimit;
        } while (!limitReached);
        Assertions.assertThat((int)coordinator.getNumberOfPendingCheckpoints()).isEqualTo(concurrencyLimit);
        Assertions.assertThat((Map)coordinator.getPendingCheckpoints()).containsKey((Object)1L);
        Assertions.assertThat((Map)coordinator.getPendingCheckpoints()).containsKey((Object)2L);

        // Acknowledge the newer of the two pending checkpoints.
        AcknowledgeCheckpoint ackOfSecond = new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptId, 2L);
        coordinator.receiveAcknowledgeMessage(ackOfSecond, TASK_MANAGER_LOCATION_INFO);

        // Fire the trigger again until the pending set is full once more.
        do {
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            limitReached = coordinator.getNumberOfPendingCheckpoints() >= concurrencyLimit;
        } while (!limitReached);
        Assertions.assertThat((int)coordinator.getNumberOfPendingCheckpoints()).isEqualTo(concurrencyLimit);
        Assertions.assertThat((Map)coordinator.getPendingCheckpoints()).containsKey((Object)3L);
        Assertions.assertThat((Map)coordinator.getPendingCheckpoints()).containsKey((Object)4L);

        coordinator.shutdown();
    }

    /**
     * After the shared setup has moved the task to RUNNING, one more firing of the scheduled
     * trigger must leave at least one checkpoint pending.
     */
    @Test
    void testPeriodicSchedulingWithInactiveTasks() throws Exception {
        CheckpointStorage storage = (CheckpointStorage)new JobManagerCheckpointStorage();
        CheckpointCoordinator coordinator = this.setupCheckpointCoordinatorWithInactiveTasks(storage);

        this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
        this.manuallyTriggeredScheduledExecutor.triggerAll();

        int pendingCheckpoints = coordinator.getNumberOfPendingCheckpoints();
        Assertions.assertThat(pendingCheckpoints).isGreaterThan(0);
    }

    /**
     * Builds a coordinator over a single-vertex graph whose task is initially not transitioned to
     * running, starts the periodic scheduler, and checks that firing the trigger creates no
     * pending checkpoint. The task is then moved to {@code RUNNING} and the trigger is fired once
     * more before the coordinator is handed back.
     *
     * @param checkpointStorage storage passed to the coordinator builder
     * @return the started coordinator
     */
    private CheckpointCoordinator setupCheckpointCoordinatorWithInactiveTasks(CheckpointStorage checkpointStorage) throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph executionGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(vertexId)
                .setTransitToRunning(false)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex inactiveVertex = executionGraph.getJobVertex(vertexId).getTaskVertices()[0];

        CheckpointCoordinatorConfiguration configuration = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                .setCheckpointInterval(10L)
                .setCheckpointTimeout(200000L)
                .setMinPauseBetweenCheckpoints(0L)
                .setMaxConcurrentCheckpoints(2)
                .build();
        CheckpointIDCounterWithOwner idCounter = new CheckpointIDCounterWithOwner();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(configuration)
                .setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2))
                .setCheckpointStorage(checkpointStorage)
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setCheckpointIDCounter((CheckpointIDCounter)idCounter)
                .build(executionGraph);
        idCounter.setOwner(coordinator);
        coordinator.startCheckpointScheduler();

        // While the task is not running, a trigger must not produce a pending checkpoint.
        this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((int)coordinator.getNumberOfPendingCheckpoints()).isZero();

        // Activate the task and fire the trigger again.
        inactiveVertex.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
        this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks(CheckpointCoordinator.ScheduledTrigger.class);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        return coordinator;
    }

    /**
     * Requests five savepoints back to back on a coordinator configured with
     * {@code maxConcurrentCheckpoints = 1}, checks that none of the futures is done before the
     * manual executor runs, then acknowledges the five most recent checkpoint ids (newest first)
     * and expects every savepoint future to complete with a non-null value.
     */
    @Test
    void testConcurrentSavepoints() throws Exception {
        final int savepointCount = 5;

        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph executionGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(vertexId)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionAttemptID attemptId = executionGraph.getJobVertex(vertexId)
                .getTaskVertices()[0]
                .getCurrentExecutionAttempt()
                .getAttemptId();

        StandaloneCheckpointIDCounter idCounter = new StandaloneCheckpointIDCounter();
        CheckpointCoordinatorConfiguration configuration = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                .setMaxConcurrentCheckpoints(1)
                .build();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(configuration)
                .setCheckpointIDCounter((CheckpointIDCounter)idCounter)
                .setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2))
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .build(executionGraph);

        List<CompletableFuture<?>> pendingSavepoints = new ArrayList<>();
        String targetDirectory = TempDirUtils.newFolder((java.nio.file.Path)this.tmpFolder).getAbsolutePath();
        for (int requested = 0; requested < savepointCount; ++requested) {
            pendingSavepoints.add(coordinator.triggerSavepoint(targetDirectory, SavepointFormatType.CANONICAL));
        }

        // Nothing has been executed yet, so no future may be done.
        for (CompletableFuture<?> pending : pendingSavepoints) {
            Assertions.assertThat((CompletableFuture)pending).isNotDone();
        }

        this.manuallyTriggeredScheduledExecutor.triggerAll();

        // Acknowledge the last `savepointCount` ids, starting with the most recent one.
        long newestId = idCounter.getLast();
        for (int acknowledged = 0; acknowledged < savepointCount; ++acknowledged) {
            long idToAcknowledge = newestId - acknowledged;
            coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(executionGraph.getJobID(), attemptId, idToAcknowledge), TASK_MANAGER_LOCATION_INFO);
        }

        for (CompletableFuture<?> completed : pendingSavepoints) {
            Assertions.assertThat((CompletableFuture)completed).isCompletedWithValueMatching(Objects::nonNull);
        }
    }

    /**
     * Configures a very large minimum pause between checkpoints and checks that two savepoint
     * requests issued back to back both return futures that are not yet done.
     */
    @Test
    void testMinDelayBetweenSavepoints() throws Exception {
        CheckpointCoordinatorConfiguration configuration = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                .setMinPauseBetweenCheckpoints(100000000L)
                .setMaxConcurrentCheckpoints(1)
                .build();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(configuration)
                .setCompletedCheckpointStore((CompletedCheckpointStore)new StandaloneCompletedCheckpointStore(2))
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        String targetDirectory = TempDirUtils.newFolder((java.nio.file.Path)this.tmpFolder).getAbsolutePath();

        CompletableFuture firstSavepoint = coordinator.triggerSavepoint(targetDirectory, SavepointFormatType.CANONICAL);
        ((CompletableFutureAssert)Assertions.assertThat((CompletableFuture)firstSavepoint)
                .as("Did not trigger savepoint", new Object[0]))
                .isNotDone();

        CompletableFuture secondSavepoint = coordinator.triggerSavepoint(targetDirectory, SavepointFormatType.CANONICAL);
        ((CompletableFutureAssert)Assertions.assertThat((CompletableFuture)secondSavepoint)
                .as("Did not trigger savepoint", new Object[0]))
                .isNotDone();
    }

    /**
     * Triggers one checkpoint on a coordinator configured with the {@code RETAIN_ON_FAILURE}
     * retention policy and checks that every pending checkpoint carries the properties produced
     * by {@code CheckpointProperties.forCheckpoint(RETAIN_ON_FAILURE)}.
     */
    @Test
    void testExternalizedCheckpoints() throws Exception {
        ExecutionGraph executionGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(new JobVertexID())
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinatorConfiguration configuration = new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                .setCheckpointRetentionPolicy(CheckpointRetentionPolicy.RETAIN_ON_FAILURE)
                .build();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(configuration)
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .build(executionGraph);

        CompletableFuture triggerResult = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)triggerResult);

        for (PendingCheckpoint pending : coordinator.getPendingCheckpoints().values()) {
            CheckpointProperties actualProps = pending.getProps();
            Assertions.assertThat((Object)actualProps)
                    .isEqualTo((Object)CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.RETAIN_ON_FAILURE));
        }

        coordinator.shutdown();
    }

    /**
     * Exercises the key-group partition check for a handful of fixed
     * (maxParallelism, parallelism) pairs and then for 1000 pseudo-random pairs drawn from a
     * fixed seed.
     */
    @Test
    void testCreateKeyGroupPartitions() {
        // Fixed cases as {maxParallelism, parallelism}.
        int[][] fixedCases = {
            {1, 1},
            {13, 1},
            {13, 2},
            {Short.MAX_VALUE, 1},
            {Short.MAX_VALUE, 13},
            {Short.MAX_VALUE, Short.MAX_VALUE},
        };
        for (int[] fixedCase : fixedCases) {
            this.testCreateKeyGroupPartitions(fixedCase[0], fixedCase[1]);
        }

        Random random = new Random(1234L);
        for (int round = 0; round < 1000; ++round) {
            int maxParallelism = 1 + random.nextInt(Short.MAX_VALUE - 1);
            int parallelism = 1 + random.nextInt(maxParallelism);
            this.testCreateKeyGroupPartitions(maxParallelism, parallelism);
        }
    }

    /**
     * For every key group in {@code [0, maxParallelism)}, looks up the operator index computed by
     * {@code KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup} and fails if the range
     * created for that index does not contain the key group.
     *
     * @param maxParallelism number of key groups
     * @param parallelism number of ranges to create
     */
    private void testCreateKeyGroupPartitions(int maxParallelism, int parallelism) {
        List<?> partitions = StateAssignmentOperation.createKeyGroupPartitions(maxParallelism, parallelism);
        for (int keyGroup = 0; keyGroup < maxParallelism; ++keyGroup) {
            int operatorIndex = KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup);
            KeyGroupRange range = (KeyGroupRange)partitions.get(operatorIndex);
            if (!range.contains(keyGroup)) {
                Assertions.fail((String)("Could not find expected key-group " + keyGroup + " in range " + range));
            }
        }
    }

    /**
     * Runs the repartitioning check 10000 times with parameters drawn from a fixed-seed random
     * generator; each parameter lies in {@code [1, 9]}.
     */
    @Test
    void testPartitionableStateRepartitioning() {
        final int rounds = 10000;
        Random random = new Random(42L);
        int round = 0;
        while (round < rounds) {
            // The draw order matters: it determines the pseudo-random sequence of each round.
            int oldParallelism = random.nextInt(9) + 1;
            int newParallelism = random.nextInt(9) + 1;
            int numNamedStates = random.nextInt(9) + 1;
            int maxPartitionsPerState = random.nextInt(9) + 1;
            this.doTestPartitionableStateRepartitioning(random, oldParallelism, newParallelism, numNamedStates, maxPartitionsPerState);
            ++round;
        }
    }

    /*
     * WARNING - void declaration
     */
    private void doTestPartitionableStateRepartitioning(Random r, int oldParallelism, int newParallelism, int numNamedStates, int maxPartitionsPerState) {
        Object offs;
        ArrayList<List<OperatorStreamStateHandle>> previousParallelOpInstanceStates = new ArrayList<List<OperatorStreamStateHandle>>(oldParallelism);
        for (int i = 0; i < oldParallelism; ++i) {
            void var11_16;
            Path fakePath = new Path("/fake-" + i);
            HashMap<CallSite, OperatorStateHandle.StateMetaInfo> namedStatesToOffsets = new HashMap<CallSite, OperatorStateHandle.StateMetaInfo>();
            int off = 0;
            boolean bl = false;
            while (var11_16 < numNamedStates - 1) {
                offs = new long[1 + r.nextInt(maxPartitionsPerState)];
                for (int o = 0; o < ((Object)offs).length; ++o) {
                    offs[o] = (long)off;
                    ++off;
                }
                OperatorStateHandle.Mode mode = r.nextInt(10) == 0 ? OperatorStateHandle.Mode.UNION : OperatorStateHandle.Mode.SPLIT_DISTRIBUTE;
                namedStatesToOffsets.put((CallSite)((Object)("State-" + (int)var11_16)), new OperatorStateHandle.StateMetaInfo((long[])offs, mode));
                ++var11_16;
            }
            if (numNamedStates % 2 == 0) {
                long[] lArray = new long[]{off + 1, off + 2, off + 3, off + 4};
                namedStatesToOffsets.put((CallSite)((Object)("State-" + (numNamedStates - 1))), new OperatorStateHandle.StateMetaInfo(lArray, OperatorStateHandle.Mode.BROADCAST));
            }
            previousParallelOpInstanceStates.add(Collections.singletonList(new OperatorStreamStateHandle(namedStatesToOffsets, (StreamStateHandle)new FileStateHandle(fakePath, -1L))));
        }
        HashMap expected = new HashMap();
        int taskIndex = 0;
        int expectedTotalPartitions = 0;
        for (List list : previousParallelOpInstanceStates) {
            Assertions.assertThat((int)list.size()).isOne();
            offs = list.iterator();
            while (offs.hasNext()) {
                OperatorStateHandle psh = (OperatorStateHandle)offs.next();
                Map offsMap = psh.getStateNameToPartitionOffsets();
                HashMap offsMapWithList = new HashMap(offsMap.size());
                for (Map.Entry e : offsMap.entrySet()) {
                    int replication;
                    long[] offs3 = ((OperatorStateHandle.StateMetaInfo)e.getValue()).getOffsets();
                    switch (((OperatorStateHandle.StateMetaInfo)e.getValue()).getDistributionMode()) {
                        case UNION: {
                            replication = newParallelism;
                            break;
                        }
                        case BROADCAST: {
                            int extra = taskIndex < newParallelism % oldParallelism ? 1 : 0;
                            replication = newParallelism / oldParallelism + extra;
                            break;
                        }
                        case SPLIT_DISTRIBUTE: {
                            replication = 1;
                            break;
                        }
                        default: {
                            throw new RuntimeException("Unknown distribution mode " + ((OperatorStateHandle.StateMetaInfo)e.getValue()).getDistributionMode());
                        }
                    }
                    if (replication <= 0) continue;
                    expectedTotalPartitions += replication * offs3.length;
                    ArrayList<Long> offsList = new ArrayList<Long>(offs3.length);
                    for (Object off : (Object)offs3) {
                        for (int p = 0; p < replication; ++p) {
                            offsList.add((long)off);
                        }
                    }
                    offsMapWithList.put((String)e.getKey(), offsList);
                }
                if (!offsMapWithList.isEmpty()) {
                    expected.put(psh.getDelegateStateHandle(), offsMapWithList);
                }
                ++taskIndex;
            }
        }
        OperatorStateRepartitioner repartitioner = RoundRobinOperatorStateRepartitioner.INSTANCE;
        List list = repartitioner.repartitionState(previousParallelOpInstanceStates, oldParallelism, newParallelism);
        HashMap<StreamStateHandle, Map> actual = new HashMap<StreamStateHandle, Map>();
        int minCount = Integer.MAX_VALUE;
        int maxCount = 0;
        int actualTotalPartitions = 0;
        for (int p = 0; p < newParallelism; ++p) {
            int partitionCount = 0;
            Collection pshc = (Collection)list.get(p);
            for (OperatorStateHandle sh : pshc) {
                for (Map.Entry namedState : sh.getStateNameToPartitionOffsets().entrySet()) {
                    long[] add;
                    Map stateToOffsets = actual.computeIfAbsent(sh.getDelegateStateHandle(), k -> new HashMap());
                    List actualOffs = stateToOffsets.computeIfAbsent((String)namedState.getKey(), k -> new ArrayList());
                    for (long l : add = ((OperatorStateHandle.StateMetaInfo)namedState.getValue()).getOffsets()) {
                        actualOffs.add(l);
                    }
                    partitionCount += ((OperatorStateHandle.StateMetaInfo)namedState.getValue()).getOffsets().length;
                }
            }
            minCount = Math.min(minCount, partitionCount);
            maxCount = Math.max(maxCount, partitionCount);
            actualTotalPartitions += partitionCount;
        }
        for (Map v : actual.values()) {
            for (List l : v.values()) {
                Collections.sort(l);
            }
        }
        if (oldParallelism != newParallelism) {
            int maxLoadDiff = maxCount - minCount;
            ((AbstractBooleanAssert)Assertions.assertThat((maxLoadDiff <= 1 ? 1 : 0) != 0).as("Difference in partition load is > 1 : " + maxLoadDiff, new Object[0])).isTrue();
        }
        Assertions.assertThat((int)actualTotalPartitions).isEqualTo(expectedTotalPartitions);
        Assertions.assertThat(actual).isEqualTo(expected);
    }

    /**
     * Triggers a checkpoint on a coordinator wired to a mocked stats tracker and verifies that the
     * tracker was asked exactly once to report a pending checkpoint with id 1 and the
     * {@code NEVER_RETAIN_AFTER_TERMINATION} checkpoint properties.
     */
    @Test
    void testCheckpointStatsTrackerPendingCheckpointCallback() throws Exception {
        CheckpointStatsTracker statsTracker = (CheckpointStatsTracker)Mockito.mock(CheckpointStatsTracker.class);
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setCheckpointStatsTracker(statsTracker)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());

        // Stub the tracker so that the coordinator receives a stats object for its pending checkpoint.
        PendingCheckpointStats pendingStats = (PendingCheckpointStats)Mockito.mock(PendingCheckpointStats.class);
        Mockito.when((Object)statsTracker.reportPendingCheckpoint(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong(), (CheckpointProperties)ArgumentMatchers.any(CheckpointProperties.class), (Map)ArgumentMatchers.any(Map.class)))
                .thenReturn((Object)pendingStats);

        CompletableFuture triggerResult = coordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)triggerResult);

        // Computed before verify(): only argument matchers may run between verify() and the verified call.
        CheckpointProperties expectedProps = CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        ((CheckpointStatsTracker)Mockito.verify((Object)statsTracker, (VerificationMode)Mockito.times((int)1)))
                .reportPendingCheckpoint(Mockito.eq((long)1L), ArgumentMatchers.anyLong(), (CheckpointProperties)Mockito.eq((Object)expectedProps), (Map)ArgumentMatchers.any());
    }

    /**
     * Places a completed checkpoint with id 42 into the store, restores the latest checkpointed
     * state through the coordinator, and checks that the stats tracker's snapshot reports
     * checkpoint 42 as the latest restored checkpoint.
     */
    @Test
    void testCheckpointStatsTrackerRestoreCallback() throws Exception {
        StandaloneCompletedCheckpointStore checkpointStore = new StandaloneCompletedCheckpointStore(1);
        DefaultCheckpointStatsTracker statsTracker = new DefaultCheckpointStatsTracker(10, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCompletedCheckpointStore((CompletedCheckpointStore)checkpointStore)
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setCheckpointStatsTracker((CheckpointStatsTracker)statsTracker)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());

        CompletedCheckpoint storedCheckpoint = new CompletedCheckpoint(
                new JobID(),
                42L,
                0L,
                0L,
                Collections.emptyMap(),
                Collections.emptyList(),
                CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                (CompletedCheckpointStorageLocation)new TestCompletedCheckpointStorageLocation(),
                null);
        checkpointStore.addCheckpointAndSubsumeOldestOne(storedCheckpoint, new CheckpointsCleaner(), () -> {});

        boolean restored = coordinator.restoreLatestCheckpointedStateToAll(Collections.emptySet(), true);
        Assertions.assertThat(restored).isTrue();

        long latestRestoredId = statsTracker.createSnapshot().getLatestRestoredCheckpoint().getCheckpointId();
        Assertions.assertThat(latestRestoredId).isEqualTo(42L);
    }

    @Test
    void testSharedStateRegistrationOnRestore() throws Exception {
        for (RecoveryClaimMode recoveryClaimMode : RecoveryClaimMode.values()) {
            JobVertexID jobVertexID1 = new JobVertexID();
            int parallelism1 = 2;
            int maxParallelism1 = 4;
            ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1, parallelism1, maxParallelism1).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
            ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
            List checkpoints = Collections.emptyList();
            SharedStateRegistry firstInstance = SharedStateRegistry.DEFAULT_FACTORY.create(org.apache.flink.util.concurrent.Executors.directExecutor(), checkpoints, recoveryClaimMode);
            EmbeddedCompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore(10, checkpoints, firstInstance);
            CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder coordinatorBuilder = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor);
            CheckpointCoordinator coordinator = coordinatorBuilder.setCompletedCheckpointStore((CompletedCheckpointStore)store).build(graph);
            int numCheckpoints = 3;
            List keyGroupPartitions1 = StateAssignmentOperation.createKeyGroupPartitions((int)maxParallelism1, (int)parallelism1);
            for (int i = 0; i < 3; ++i) {
                this.performIncrementalCheckpoint(graph.getJobID(), coordinator, jobVertex1, keyGroupPartitions1, i);
            }
            List completedCheckpoints = coordinator.getSuccessfulCheckpoints();
            Assertions.assertThat((int)completedCheckpoints.size()).isEqualTo(3);
            int sharedHandleCount = 0;
            ArrayList<List<IncrementalKeyedStateHandle.HandleAndLocalPath>> sharedHandlesByCheckpoint = new ArrayList<List<IncrementalKeyedStateHandle.HandleAndLocalPath>>(3);
            for (int i = 0; i < 3; ++i) {
                sharedHandlesByCheckpoint.add(new ArrayList(2));
            }
            int cp = 0;
            for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
                for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                    for (OperatorSubtaskState subtaskState : taskState.getStates()) {
                        for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                            ((KeyedStateHandle)Mockito.verify((Object)keyedStateHandle, (VerificationMode)Mockito.times((int)1))).registerSharedStates(firstInstance, completedCheckpoint.getCheckpointID());
                            IncrementalRemoteKeyedStateHandle incrementalKeyedStateHandle = (IncrementalRemoteKeyedStateHandle)keyedStateHandle;
                            ((List)sharedHandlesByCheckpoint.get(cp)).addAll(incrementalKeyedStateHandle.getSharedState());
                            for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : incrementalKeyedStateHandle.getSharedState()) {
                                StreamStateHandle streamStateHandle = handleAndLocalPath.getHandle();
                                Assertions.assertThat((boolean)(streamStateHandle instanceof PlaceholderStreamStateHandle)).isFalse();
                                DiscardRecordedStateObject.verifyDiscard((StateObject)streamStateHandle, TernaryBoolean.FALSE);
                                ++sharedHandleCount;
                            }
                            for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : incrementalKeyedStateHandle.getPrivateState()) {
                                DiscardRecordedStateObject.verifyDiscard((StateObject)handleAndLocalPath.getHandle(), TernaryBoolean.FALSE);
                            }
                            ((StreamStateHandle)Mockito.verify((Object)incrementalKeyedStateHandle.getMetaDataStateHandle(), (VerificationMode)Mockito.never())).discardState();
                        }
                        ((OperatorSubtaskState)Mockito.verify((Object)subtaskState, (VerificationMode)Mockito.never())).discardState();
                    }
                }
                ++cp;
            }
            Assertions.assertThat((int)sharedHandleCount).isEqualTo(10);
            store.removeOldestCheckpoint();
            for (List list : sharedHandlesByCheckpoint) {
                for (IncrementalKeyedStateHandle.HandleAndLocalPath handleAndLocalPath : list) {
                    DiscardRecordedStateObject.verifyDiscard((StateObject)handleAndLocalPath.getHandle(), TernaryBoolean.FALSE);
                }
            }
            store.shutdown(JobStatus.SUSPENDED, new CheckpointsCleaner());
            HashSet<ExecutionJobVertex> tasks = new HashSet<ExecutionJobVertex>();
            tasks.add(jobVertex1);
            Assertions.assertThat((Comparable)((JobStatus)store.getShutdownStatus().orElse(null))).isEqualTo((Object)JobStatus.SUSPENDED);
            SharedStateRegistry sharedStateRegistry = SharedStateRegistry.DEFAULT_FACTORY.create(org.apache.flink.util.concurrent.Executors.directExecutor(), (Collection)store.getAllCheckpoints(), recoveryClaimMode);
            EmbeddedCompletedCheckpointStore secondStore = new EmbeddedCompletedCheckpointStore(10, (Collection)store.getAllCheckpoints(), sharedStateRegistry);
            CheckpointCoordinator secondCoordinator = coordinatorBuilder.setCompletedCheckpointStore((CompletedCheckpointStore)secondStore).build(graph);
            Assertions.assertThat((boolean)secondCoordinator.restoreLatestCheckpointedStateToAll(tasks, false)).isTrue();
            cp = 0;
            for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) {
                for (OperatorState taskState : completedCheckpoint.getOperatorStates().values()) {
                    for (OperatorSubtaskState subtaskState : taskState.getStates()) {
                        for (KeyedStateHandle keyedStateHandle : subtaskState.getManagedKeyedState()) {
                            VerificationMode verificationMode = cp > 0 ? Mockito.times((int)1) : Mockito.never();
                            ((KeyedStateHandle)Mockito.verify((Object)keyedStateHandle, (VerificationMode)verificationMode)).registerSharedStates(sharedStateRegistry, completedCheckpoint.getCheckpointID());
                        }
                    }
                }
                ++cp;
            }
            secondStore.removeOldestCheckpoint();
            CheckpointCoordinatorTest.verifyDiscard(sharedHandlesByCheckpoint, cpId -> recoveryClaimMode == RecoveryClaimMode.CLAIM && cpId == 0 ? TernaryBoolean.TRUE : TernaryBoolean.FALSE);
            secondStore.removeOldestCheckpoint();
            CheckpointCoordinatorTest.verifyDiscard(sharedHandlesByCheckpoint, cpId -> cpId == 1 ? TernaryBoolean.FALSE : TernaryBoolean.UNDEFINED);
        }
    }

    /**
     * Declines an in-flight synchronous savepoint and checks that the failure surfaces twice:
     * through the future returned by {@code triggerSynchronousSavepoint} and through exactly one
     * invocation of the {@code failJob} callback, both carrying the expected root-cause message.
     */
    @Test
    void jobFailsIfInFlightSynchronousSavepointIsDiscarded() throws Exception {
        // f0 counts failJob invocations; f1 keeps the cause handed to the latest invocation.
        final Tuple2<Integer, Throwable> failJobInvocation = Tuple2.<Integer, Throwable>of(0, null);
        IOException expectedRootCause = new IOException("Custom-Exception");
        String expectedRootCauseMessage = String.format("%s: %s", expectedRootCause.getClass().getName(), expectedRootCause.getMessage());

        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(jobVertexID1)
                .addJobVertex(jobVertexID2)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();

        CheckpointFailureManager.FailJobCallback recordingCallback = new CheckpointFailureManager.FailJobCallback(){

            public void failJob(Throwable cause) {
                failJobInvocation.f0 = failJobInvocation.f0 + 1;
                failJobInvocation.f1 = cause;
            }

            public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                throw new AssertionError((Object)"This method should not be called for the test.");
            }
        };
        CheckpointCoordinator coordinator = getCheckpointCoordinator(graph, new CheckpointFailureManager(0, recordingCallback));

        CompletableFuture savepointFuture = coordinator.triggerSynchronousSavepoint(false, "test-dir", SavepointFormatType.CANONICAL);
        manuallyTriggeredScheduledExecutor.triggerAll();
        PendingCheckpoint syncSavepoint = declineSynchronousSavepoint(graph.getJobID(), coordinator, attemptID1, expectedRootCause);
        boolean savepointDisposed = syncSavepoint.isDisposed();
        Assertions.assertThat(savepointDisposed).isTrue();

        // The savepoint future must fail with a CheckpointException wrapping the root cause.
        try {
            savepointFuture.get();
            Assertions.fail((String)"Expected Exception not found.");
        }
        catch (ExecutionException e) {
            Throwable cause = ExceptionUtils.stripExecutionException((Throwable)e);
            boolean isCheckpointException = cause instanceof CheckpointException;
            Assertions.assertThat(isCheckpointException).isTrue();
            String nestedMessage = cause.getCause().getCause().getMessage();
            Assertions.assertThat(nestedMessage).isEqualTo(expectedRootCauseMessage);
        }

        // The job-failure callback must have seen the same failure exactly once.
        int failJobInvocations = failJobInvocation.f0;
        Assertions.assertThat(failJobInvocations).isOne();
        Throwable reportedCause = failJobInvocation.f1;
        boolean reportedExpectedFailure = reportedCause instanceof CheckpointException
                && reportedCause.getCause().getCause().getMessage().equals(expectedRootCauseMessage);
        Assertions.assertThat(reportedExpectedFailure).isTrue();
        coordinator.shutdown();
    }

    /**
     * Builds a coordinator around a {@code StoppingCheckpointIDCounter} and expects the shared
     * trigger helper to observe a failure with reason {@code PERIODIC_SCHEDULER_SHUTDOWN}.
     */
    @Test
    void testTriggerCheckpointAfterStopping() throws Exception {
        StoppingCheckpointIDCounter stoppingCounter = new StoppingCheckpointIDCounter();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointIDCounter((CheckpointIDCounter)stoppingCounter)
                .setTimer((ScheduledExecutor)manuallyTriggeredScheduledExecutor)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        // The counter needs a back-reference to the coordinator it belongs to.
        stoppingCounter.setOwner(coordinator);
        testTriggerCheckpoint(coordinator, CheckpointFailureReason.PERIODIC_SCHEDULER_SHUTDOWN);
    }

    /**
     * Builds a coordinator around an {@code IOExceptionCheckpointIDCounter} and expects the
     * trigger to fail with reason {@code IO_EXCEPTION}. Afterwards the fail-job callback must have
     * fired once and the stats tracker must report one total / one failed checkpoint, nothing in
     * progress, and no pending stats for checkpoint id 1.
     */
    @Test
    void testTriggerCheckpointWithCounterIOException() throws Exception {
        IOExceptionCheckpointIDCounter failingCounter = new IOExceptionCheckpointIDCounter();
        TestFailJobCallback failJobCallback = new TestFailJobCallback();
        DefaultCheckpointStatsTracker tracker = new DefaultCheckpointStatsTracker(Integer.MAX_VALUE, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        CheckpointFailureManager failureManager = new CheckpointFailureManager(0, (CheckpointFailureManager.FailJobCallback)failJobCallback);
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointIDCounter((CheckpointIDCounter)failingCounter)
                .setFailureManager(failureManager)
                .setTimer((ScheduledExecutor)manuallyTriggeredScheduledExecutor)
                .setCheckpointStatsTracker((CheckpointStatsTracker)tracker)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        failingCounter.setOwner(coordinator);

        testTriggerCheckpoint(coordinator, CheckpointFailureReason.IO_EXCEPTION);

        int failJobInvocations = failJobCallback.getInvokeCounter();
        Assertions.assertThat(failJobInvocations).isOne();

        CheckpointStatsCounts counts = tracker.createSnapshot().getCounts();
        long restored = counts.getNumberOfRestoredCheckpoints();
        Assertions.assertThat(restored).isZero();
        long total = counts.getTotalNumberOfCheckpoints();
        Assertions.assertThat(total).isOne();
        int inProgress = counts.getNumberOfInProgressCheckpoints();
        Assertions.assertThat(inProgress).isZero();
        long completed = counts.getNumberOfCompletedCheckpoints();
        Assertions.assertThat(completed).isZero();
        long failed = counts.getNumberOfFailedCheckpoints();
        Assertions.assertThat(failed).isOne();
        Assertions.assertThat((Object)tracker.getPendingCheckpointStats(1L)).isNull();
    }

    /**
     * Uses a completed-checkpoint store whose add-and-subsume function always throws, then
     * acknowledges the only pending checkpoint.
     *
     * <p>Expected outcome: the acknowledge call throws a {@code CheckpointException} with reason
     * {@code FINALIZE_CHECKPOINT_FAILURE} whose cause carries the store's message, the fail-job
     * callback fires once, and the stats tracker reports one total / one failed checkpoint while
     * still holding pending stats for checkpoint id 1.
     */
    @Test
    void testExceptionInStoringCompletedCheckpointIsReportedToFailureManager() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID).setTransitToRunning(false).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex task = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        // The graph was built without the automatic transition, so move the task to RUNNING by hand.
        task.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
        CheckpointIDCounterWithOwner testingCounter = new CheckpointIDCounterWithOwner();
        TestFailJobCallback failureCallback = new TestFailJobCallback();
        DefaultCheckpointStatsTracker statsTracker = new DefaultCheckpointStatsTracker(Integer.MAX_VALUE, UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup());
        // Single source of truth for the message thrown by the store and asserted below.
        String exceptionMsg = "Test store exception.";
        try (SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();){
            TestingCompletedCheckpointStore testingCompletedCheckpointStore = TestingCompletedCheckpointStore.builder()
                    // Hand out the registry owned by this try-with-resources block.
                    .withGetSharedStateRegistrySupplier(() -> sharedStateRegistry)
                    .withAddCheckpointAndSubsumeOldestOneFunction((TriFunction<CompletedCheckpoint, CheckpointsCleaner, Runnable, CompletedCheckpoint>)((cp, cleaner, runnable) -> {
                        throw new RuntimeException(exceptionMsg);
                    }))
                    .build();
            CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointIDCounter((CheckpointIDCounter)testingCounter).setFailureManager(new CheckpointFailureManager(0, (CheckpointFailureManager.FailJobCallback)failureCallback)).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).setCheckpointStatsTracker((CheckpointStatsTracker)statsTracker).setCompletedCheckpointStore(testingCompletedCheckpointStore).build(graph);
            testingCounter.setOwner(checkpointCoordinator);
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getValue();
            try {
                checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(pendingCheckpoint.getJobId(), task.getCurrentExecutionAttempt().getAttemptId(), pendingCheckpoint.getCheckpointID(), new CheckpointMetrics(), new TaskStateSnapshot()), "localhost");
                Assertions.fail((String)"Exception is expected here");
            }
            catch (CheckpointException cpex) {
                Assertions.assertThat((Comparable)cpex.getCheckpointFailureReason()).isEqualTo((Object)CheckpointFailureReason.FINALIZE_CHECKPOINT_FAILURE);
                Assertions.assertThat((String)cpex.getCause().getMessage()).isEqualTo(exceptionMsg);
            }
            Assertions.assertThat((int)failureCallback.getInvokeCounter()).isOne();
            CheckpointStatsCounts counts = statsTracker.createSnapshot().getCounts();
            Assertions.assertThat((long)counts.getNumberOfRestoredCheckpoints()).isZero();
            Assertions.assertThat((long)counts.getTotalNumberOfCheckpoints()).isOne();
            Assertions.assertThat((int)counts.getNumberOfInProgressCheckpoints()).isZero();
            Assertions.assertThat((long)counts.getNumberOfCompletedCheckpoints()).isZero();
            Assertions.assertThat((long)counts.getNumberOfFailedCheckpoints()).isOne();
            Assertions.assertThat((Object)statsTracker.getPendingCheckpointStats(1L)).isNotNull();
        }
    }

    /**
     * Starts the checkpoint scheduler, triggers one checkpoint and expects the resulting future
     * to fail with a {@code CheckpointException} carrying {@code expectedFailureReason}. Any other
     * {@code ExecutionException} is rethrown. The coordinator is always shut down afterwards.
     *
     * @param checkpointCoordinator coordinator under test; shut down before this method returns
     * @param expectedFailureReason failure reason the trigger is expected to report
     * @throws Exception if the trigger fails for a different reason, or shutdown fails
     */
    private void testTriggerCheckpoint(CheckpointCoordinator checkpointCoordinator, CheckpointFailureReason expectedFailureReason) throws Exception {
        try {
            checkpointCoordinator.startCheckpointScheduler();
            CheckpointProperties properties = CheckpointProperties.forCheckpoint((CheckpointRetentionPolicy)CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
            CompletableFuture<?> triggerResult = checkpointCoordinator.triggerCheckpoint(properties, null, true);
            manuallyTriggeredScheduledExecutor.triggerAll();
            try {
                triggerResult.get();
                Assertions.fail((String)"should not trigger periodic checkpoint");
            }
            catch (ExecutionException e) {
                Optional<CheckpointException> checkpointFailure = ExceptionUtils.findThrowable((Throwable)e, CheckpointException.class);
                boolean failedAsExpected = checkpointFailure.isPresent()
                        && checkpointFailure.get().getCheckpointFailureReason() == expectedFailureReason;
                if (!failedAsExpected) {
                    // Not the failure this helper is looking for: let the test fail with it.
                    throw e;
                }
            }
        }
        finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * With unaligned checkpoints enabled and at most one concurrent checkpoint, queues ten
     * checkpoint requests followed by a savepoint, then declines checkpoint id 1. After the
     * decline, exactly one checkpoint future must be done, the savepoint future must still be
     * open, and the single pending checkpoint must be the (non-forced) savepoint.
     */
    @Test
    void testSavepointScheduledInUnalignedMode() throws Exception {
        int maxConcurrentCheckpoints = 1;
        int checkpointRequestsToSend = 10;
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(new JobVertexID())
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
                        .setUnalignedCheckpointsEnabled(true)
                        .setMaxConcurrentCheckpoints(maxConcurrentCheckpoints)
                        .build())
                .setTimer((ScheduledExecutor)manuallyTriggeredScheduledExecutor)
                .build(graph);
        try {
            List<CompletableFuture<?>> checkpointFutures = new ArrayList<>(checkpointRequestsToSend);
            coordinator.startCheckpointScheduler();
            for (int sent = 0; sent < checkpointRequestsToSend; sent++) {
                checkpointFutures.add(coordinator.triggerCheckpoint(true));
            }
            // Number of requests that are either running or queued at this point.
            int activeRequests = checkpointRequestsToSend;
            manuallyTriggeredScheduledExecutor.triggerAll();
            Assertions.assertThat((int)coordinator.getNumQueuedRequests()).isEqualTo(activeRequests - maxConcurrentCheckpoints);

            CompletableFuture savepointFuture = coordinator.triggerSavepoint("/tmp", SavepointFormatType.CANONICAL);
            manuallyTriggeredScheduledExecutor.triggerAll();
            activeRequests += 1;
            Assertions.assertThat((int)coordinator.getNumQueuedRequests()).isEqualTo(activeRequests - maxConcurrentCheckpoints);

            DeclineCheckpoint decline = new DeclineCheckpoint(graph.getJobID(), ExecutionGraphTestUtils.createExecutionAttemptId(), 1L, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
            coordinator.receiveDeclineMessage(decline, "none");
            manuallyTriggeredScheduledExecutor.triggerAll();
            activeRequests -= 1;
            Assertions.assertThat((int)coordinator.getNumQueuedRequests()).isEqualTo(activeRequests - maxConcurrentCheckpoints);

            long finishedCheckpointFutures = checkpointFutures.stream().filter(Future::isDone).count();
            Assertions.assertThat(finishedCheckpointFutures).isOne();
            boolean savepointDone = savepointFuture.isDone();
            Assertions.assertThat(savepointDone).isFalse();
            Assertions.assertThat((int)coordinator.getNumberOfPendingCheckpoints()).isEqualTo(maxConcurrentCheckpoints);

            PendingCheckpoint onlyPending = (PendingCheckpoint)coordinator.getPendingCheckpoints().values().iterator().next();
            CheckpointProperties pendingProps = onlyPending.getProps();
            Assertions.assertThat((boolean)pendingProps.isSavepoint()).isTrue();
            Assertions.assertThat((boolean)pendingProps.forceCheckpoint()).isFalse();
        }
        finally {
            coordinator.shutdown();
        }
    }

    /**
     * Registers a master hook that, when asked to trigger a checkpoint, first asserts that the
     * operator-coordinator checkpoint callback has already run and then acknowledges the
     * checkpoint on behalf of both tasks.
     *
     * <p>Expected outcome: the triggered checkpoint completes, nothing stays pending or scheduled,
     * both tasks were triggered with the checkpoint id seen by the hook, and the completed
     * checkpoint belongs to the graph's job and holds two operator states.
     */
    @Test
    void testExternallyInducedSourceWithOperatorCoordinator() throws Exception {
        JobVertexID jobVertexID1 = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway gateway = new CheckpointCoordinatorTestingUtils.CheckpointRecorderTaskManagerGateway();
        final ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder().addJobVertex(jobVertexID1).addJobVertex(jobVertexID2).setTaskManagerGateway(gateway).build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex vertex1 = graph.getJobVertex(jobVertexID1).getTaskVertices()[0];
        ExecutionVertex vertex2 = graph.getJobVertex(jobVertexID2).getTaskVertices()[0];
        final ExecutionAttemptID attemptID1 = vertex1.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID attemptID2 = vertex2.getCurrentExecutionAttempt().getAttemptId();
        OperatorID opID1 = ((OperatorIDPair)vertex1.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        OperatorID opID2 = ((OperatorIDPair)vertex2.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        final TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
        final TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
        OperatorSubtaskState subtaskState1 = OperatorSubtaskState.builder().build();
        OperatorSubtaskState subtaskState2 = OperatorSubtaskState.builder().build();
        // Each task acknowledges with its own snapshot, so each snapshot carries the state of
        // the operator that runs in that task.
        taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
        taskOperatorSubtaskStates2.putSubtaskStateByOperatorID(opID2, subtaskState2);
        final AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext coordinatorCheckpointContext = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder().setOnCallingCheckpointCoordinator((checkpointId, result) -> {
            coordCheckpointDone.set(true);
            result.complete(new byte[0]);
        }).setOperatorID(opID1).build();
        final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder().setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build()).setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor).setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext)).build(graph);
        // Records the checkpoint id the hook was invoked with, for comparison with the gateway.
        final AtomicReference<Long> checkpointIdRef = new AtomicReference<Long>();
        checkpointCoordinator.addMasterHook((MasterTriggerRestoreHook)new MasterTriggerRestoreHook<Integer>(){

            public String getIdentifier() {
                return "anything";
            }

            @Nullable
            public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)coordCheckpointDone.get()).as("The coordinator checkpoint should have finished.", new Object[0])).isTrue();
                checkpointIdRef.set(checkpointId);
                // Acknowledge for both tasks from inside the hook.
                AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(graph.getJobID(), attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
                AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(graph.getJobID(), attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                return null;
            }

            public void restoreCheckpoint(long checkpointId, Integer checkpointData) {
            }

            public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
                return new SimpleVersionedSerializer<Integer>(){

                    public int getVersion() {
                        return 0;
                    }

                    public byte[] serialize(Integer obj) {
                        return new byte[0];
                    }

                    public Integer deserialize(int version, byte[] serialized) {
                        return 1;
                    }
                };
            }
        });
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isZero();
        Assertions.assertThat((int)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks().size()).isZero();
        CompletableFuture checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        FutureUtils.throwIfCompletedExceptionally((CompletableFuture)checkpointFuture);
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints()).isOne();
        Assertions.assertThat((int)checkpointCoordinator.getNumberOfPendingCheckpoints()).isZero();
        Assertions.assertThat((Collection)this.manuallyTriggeredScheduledExecutor.getActiveScheduledTasks()).isEmpty();
        long checkpointId2 = checkpointIdRef.get();
        for (ExecutionVertex vertex : Arrays.asList(vertex1, vertex2)) {
            ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
            Assertions.assertThat((long)gateway.getOnlyTriggeredCheckpoint((ExecutionAttemptID)attemptId).checkpointId).isEqualTo(checkpointId2);
        }
        CompletedCheckpoint success = (CompletedCheckpoint)checkpointCoordinator.getSuccessfulCheckpoints().get(0);
        Assertions.assertThat((Comparable)success.getJobId()).isEqualTo((Object)graph.getJobID());
        Assertions.assertThat((int)success.getOperatorStates().size()).isEqualTo(2);
        checkpointCoordinator.shutdown();
    }

    /**
     * Same hook-driven acknowledgement as the externally-induced-source success test, but the
     * checkpoint storage hands out a location whose {@code createMetadataOutputStream} always
     * throws an {@code IOException}. The trigger future must therefore complete exceptionally and
     * no successful checkpoint may be recorded.
     */
    @Test
    void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
        JobVertexID firstVertexId = new JobVertexID();
        JobVertexID secondVertexId = new JobVertexID();
        final ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(firstVertexId)
                .addJobVertex(secondVertexId)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex firstVertex = graph.getJobVertex(firstVertexId).getTaskVertices()[0];
        ExecutionVertex secondVertex = graph.getJobVertex(secondVertexId).getTaskVertices()[0];
        final ExecutionAttemptID firstAttemptId = firstVertex.getCurrentExecutionAttempt().getAttemptId();
        final ExecutionAttemptID secondAttemptId = secondVertex.getCurrentExecutionAttempt().getAttemptId();
        OperatorID firstOperatorId = ((OperatorIDPair)firstVertex.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();
        OperatorID secondOperatorId = ((OperatorIDPair)secondVertex.getJobVertex().getOperatorIDs().get(0)).getGeneratedOperatorID();

        // One snapshot per task, each holding an empty subtask state for that task's operator.
        final TaskStateSnapshot firstSnapshot = new TaskStateSnapshot();
        final TaskStateSnapshot secondSnapshot = new TaskStateSnapshot();
        firstSnapshot.putSubtaskStateByOperatorID(firstOperatorId, OperatorSubtaskState.builder().build());
        secondSnapshot.putSubtaskStateByOperatorID(secondOperatorId, OperatorSubtaskState.builder().build());

        final AtomicBoolean operatorCoordinatorCheckpointed = new AtomicBoolean(false);
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext coordinatorContext = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                .setOnCallingCheckpointCoordinator((checkpointId, result) -> {
                    operatorCoordinatorCheckpointed.set(true);
                    result.complete(new byte[0]);
                })
                .setOperatorID(firstOperatorId)
                .build();

        // Storage whose checkpoint location cannot open a metadata stream.
        CheckpointStorage failingStorage = new JobManagerCheckpointStorage(){
            private static final long serialVersionUID = 8134582566514272546L;

            public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
                return new MemoryBackendCheckpointStorageAccess(jobId, null, null, true, 100){

                    public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
                        return new NonPersistentMetadataCheckpointStorageLocation(1000){

                            public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
                                throw new IOException("Artificial Exception");
                            }
                        };
                    }
                };
            }
        };
        final CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
                .setTimer((ScheduledExecutor)manuallyTriggeredScheduledExecutor)
                .setCoordinatorsToCheckpoint(Collections.singleton(coordinatorContext))
                .setCheckpointStorage(failingStorage)
                .build(graph);

        final AtomicReference<Long> seenCheckpointId = new AtomicReference<Long>();
        MasterTriggerRestoreHook<Integer> acknowledgingHook = new MasterTriggerRestoreHook<Integer>(){

            public String getIdentifier() {
                return "anything";
            }

            @Nullable
            public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
                ((AbstractBooleanAssert)Assertions.assertThat((boolean)operatorCoordinatorCheckpointed.get()).as("The coordinator checkpoint should have finished.", new Object[0])).isTrue();
                seenCheckpointId.set(checkpointId);
                // Acknowledge for both tasks from inside the hook.
                AcknowledgeCheckpoint firstAck = new AcknowledgeCheckpoint(graph.getJobID(), firstAttemptId, checkpointId, new CheckpointMetrics(), firstSnapshot);
                AcknowledgeCheckpoint secondAck = new AcknowledgeCheckpoint(graph.getJobID(), secondAttemptId, checkpointId, new CheckpointMetrics(), secondSnapshot);
                coordinator.receiveAcknowledgeMessage(firstAck, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                coordinator.receiveAcknowledgeMessage(secondAck, CheckpointCoordinatorTest.TASK_MANAGER_LOCATION_INFO);
                return null;
            }

            public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {
            }

            public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
                return new SimpleVersionedSerializer<Integer>(){

                    public int getVersion() {
                        return 0;
                    }

                    public byte[] serialize(Integer obj) {
                        return new byte[0];
                    }

                    public Integer deserialize(int version, byte[] serialized) {
                        return 1;
                    }
                };
            }
        };
        coordinator.addMasterHook((MasterTriggerRestoreHook)acknowledgingHook);

        CompletableFuture triggerResult = coordinator.triggerCheckpoint(false);
        manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((CompletableFuture)triggerResult).isCompletedExceptionally();
        Assertions.assertThat((List)coordinator.getSuccessfulCheckpoints()).isEmpty();
    }

    /**
     * Checks that {@code restoreLatestCheckpointedStateToSubtasks}, called with an empty task
     * set, flips the {@code resetCalled} flag of a registered {@code TestResetHook}.
     */
    @Test
    void testResetCalledInRegionRecovery() throws Exception {
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer((ScheduledExecutor)manuallyTriggeredScheduledExecutor)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        TestResetHook resetHook = new TestResetHook("id");
        coordinator.addMasterHook((MasterTriggerRestoreHook)resetHook);

        boolean resetBeforeRestore = resetHook.resetCalled;
        Assertions.assertThat(resetBeforeRestore).isFalse();

        coordinator.restoreLatestCheckpointedStateToSubtasks(Collections.emptySet());

        boolean resetAfterRestore = resetHook.resetCalled;
        Assertions.assertThat(resetAfterRestore).isTrue();
    }

    /**
     * Triggers two checkpoints and acknowledges only the second one. The operator-coordinator
     * context is then expected to list the first checkpoint id as aborted and the second as
     * completed. The coordinator is shut down regardless of the outcome.
     */
    @Test
    void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(jobVertexID)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex executionVertex = graph.getJobVertex(jobVertexID).getTaskVertices()[0];
        ExecutionAttemptID attemptID = executionVertex.getCurrentExecutionAttempt().getAttemptId();
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                .setOperatorID(new OperatorID())
                .setOnCallingCheckpointCoordinator((ignored, future) -> {
                    future.complete(new byte[0]);
                })
                .build();
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
                .setTimer((ScheduledExecutor)manuallyTriggeredScheduledExecutor)
                .setCoordinatorsToCheckpoint(Collections.singleton(context))
                .build(graph);
        try {
            // The highest pending id after each trigger identifies the checkpoint just started.
            coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            long firstCheckpointId = (Long)Collections.max(coordinator.getPendingCheckpoints().keySet());

            coordinator.triggerCheckpoint(false);
            manuallyTriggeredScheduledExecutor.triggerAll();
            long secondCheckpointId = (Long)Collections.max(coordinator.getPendingCheckpoints().keySet());

            AcknowledgeCheckpoint ackForSecond = new AcknowledgeCheckpoint(graph.getJobID(), attemptID, secondCheckpointId, new CheckpointMetrics(), null);
            coordinator.receiveAcknowledgeMessage(ackForSecond, "");

            Assertions.assertThat(context.getAbortedCheckpoints()).isEqualTo(Collections.singletonList(firstCheckpointId));
            Assertions.assertThat(context.getCompletedCheckpoints()).isEqualTo(Collections.singletonList(secondCheckpointId));
        }
        finally {
            coordinator.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Uses a mock operator-coordinator context whose checkpoint callback never completes the
     * supplied future. Asserts that the coordinator reports {@code isTriggering()} after the
     * trigger, and no longer does so once the non-periodic scheduled tasks have been run.
     */
    @Test
    void testTimeoutWhileCheckpointOperatorCoordinatorNotFinishing() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(jobVertexID)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());

        // The callback deliberately leaves the future incomplete.
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                .setOperatorID(new OperatorID())
                .setOnCallingCheckpointCoordinator((ignored, future) -> {})
                .build();

        // All timing is driven by the manually triggered executor, so no extra executor is needed.
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setCheckpointTimeout(10L).build())
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setCoordinatorsToCheckpoint(Collections.singleton(context))
                .build(graph);
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isTrue();

            // NOTE(review): presumably this fires the scheduled checkpoint-timeout task — confirm.
            this.manuallyTriggeredScheduledExecutor.triggerNonPeriodicScheduledTasks();
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Assertions.assertThat((boolean)checkpointCoordinator.isTriggering()).isFalse();
        }
        finally {
            checkpointCoordinator.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Declines checkpoint 1 after running only three queued executor tasks, then triggers another
     * checkpoint. Asserts that the mock operator-coordinator context either never saw a trigger
     * notification for checkpoint 1, or saw it before the abort notification.
     */
    @Test
    void testAbortingBeforeTriggeringCheckpointOperatorCoordinator() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(jobVertexID)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        String trigger = "Trigger";
        String abort = "Abort";
        // Records, in call order, every trigger/abort notification the mock context receives.
        List<String> notificationSequence = new ArrayList<>();
        CheckpointCoordinatorTestingUtils.MockOperatorCoordinatorCheckpointContext context = new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
                .setOperatorID(new OperatorID())
                .setOnCallingCheckpointCoordinator((id, future) -> {
                    notificationSequence.add(trigger + id);
                    future.complete(new byte[0]);
                })
                .setOnCallingAbortCurrentTriggering(() -> notificationSequence.add(abort))
                .build();
        // The same manually triggered executor serves as both IO executor and timer.
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setCheckpointTimeout(10L).build())
                .setIoExecutor((Executor)this.manuallyTriggeredScheduledExecutor)
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setCoordinatorsToCheckpoint(Collections.singleton(context))
                .build(graph);
        try {
            checkpointCoordinator.triggerCheckpoint(false);
            // NOTE(review): three single steps presumably stop partway through the trigger sequence — confirm.
            this.manuallyTriggeredScheduledExecutor.trigger();
            this.manuallyTriggeredScheduledExecutor.trigger();
            this.manuallyTriggeredScheduledExecutor.trigger();
            this.declineCheckpoint(1L, checkpointCoordinator, jobVertexID, graph);
            this.manuallyTriggeredScheduledExecutor.triggerAll();
            Preconditions.checkState(!checkpointCoordinator.isTriggering());

            checkpointCoordinator.triggerCheckpoint(false);
            this.manuallyTriggeredScheduledExecutor.triggerAll();

            String firstTrigger = trigger + "1";
            boolean triggerNotAfterAbort = !notificationSequence.contains(firstTrigger)
                    || notificationSequence.indexOf(firstTrigger) < notificationSequence.indexOf(abort);
            Assertions.assertThat(triggerNotAfterAbort).isTrue();
        }
        finally {
            checkpointCoordinator.shutdown();
        }
    }

    /**
     * Completes one checkpoint, then has a second one declined, and asserts that the
     * {@code latestCompletedCheckpointId} passed to the task manager gateway's
     * {@code notifyCheckpointAborted} equals the id of the first, completed checkpoint.
     */
    @Test
    void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(vertexId)
                .setTransitToRunning(false)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        ExecutionVertex task = graph.getJobVertex(vertexId).getTaskVertices()[0];

        // Captures the latest-completed id reported with the abort notification; -1 means "not reported".
        final AtomicLong reportedCheckpointId = new AtomicLong(-1L);
        TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(new SimpleAckingTaskManagerGateway(){

            @Override
            public void notifyCheckpointAborted(ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long latestCompletedCheckpointId, long timestamp) {
                reportedCheckpointId.set(latestCompletedCheckpointId);
            }
        }).createTestingLogicalSlot();
        ExecutionGraphTestUtils.setVertexResource(task, slot);
        task.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);

        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setAllowCheckpointsAfterTasksFinished(true)
                .build(graph);

        // First checkpoint: acknowledged by the only task.
        CompletableFuture<?> result = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long completedCheckpointId = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        AcknowledgeCheckpoint acknowledgement = new AcknowledgeCheckpoint(graph.getJobID(), task.getCurrentExecutionAttempt().getAttemptId(), completedCheckpointId, new CheckpointMetrics(), new TaskStateSnapshot());
        checkpointCoordinator.receiveAcknowledgeMessage(acknowledgement, "localhost");
        Assertions.assertThat((CompletableFuture)result).isDone();
        Assertions.assertThat((CompletableFuture)result).isNotCompletedExceptionally();

        // Second checkpoint: declined by the task.
        result = checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        long abortedCheckpointId = (Long)checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        DeclineCheckpoint decline = new DeclineCheckpoint(graph.getJobID(), task.getCurrentExecutionAttempt().getAttemptId(), abortedCheckpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED));
        checkpointCoordinator.receiveDeclineMessage(decline, "localhost");
        Assertions.assertThat((CompletableFuture)result).isCompletedExceptionally();

        Assertions.assertThat((AtomicLong)reportedCheckpointId).hasValue(completedCheckpointId);
    }

    /**
     * Builds a coordinator backed by a {@link FileSystemCheckpointStorage} rooted at a fresh
     * temporary folder and asserts that no sub-directory named after the job id exists there
     * afterwards.
     */
    @Test
    void testBaseLocationsNotInitialized() throws Exception {
        File checkpointDir = TempDirUtils.newFolder((java.nio.file.Path)this.tmpFolder);
        JobVertexID vertexId = new JobVertexID();
        ExecutionGraph graph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(vertexId)
                .setTransitToRunning(false)
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());

        // The coordinator is built only for its construction side effects; the instance is not used.
        new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder().setCheckpointInterval(Long.MAX_VALUE).build())
                .setCheckpointStorage((CheckpointStorage)new FileSystemCheckpointStorage(checkpointDir.toURI()))
                .build(graph);

        Path jobCheckpointPath = new Path(checkpointDir.getAbsolutePath(), graph.getJobID().toString());
        FileSystem fs = FileSystem.get((URI)checkpointDir.toURI());
        boolean jobDirectoryExists = fs.exists(jobCheckpointPath);
        Assertions.assertThat(jobDirectoryExists).isFalse();
    }

    /**
     * Builds a coordinator for {@code graph} that uses the manually triggered executor as timer,
     * with aligned-checkpoint timeout set to {@code Long.MAX_VALUE} and max concurrent checkpoints
     * set to {@code Integer.MAX_VALUE}.
     */
    private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph graph) throws Exception {
        CheckpointCoordinatorConfiguration configuration = CheckpointCoordinatorConfiguration.builder()
                .setAlignedCheckpointTimeout(Long.MAX_VALUE)
                .setMaxConcurrentCheckpoints(Integer.MAX_VALUE)
                .build();
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setCheckpointCoordinatorConfiguration(configuration)
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .build(graph);
    }

    /**
     * Builds a coordinator for {@code graph} that uses the manually triggered executor as timer and
     * reports failures to the given {@code failureManager}.
     */
    private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph graph, CheckpointFailureManager failureManager) throws Exception {
        CheckpointCoordinator coordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer((ScheduledExecutor)this.manuallyTriggeredScheduledExecutor)
                .setFailureManager(failureManager)
                .build(graph);
        return coordinator;
    }

    /**
     * Builds a coordinator driven by {@code timer} over a freshly created execution graph that
     * contains two job vertices with new ids.
     */
    private CheckpointCoordinator getCheckpointCoordinator(ScheduledExecutor timer) throws Exception {
        ExecutionGraph twoVertexGraph = new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
                .addJobVertex(new JobVertexID())
                .addJobVertex(new JobVertexID())
                .build((ScheduledExecutorService)EXECUTOR_RESOURCE.getExecutor());
        return new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
                .setTimer(timer)
                .build(twoVertexGraph);
    }

    /**
     * Creates a {@link CheckpointFailureManager} (first constructor argument {@code 0}) whose
     * fail-job callback throws a {@link RuntimeException} carrying {@code errorMsg} from both of
     * its methods.
     */
    private CheckpointFailureManager getCheckpointFailureManager(final String errorMsg) {
        CheckpointFailureManager.FailJobCallback throwingCallback = new CheckpointFailureManager.FailJobCallback(){

            public void failJob(Throwable cause) {
                throw new RuntimeException(errorMsg);
            }

            public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID failingTask) {
                throw new RuntimeException(errorMsg);
            }
        };
        return new CheckpointFailureManager(0, throwingCallback);
    }

    /**
     * Sends a {@code CHECKPOINT_DECLINED} message, caused by {@code reason}, for the first pending
     * checkpoint returned by the coordinator.
     *
     * @return the pending checkpoint object, looked up before the decline message is delivered
     */
    private PendingCheckpoint declineSynchronousSavepoint(JobID jobId, CheckpointCoordinator coordinator, ExecutionAttemptID attemptID, Throwable reason) {
        long pendingId = (Long)coordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
        // Grab the pending checkpoint first, before the decline message is processed.
        PendingCheckpoint pending = (PendingCheckpoint)coordinator.getPendingCheckpoints().get(pendingId);

        CheckpointException declineCause = new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED, reason);
        DeclineCheckpoint declineMessage = new DeclineCheckpoint(jobId, attemptID, pendingId, declineCause);
        coordinator.receiveDeclineMessage(declineMessage, TASK_MANAGER_LOCATION_INFO);
        return pending;
    }

    /**
     * Triggers one checkpoint and acknowledges it for every subtask of {@code jobVertex1} with an
     * incremental keyed state handle.
     *
     * <p>Each acknowledgement carries one private entry ({@code "private-1"}) and a shared entry
     * for {@code cpSequenceNumber}; when {@code cpSequenceNumber > 0} a shared entry for the
     * previous sequence number is added first.
     */
    private void performIncrementalCheckpoint(JobID jobId, CheckpointCoordinator checkpointCoordinator, ExecutionJobVertex jobVertex1, List<KeyGroupRange> keyGroupPartitions1, int cpSequenceNumber) throws Exception {
        checkpointCoordinator.triggerCheckpoint(false);
        this.manuallyTriggeredScheduledExecutor.triggerAll();
        Assertions.assertThat((Map)checkpointCoordinator.getPendingCheckpoints()).hasSize(1);
        long checkpointId = (Long)Iterables.getOnlyElement(checkpointCoordinator.getPendingCheckpoints().keySet());

        int previousSequenceNumber = cpSequenceNumber - 1;
        for (int subtask = 0; subtask < jobVertex1.getParallelism(); ++subtask) {
            KeyGroupRange range = keyGroupPartitions1.get(subtask);

            ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> privateState = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>();
            privateState.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)new TestingStreamStateHandle("private-1", new byte[]{112}), "private-1"));

            ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedState = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>();
            if (cpSequenceNumber > 0) {
                // Also reference the shared entry of the previous checkpoint in the sequence.
                sharedState.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)new TestingStreamStateHandle("shared-" + previousSequenceNumber + "-" + range, new byte[]{115}), "shared-" + previousSequenceNumber));
            }
            sharedState.add(IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)new TestingStreamStateHandle("shared-" + cpSequenceNumber + "-" + range, new byte[]{115}), "shared-" + cpSequenceNumber));

            StreamStateHandle metaHandle = (StreamStateHandle)Mockito.spy((Object)new ByteStreamStateHandle("meta", new byte[]{109}));
            IncrementalRemoteKeyedStateHandle managedState = (IncrementalRemoteKeyedStateHandle)Mockito.spy((Object)new IncrementalRemoteKeyedStateHandle(new UUID(42L, 42L), range, checkpointId, sharedState, privateState, metaHandle));
            OperatorSubtaskState subtaskState = (OperatorSubtaskState)Mockito.spy((Object)OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)managedState).build());

            HashMap<OperatorID, OperatorSubtaskState> statesByOperator = new HashMap<OperatorID, OperatorSubtaskState>();
            statesByOperator.put(((OperatorIDPair)jobVertex1.getOperatorIDs().get(0)).getGeneratedOperatorID(), subtaskState);
            TaskStateSnapshot snapshot = new TaskStateSnapshot(statesByOperator);

            ExecutionAttemptID attemptId = jobVertex1.getTaskVertices()[subtask].getCurrentExecutionAttempt().getAttemptId();
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobId, attemptId, checkpointId, new CheckpointMetrics(), snapshot), TASK_MANAGER_LOCATION_INFO);
        }
    }

    /**
     * Checks the discard status of every shared handle in {@code sharedHandles}.
     *
     * <p>The last character of each handle's local path is parsed as an integer and passed to
     * {@code checkpointVerify}; the result is handed to
     * {@code DiscardRecordedStateObject.verifyDiscard} together with the handle.
     */
    private static void verifyDiscard(List<List<IncrementalKeyedStateHandle.HandleAndLocalPath>> sharedHandles, Function<Integer, TernaryBoolean> checkpointVerify) {
        sharedHandles.stream()
                .flatMap(List::stream)
                .forEach(entry -> {
                    String localPath = entry.getLocalPath();
                    // Only the final character is parsed, so this reads a single digit.
                    int checkpointID = Integer.parseInt(localPath.substring(localPath.length() - 1));
                    DiscardRecordedStateObject.verifyDiscard((StateObject)entry.getHandle(), checkpointVerify.apply(checkpointID));
                });
    }

    /** Creates a new {@link TestingStreamStateHandle} via its no-argument constructor. */
    private TestingStreamStateHandle handle() {
        TestingStreamStateHandle freshHandle = new TestingStreamStateHandle();
        return freshHandle;
    }

    /**
     * Delivers a {@code CHECKPOINT_DECLINED} message for {@code checkpointId} on behalf of the
     * first subtask of the job vertex {@code nackVertexID}.
     */
    private void declineCheckpoint(long checkpointId, CheckpointCoordinator coordinator, JobVertexID nackVertexID, ExecutionGraph graph) {
        ExecutionAttemptID decliningAttempt = graph.getJobVertex(nackVertexID).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        CheckpointException declineCause = new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED);
        DeclineCheckpoint declineMessage = new DeclineCheckpoint(graph.getJobID(), decliningAttempt, checkpointId, declineCause);
        coordinator.receiveDeclineMessage(declineMessage, "test");
    }

    /**
     * Acknowledges {@code checkpointId} on behalf of the first subtask of {@code ackVertexID},
     * reporting an incremental keyed state handle (random backend id, key groups 0 to 9) that is
     * built from the given meta, private and shared handles.
     */
    private void ackCheckpoint(long checkpointId, CheckpointCoordinator coordinator, JobVertexID ackVertexID, ExecutionGraph graph, TestingStreamStateHandle metaState, TestingStreamStateHandle privateState, TestingStreamStateHandle sharedState) throws CheckpointException {
        List<IncrementalKeyedStateHandle.HandleAndLocalPath> sharedStateList = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>(Collections.singletonList(IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)sharedState, "shared-state-key")));
        List<IncrementalKeyedStateHandle.HandleAndLocalPath> privateStateList = new ArrayList<IncrementalKeyedStateHandle.HandleAndLocalPath>(Collections.singletonList(IncrementalKeyedStateHandle.HandleAndLocalPath.of((StreamStateHandle)privateState, "private-state-key")));

        ExecutionJobVertex ackingVertex = graph.getJobVertex(ackVertexID);
        OperatorID firstOperatorId = ((OperatorIDPair)ackingVertex.getOperatorIDs().get(0)).getGeneratedOperatorID();

        IncrementalRemoteKeyedStateHandle keyedState = new IncrementalRemoteKeyedStateHandle(UUID.randomUUID(), KeyGroupRange.of(0, 9), checkpointId, sharedStateList, privateStateList, (StreamStateHandle)metaState);
        OperatorSubtaskState subtaskState = OperatorSubtaskState.builder().setManagedKeyedState((KeyedStateHandle)keyedState).build();
        TaskStateSnapshot snapshot = new TaskStateSnapshot(Collections.singletonMap(firstOperatorId, subtaskState));

        ExecutionAttemptID ackingAttempt = ackingVertex.getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        coordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(graph.getJobID(), ackingAttempt, checkpointId, new CheckpointMetrics(), snapshot), "test");
    }

    // Returns its argument unchanged.
    // NOTE(review): the `lambda$...$12` name and the `synthetic` marker suggest this is a
    // decompiler-emitted lambda body belonging to
    // testExceptionInStoringCompletedCheckpointIsReportedToFailureManager — confirm before editing.
    private static /* synthetic */ SharedStateRegistry lambda$testExceptionInStoringCompletedCheckpointIsReportedToFailureManager$12(SharedStateRegistry sharedStateRegistry) {
        return sharedStateRegistry;
    }

    /**
     * Test fixture bundling an {@link OperatorSubtaskState} with the two operator state handles
     * (managed and raw) it was built from, so tests can check whether those handles were discarded.
     */
    static class OperatorSubtaskStateMock {
        OperatorSubtaskState subtaskState;
        TestingOperatorStateHandle managedOpHandle = new TestingOperatorStateHandle();
        TestingOperatorStateHandle rawOpHandle = new TestingOperatorStateHandle();

        OperatorSubtaskStateMock() {
            this.subtaskState = OperatorSubtaskState.builder().setManagedOperatorState((OperatorStateHandle)this.managedOpHandle).setRawOperatorState((OperatorStateHandle)this.rawOpHandle).build();
        }

        /** Returns the subtask state built from the two testing handles. */
        public OperatorSubtaskState getSubtaskState() {
            return this.subtaskState;
        }

        /** Clears the discarded flag on both handles. */
        public void reset() {
            this.managedOpHandle.reset();
            this.rawOpHandle.reset();
        }

        /**
         * Asserts that both handles have been discarded.
         *
         * <p>Uses AssertJ rather than the {@code assert} keyword so the check still runs when the
         * JVM is started without {@code -ea}.
         *
         * @throws AssertionError if either handle has not been discarded
         */
        public void verifyDiscard() {
            Assertions.assertThat(this.managedOpHandle.isDiscarded()).as("managed operator state handle discarded").isTrue();
            Assertions.assertThat(this.rawOpHandle.isDiscarded()).as("raw operator state handle discarded").isTrue();
        }

        /**
         * Asserts that neither handle has been discarded.
         *
         * <p>Uses AssertJ rather than the {@code assert} keyword so the check still runs when the
         * JVM is started without {@code -ea}.
         *
         * @throws AssertionError if either handle has been discarded
         */
        public void verifyNotDiscard() {
            Assertions.assertThat(this.managedOpHandle.isDiscarded()).as("managed operator state handle discarded").isFalse();
            Assertions.assertThat(this.rawOpHandle.isDiscarded()).as("raw operator state handle discarded").isFalse();
        }

        /**
         * Minimal {@link OperatorStateHandle} that only tracks whether {@link #discardState()} has
         * been called; stream access and handle-id lookup always throw.
         */
        private static class TestingOperatorStateHandle
        implements OperatorStateHandle {
            private static final long serialVersionUID = 983594934287613083L;
            boolean discarded;

            private TestingOperatorStateHandle() {
            }

            public Map<String, OperatorStateHandle.StateMetaInfo> getStateNameToPartitionOffsets() {
                return Collections.emptyMap();
            }

            public FSDataInputStream openInputStream() throws IOException {
                throw new IOException("Cannot open input streams in testing implementation.");
            }

            public PhysicalStateHandleID getStreamStateHandleID() {
                throw new RuntimeException("Cannot return ID in testing implementation.");
            }

            public Optional<byte[]> asBytesIfInMemory() {
                return Optional.empty();
            }

            public StreamStateHandle getDelegateStateHandle() {
                return null;
            }

            /** Marks the handle as discarded; fails the assertion if it was already discarded. */
            public void discardState() throws Exception {
                Assertions.assertThat((boolean)this.discarded).isFalse();
                this.discarded = true;
            }

            public long getStateSize() {
                return 0L;
            }

            public void reset() {
                this.discarded = false;
            }

            public boolean isDiscarded() {
                return this.discarded;
            }
        }
    }

    /**
     * {@link MasterTriggerRestoreHook} test double that records whether {@link #reset()} was
     * called. The trigger, restore and serializer methods throw
     * {@link UnsupportedOperationException}.
     */
    private static class TestResetHook
    implements MasterTriggerRestoreHook<String> {
        /** Becomes {@code true} once {@link #reset()} has been invoked. */
        boolean resetCalled = false;
        private final String id;

        TestResetHook(String id) {
            this.id = id;
        }

        public void reset() throws Exception {
            this.resetCalled = true;
        }

        public String getIdentifier() {
            return this.id;
        }

        public CompletableFuture<String> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) {
            throw new UnsupportedOperationException();
        }

        public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData) throws Exception {
            throw new UnsupportedOperationException();
        }

        public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * {@code FailJobCallback} that counts how many times either of its callback methods has been
     * invoked instead of failing anything.
     */
    private static class TestFailJobCallback
    implements CheckpointFailureManager.FailJobCallback {
        /** Total number of invocations of both callback methods; starts at zero. */
        private int invokeCounter;

        private TestFailJobCallback() {
        }

        public void failJob(Throwable cause) {
            this.invokeCounter += 1;
        }

        public void failJobDueToTaskFailure(Throwable cause, ExecutionAttemptID executionAttemptID) {
            this.invokeCounter += 1;
        }

        /** Returns the number of callback invocations observed so far. */
        public int getInvokeCounter() {
            return this.invokeCounter;
        }
    }

    /**
     * {@link StandaloneCheckpointIDCounter} that additionally holds a reference to a
     * {@link CheckpointCoordinator}, which subclasses read through the {@code owner} field.
     */
    private static class CheckpointIDCounterWithOwner
    extends StandaloneCheckpointIDCounter {
        /** The coordinator set via {@link #setOwner}; {@code null} until then. */
        protected CheckpointCoordinator owner;

        private CheckpointIDCounterWithOwner() {
        }

        /** Stores {@code coordinator} as owner after checking that it is not {@code null}. */
        void setOwner(CheckpointCoordinator coordinator) {
            Preconditions.checkNotNull((Object)coordinator);
            this.owner = coordinator;
        }
    }

    /**
     * Counter that stops the owning coordinator's checkpoint scheduler each time an id is
     * requested, before delegating to the superclass for the id itself.
     */
    private static class StoppingCheckpointIDCounter
    extends CheckpointIDCounterWithOwner {
        private StoppingCheckpointIDCounter() {
        }

        public long getAndIncrement() throws Exception {
            // The owner must have been set before the first id is requested.
            Preconditions.checkNotNull((Object)this.owner);
            this.owner.stopCheckpointScheduler();
            long nextId = super.getAndIncrement();
            return nextId;
        }
    }

    /**
     * {@link JobManagerCheckpointStorage} whose storage access throws an {@link IOException}
     * whenever a location for a checkpoint is initialized.
     */
    private static class IOExceptionCheckpointStorage
    extends JobManagerCheckpointStorage {
        private IOExceptionCheckpointStorage() {
        }

        public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
            CheckpointStorageAccess failingAccess = new MemoryBackendCheckpointStorageAccess(jobId, null, null, true, 100){

                public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
                    throw new IOException("disk is error!");
                }
            };
            return failingAccess;
        }
    }

    /**
     * Counter whose {@code getAndIncrement()} always throws an {@link IOException}, after checking
     * that an owner has been set.
     */
    private static class IOExceptionCheckpointIDCounter
    extends CheckpointIDCounterWithOwner {
        private IOExceptionCheckpointIDCounter() {
        }

        public long getAndIncrement() throws Exception {
            // The owner is only checked for presence; it is not otherwise used here.
            Preconditions.checkNotNull((Object)this.owner);
            IOException simulatedFailure = new IOException("disk is error!");
            throw simulatedFailure;
        }
    }
}

