/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint;

import java.io.Serializable;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class CheckpointCoordinatorTest {
    // Context class loader of the thread that initializes this test class; it is handed to
    // every CheckpointCoordinator and StandaloneCompletedCheckpointStore built by the tests.
    private static final ClassLoader cl = Thread.currentThread().getContextClassLoader();

    /**
     * Checks that a checkpoint is not started when the trigger vertices are bare mocks for
     * which no current execution attempt has been stubbed.
     *
     * <p>{@code triggerCheckpoint} must return {@code false}, and both the pending and the
     * retained checkpoint counts must remain zero before and after the call.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            // Unlike the vertices from mockExecutionVertex(...), these mocks have no
            // execution attempt stubbed.
            ExecutionVertex triggerVertex1 = Mockito.mock(ExecutionVertex.class);
            ExecutionVertex triggerVertex2 = Mockito.mock(ExecutionVertex.class);

            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                    new ExecutionVertex[]{ackVertex1, ackVertex2},
                    new ExecutionVertex[0],
                    cl,
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1, cl),
                    RecoveryMode.STANDALONE);
            try {
                // nothing should be happening
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // the trigger attempt must be rejected
                Assert.assertFalse(coord.triggerCheckpoint(timestamp));

                // still, nothing should be happening
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
            } finally {
                // A failed assertion throws AssertionError, which the catch below does not
                // handle; finally guarantees the coordinator is shut down regardless.
                coord.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that a checkpoint is not started when one of the trigger vertices has an
     * execution attempt in state {@link ExecutionState#FINISHED}.
     *
     * <p>{@code triggerCheckpoint} must return {@code false}, and both the pending and the
     * retained checkpoint counts must remain zero before and after the call.
     */
    @Test
    public void testCheckpointAbortsIfTriggerTasksAreFinished() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            // The first trigger vertex is RUNNING (helper default), the second is FINISHED.
            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
            ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2, ExecutionState.FINISHED);

            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                    new ExecutionVertex[]{ackVertex1, ackVertex2},
                    new ExecutionVertex[0],
                    cl,
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1, cl),
                    RecoveryMode.STANDALONE);
            try {
                // nothing should be happening
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // the trigger attempt must be rejected
                Assert.assertFalse(coord.triggerCheckpoint(timestamp));

                // still, nothing should be happening
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
            } finally {
                // A failed assertion throws AssertionError, which the catch below does not
                // handle; finally guarantees the coordinator is shut down regardless.
                coord.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that a checkpoint is not started when the vertices that have to acknowledge it
     * are bare mocks for which no current execution attempt has been stubbed.
     *
     * <p>{@code triggerCheckpoint} must return {@code false}, and both the pending and the
     * retained checkpoint counts must remain zero before and after the call.
     */
    @Test
    public void testCheckpointAbortsIfAckTasksAreNotExecuted() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
            ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);

            // Unlike the vertices from mockExecutionVertex(...), these mocks have no
            // execution attempt stubbed.
            ExecutionVertex ackVertex1 = Mockito.mock(ExecutionVertex.class);
            ExecutionVertex ackVertex2 = Mockito.mock(ExecutionVertex.class);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                    new ExecutionVertex[]{ackVertex1, ackVertex2},
                    new ExecutionVertex[0],
                    cl,
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1, cl),
                    RecoveryMode.STANDALONE);
            try {
                // nothing should be happening
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // the trigger attempt must be rejected
                Assert.assertFalse(coord.triggerCheckpoint(timestamp));

                // still, nothing should be happening
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());
            } finally {
                // A failed assertion throws AssertionError, which the catch below does not
                // handle; finally guarantees the coordinator is shut down regardless.
                coord.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Drives a single checkpoint through trigger, acknowledgement and completion, then does
     * the same for a second checkpoint.
     *
     * <p>Two mocked vertices serve as trigger, acknowledge and commit targets at once. The
     * test verifies the state of the pending checkpoint after triggering, after a partial
     * acknowledgement, after a duplicate acknowledgement and after the final one; it also
     * verifies the {@link TriggerCheckpoint} and {@link NotifyCheckpointComplete} messages
     * sent to each vertex. After the second checkpoint completes, exactly one successful
     * checkpoint is expected to be retained and it must be the newer one.
     */
    @Test
    public void testTriggerAndConfirmSimpleCheckpoint() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
            ExecutionVertex vertex1 = mockExecutionVertex(attemptID1);
            ExecutionVertex vertex2 = mockExecutionVertex(attemptID2);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    new ExecutionVertex[]{vertex1, vertex2},
                    cl,
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(1, cl),
                    RecoveryMode.STANDALONE);
            try {
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // trigger the first checkpoint; this must succeed
                Assert.assertTrue(coord.triggerCheckpoint(timestamp));

                Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                long checkpointId = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                PendingCheckpoint checkpoint = coord.getPendingCheckpoints().get(checkpointId);

                // freshly triggered: nothing acknowledged yet
                Assert.assertNotNull(checkpoint);
                Assert.assertEquals(checkpointId, checkpoint.getCheckpointId());
                Assert.assertEquals(timestamp, checkpoint.getCheckpointTimestamp());
                Assert.assertEquals(jid, checkpoint.getJobId());
                Assert.assertEquals(2, checkpoint.getNumberOfNonAcknowledgedTasks());
                Assert.assertEquals(0, checkpoint.getNumberOfAcknowledgedTasks());
                Assert.assertEquals(0, checkpoint.getCollectedStates().size());
                Assert.assertFalse(checkpoint.isDiscarded());
                Assert.assertFalse(checkpoint.isFullyAcknowledged());

                // each vertex must have received exactly one trigger message
                TriggerCheckpoint expectedMessage1 = new TriggerCheckpoint(jid, attemptID1, checkpointId, timestamp);
                TriggerCheckpoint expectedMessage2 = new TriggerCheckpoint(jid, attemptID2, checkpointId, timestamp);
                Mockito.verify(vertex1, Mockito.times(1))
                        .sendMessageToCurrentExecution(Mockito.eq(expectedMessage1), Mockito.eq(attemptID1));
                Mockito.verify(vertex2, Mockito.times(1))
                        .sendMessageToCurrentExecution(Mockito.eq(expectedMessage2), Mockito.eq(attemptID2));

                // acknowledge from one of the two tasks
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
                Assert.assertEquals(1, checkpoint.getNumberOfAcknowledgedTasks());
                Assert.assertEquals(1, checkpoint.getNumberOfNonAcknowledgedTasks());
                Assert.assertFalse(checkpoint.isDiscarded());
                Assert.assertFalse(checkpoint.isFullyAcknowledged());

                // acknowledging the same task again must not complete the checkpoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointId));
                Assert.assertFalse(checkpoint.isDiscarded());
                Assert.assertFalse(checkpoint.isFullyAcknowledged());

                // acknowledge the other task
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointId));

                // the pending checkpoint object is discarded once it has been completed
                Assert.assertTrue(checkpoint.isDiscarded());

                // the now-completed checkpoint is retained and nothing is pending
                Assert.assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());

                // each vertex must have received exactly one completion notification
                NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp);
                NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointId, timestamp);
                Mockito.verify(vertex1, Mockito.times(1))
                        .sendMessageToCurrentExecution(Mockito.eq(confirmMessage1), Mockito.eq(attemptID1));
                Mockito.verify(vertex2, Mockito.times(1))
                        .sendMessageToCurrentExecution(Mockito.eq(confirmMessage2), Mockito.eq(attemptID2));

                CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0);
                Assert.assertEquals(jid, success.getJobId());
                Assert.assertEquals(timestamp, success.getTimestamp());
                Assert.assertEquals(checkpoint.getCheckpointId(), success.getCheckpointID());
                Assert.assertTrue(success.getStates().isEmpty());

                // ---------------
                // trigger and fully acknowledge a second checkpoint
                // ---------------
                long timestampNew = timestamp + 7L;
                coord.triggerCheckpoint(timestampNew);

                long checkpointIdNew = coord.getPendingCheckpoints().entrySet().iterator().next().getKey();
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID1, checkpointIdNew));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, attemptID2, checkpointIdNew));

                // still only one retained checkpoint, and it must be the new one
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());

                CompletedCheckpoint successNew = coord.getSuccessfulCheckpoints().get(0);
                Assert.assertEquals(jid, successNew.getJobId());
                Assert.assertEquals(timestampNew, successNew.getTimestamp());
                Assert.assertEquals(checkpointIdNew, successNew.getCheckpointID());
                Assert.assertTrue(successNew.getStates().isEmpty());

                // trigger messages for the second checkpoint
                TriggerCheckpoint expectedMessage12 = new TriggerCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew);
                TriggerCheckpoint expectedMessage22 = new TriggerCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew);
                Mockito.verify(vertex1, Mockito.times(1))
                        .sendMessageToCurrentExecution(Mockito.eq(expectedMessage12), Mockito.eq(attemptID1));
                Mockito.verify(vertex2, Mockito.times(1))
                        .sendMessageToCurrentExecution(Mockito.eq(expectedMessage22), Mockito.eq(attemptID2));

                // completion notifications for the second checkpoint
                NotifyCheckpointComplete confirmMessage12 = new NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew);
                NotifyCheckpointComplete confirmMessage22 = new NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew);
                Mockito.verify(vertex1, Mockito.times(1))
                        .sendMessageToCurrentExecution(Mockito.eq(confirmMessage12), Mockito.eq(attemptID1));
                Mockito.verify(vertex2, Mockito.times(1))
                        .sendMessageToCurrentExecution(Mockito.eq(confirmMessage22), Mockito.eq(attemptID2));
            } finally {
                // A failed assertion or verification throws AssertionError, which the catch
                // below does not handle; finally guarantees the coordinator is shut down.
                coord.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Exercises two checkpoints that are pending at the same time and complete in the order
     * in which they were triggered.
     *
     * <p>Acknowledgements for both checkpoints are interleaved across three ack vertices.
     * The first checkpoint is expected to complete first and the second afterwards; both
     * must end up among the retained successful checkpoints, in that order, and the commit
     * vertex must receive exactly one {@link NotifyCheckpointComplete} for each.
     */
    @Test
    public void testMultipleConcurrentCheckpoints() {
        try {
            JobID jid = new JobID();
            long timestamp1 = System.currentTimeMillis();
            long timestamp2 = timestamp1 + 8617L;

            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();

            ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
            ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
            ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
            ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
            ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                    new ExecutionVertex[]{ackVertex1, ackVertex2, ackVertex3},
                    new ExecutionVertex[]{commitVertex},
                    cl,
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(2, cl),
                    RecoveryMode.STANDALONE);
            try {
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // trigger the first checkpoint; this must succeed
                Assert.assertTrue(coord.triggerCheckpoint(timestamp1));

                Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
                long checkpointId1 = pending1.getCheckpointId();

                // trigger messages for the first checkpoint
                Mockito.verify(triggerVertex1, Mockito.times(1)).sendMessageToCurrentExecution(
                        new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId1, timestamp1), triggerAttemptID1);
                Mockito.verify(triggerVertex2, Mockito.times(1)).sendMessageToCurrentExecution(
                        new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);

                // one acknowledgement for the first checkpoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));

                // trigger the second checkpoint while the first is still pending
                Assert.assertTrue(coord.triggerCheckpoint(timestamp2));

                Assert.assertEquals(2, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // the pending checkpoint that is not pending1 must be the second one
                Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
                PendingCheckpoint cc1 = all.next();
                PendingCheckpoint cc2 = all.next();
                PendingCheckpoint pending2 = pending1 == cc1 ? cc2 : cc1;
                long checkpointId2 = pending2.getCheckpointId();

                // trigger messages for the second checkpoint
                Mockito.verify(triggerVertex1, Mockito.times(1)).sendMessageToCurrentExecution(
                        new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
                Mockito.verify(triggerVertex2, Mockito.times(1)).sendMessageToCurrentExecution(
                        new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId2, timestamp2), triggerAttemptID2);

                // interleaved acknowledgements; after these the first checkpoint has all
                // three acks while the second is still missing ackAttemptID3
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));

                // first checkpoint is complete, second is still pending
                Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());
                Assert.assertTrue(pending1.isDiscarded());

                // the commit vertex was notified about the first checkpoint
                Mockito.verify(commitVertex, Mockito.times(1)).sendMessageToCurrentExecution(
                        new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID);

                // the last missing acknowledgement completes the second checkpoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));

                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(2, coord.getNumberOfRetainedSuccessfulCheckpoints());
                Assert.assertTrue(pending2.isDiscarded());

                // the commit vertex was notified about the second checkpoint
                Mockito.verify(commitVertex, Mockito.times(1)).sendMessageToCurrentExecution(
                        new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);

                // both checkpoints are retained, oldest first
                List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();

                CompletedCheckpoint sc1 = scs.get(0);
                Assert.assertEquals(checkpointId1, sc1.getCheckpointID());
                Assert.assertEquals(timestamp1, sc1.getTimestamp());
                Assert.assertEquals(jid, sc1.getJobId());
                Assert.assertTrue(sc1.getStates().isEmpty());

                CompletedCheckpoint sc2 = scs.get(1);
                Assert.assertEquals(checkpointId2, sc2.getCheckpointID());
                Assert.assertEquals(timestamp2, sc2.getTimestamp());
                Assert.assertEquals(jid, sc2.getJobId());
                Assert.assertTrue(sc2.getStates().isEmpty());
            } finally {
                // A failed assertion or verification throws AssertionError, which the catch
                // below does not handle; finally guarantees the coordinator is shut down.
                coord.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Exercises two concurrently pending checkpoints where the later one is fully
     * acknowledged before the earlier one.
     *
     * <p>Once the second checkpoint completes, both pending checkpoint objects are expected
     * to be discarded, no checkpoint may remain pending, and only the second checkpoint may
     * be retained. The commit vertex must be notified exactly once about the second
     * checkpoint. A final acknowledgement for the first checkpoint is then delivered; the
     * test only requires that it is processed without an exception.
     */
    @Test
    public void testSuccessfulCheckpointSubsumesUnsuccessful() {
        try {
            JobID jid = new JobID();
            long timestamp1 = System.currentTimeMillis();
            long timestamp2 = timestamp1 + 1552L;

            ExecutionAttemptID triggerAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID triggerAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID3 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();

            ExecutionVertex triggerVertex1 = mockExecutionVertex(triggerAttemptID1);
            ExecutionVertex triggerVertex2 = mockExecutionVertex(triggerAttemptID2);
            ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
            ExecutionVertex ackVertex3 = mockExecutionVertex(ackAttemptID3);
            ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    600000L,
                    new ExecutionVertex[]{triggerVertex1, triggerVertex2},
                    new ExecutionVertex[]{ackVertex1, ackVertex2, ackVertex3},
                    new ExecutionVertex[]{commitVertex},
                    cl,
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(10, cl),
                    RecoveryMode.STANDALONE);
            try {
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // trigger the first checkpoint; this must succeed
                Assert.assertTrue(coord.triggerCheckpoint(timestamp1));

                Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                PendingCheckpoint pending1 = coord.getPendingCheckpoints().values().iterator().next();
                long checkpointId1 = pending1.getCheckpointId();

                // trigger messages for the first checkpoint
                Mockito.verify(triggerVertex1, Mockito.times(1)).sendMessageToCurrentExecution(
                        new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId1, timestamp1), triggerAttemptID1);
                Mockito.verify(triggerVertex2, Mockito.times(1)).sendMessageToCurrentExecution(
                        new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId1, timestamp1), triggerAttemptID2);

                // one acknowledgement for the first checkpoint
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId1));

                // trigger the second checkpoint while the first is still pending
                Assert.assertTrue(coord.triggerCheckpoint(timestamp2));

                Assert.assertEquals(2, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // the pending checkpoint that is not pending1 must be the second one
                Iterator<PendingCheckpoint> all = coord.getPendingCheckpoints().values().iterator();
                PendingCheckpoint cc1 = all.next();
                PendingCheckpoint cc2 = all.next();
                PendingCheckpoint pending2 = pending1 == cc1 ? cc2 : cc1;
                long checkpointId2 = pending2.getCheckpointId();

                // trigger messages for the second checkpoint
                Mockito.verify(triggerVertex1, Mockito.times(1)).sendMessageToCurrentExecution(
                        new TriggerCheckpoint(jid, triggerAttemptID1, checkpointId2, timestamp2), triggerAttemptID1);
                Mockito.verify(triggerVertex2, Mockito.times(1)).sendMessageToCurrentExecution(
                        new TriggerCheckpoint(jid, triggerAttemptID2, checkpointId2, timestamp2), triggerAttemptID2);

                // interleaved acknowledgements; the second checkpoint receives all three
                // acks while the first is still missing ackAttemptID3
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId2));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpointId1));
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID2, checkpointId2));

                // the second checkpoint completed and took the unfinished first one with it
                Assert.assertTrue(pending1.isDiscarded());
                Assert.assertTrue(pending2.isDiscarded());

                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(1, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // the single retained checkpoint is the second one
                List<CompletedCheckpoint> scs = coord.getSuccessfulCheckpoints();
                CompletedCheckpoint success = scs.get(0);
                Assert.assertEquals(checkpointId2, success.getCheckpointID());
                Assert.assertEquals(timestamp2, success.getTimestamp());
                Assert.assertEquals(jid, success.getJobId());
                Assert.assertTrue(success.getStates().isEmpty());

                // the commit vertex was notified about the second checkpoint
                Mockito.verify(commitVertex, Mockito.times(1)).sendMessageToCurrentExecution(
                        new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID);

                // a late acknowledgement for the first checkpoint must not throw
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1));
            } finally {
                // A failed assertion or verification throws AssertionError, which the catch
                // below does not handle; finally guarantees the coordinator is shut down.
                coord.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that a checkpoint which never receives all acknowledgements is eventually
     * discarded.
     *
     * <p>The coordinator is created with {@code 200L} as its second constructor argument
     * (presumably the checkpoint timeout in milliseconds; confirm against the constructor).
     * Only one of the two ack vertices acknowledges. The test then polls every 250 ms, for
     * at most about five seconds, until the pending checkpoint is discarded, and finally
     * asserts that nothing is pending or retained and that the commit vertex never received
     * a {@link NotifyCheckpointComplete}.
     */
    @Test
    public void testCheckpointTimeoutIsolated() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();

            ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
            ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    200L,
                    new ExecutionVertex[]{triggerVertex},
                    new ExecutionVertex[]{ackVertex1, ackVertex2},
                    new ExecutionVertex[]{commitVertex},
                    cl,
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(2, cl),
                    RecoveryMode.STANDALONE);
            try {
                // trigger a checkpoint; this must succeed
                Assert.assertTrue(coord.triggerCheckpoint(timestamp));
                Assert.assertEquals(1, coord.getNumberOfPendingCheckpoints());

                PendingCheckpoint checkpoint = coord.getPendingCheckpoints().values().iterator().next();
                Assert.assertFalse(checkpoint.isDiscarded());

                // only the first ack vertex acknowledges; the second never does
                coord.receiveAcknowledgeMessage(
                        new AcknowledgeCheckpoint(jid, ackAttemptID1, checkpoint.getCheckpointId()));

                // wait until the checkpoint is discarded, but give up after five seconds
                long deadline = System.currentTimeMillis() + 5000L;
                do {
                    Thread.sleep(250L);
                } while (!checkpoint.isDiscarded()
                        && coord.getNumberOfPendingCheckpoints() > 0
                        && System.currentTimeMillis() < deadline);

                Assert.assertTrue("Checkpoint was not canceled by the timeout", checkpoint.isDiscarded());
                Assert.assertEquals(0, coord.getNumberOfPendingCheckpoints());
                Assert.assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints());

                // no completion notification may have been sent
                Mockito.verify(commitVertex, Mockito.times(0)).sendMessageToCurrentExecution(
                        Mockito.any(NotifyCheckpointComplete.class), Mockito.any(ExecutionAttemptID.class));
            } finally {
                // A failed assertion or verification throws AssertionError, which the catch
                // below does not handle; finally guarantees the coordinator is shut down.
                coord.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Checks that the coordinator tolerates acknowledge messages that do not match the
     * pending checkpoint.
     *
     * <p>After one checkpoint has been triggered, three acknowledgements are delivered: one
     * carrying a different {@link JobID}, one carrying the literal checkpoint ID {@code 1L},
     * and one carrying a fresh {@link ExecutionAttemptID} that belongs to none of the ack
     * vertices. The test makes no assertions about the resulting state; it passes as long as
     * none of the calls throws an exception.
     */
    @Test
    public void handleMessagesForNonExistingCheckpoints() {
        try {
            JobID jid = new JobID();
            long timestamp = System.currentTimeMillis();

            ExecutionAttemptID triggerAttemptID = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID1 = new ExecutionAttemptID();
            ExecutionAttemptID ackAttemptID2 = new ExecutionAttemptID();
            ExecutionAttemptID commitAttemptID = new ExecutionAttemptID();

            ExecutionVertex triggerVertex = mockExecutionVertex(triggerAttemptID);
            ExecutionVertex ackVertex1 = mockExecutionVertex(ackAttemptID1);
            ExecutionVertex ackVertex2 = mockExecutionVertex(ackAttemptID2);
            ExecutionVertex commitVertex = mockExecutionVertex(commitAttemptID);

            CheckpointCoordinator coord = new CheckpointCoordinator(
                    jid,
                    200000L,
                    new ExecutionVertex[]{triggerVertex},
                    new ExecutionVertex[]{ackVertex1, ackVertex2},
                    new ExecutionVertex[]{commitVertex},
                    cl,
                    new StandaloneCheckpointIDCounter(),
                    new StandaloneCompletedCheckpointStore(2, cl),
                    RecoveryMode.STANDALONE);
            try {
                Assert.assertTrue(coord.triggerCheckpoint(timestamp));

                long checkpointId = coord.getPendingCheckpoints().keySet().iterator().next();

                // acknowledgement carrying a job ID other than the coordinator's
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(new JobID(), ackAttemptID1, checkpointId));

                // acknowledgement carrying a hard-coded checkpoint ID
                // NOTE(review): presumably meant to be an ID with no pending checkpoint; if the
                // ID counter hands out 1 first, this equals checkpointId - verify.
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID1, 1L));

                // acknowledgement from an execution attempt that is not an ack vertex
                coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, new ExecutionAttemptID(), checkpointId));
            } finally {
                // A failed assertion throws AssertionError, which the catch below does not
                // handle; finally guarantees the coordinator is shut down regardless.
                coord.shutdown();
            }
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    /**
     * Creates a mocked {@link ExecutionVertex} for the given attempt ID whose execution
     * attempt is in state {@link ExecutionState#RUNNING}.
     *
     * @param attemptID the attempt ID the mocked execution attempt should report
     * @return the mocked vertex
     */
    private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID) {
        final ExecutionState defaultState = ExecutionState.RUNNING;
        return mockExecutionVertex(attemptID, defaultState);
    }

    /**
     * Creates a mocked {@link ExecutionVertex} backed by a mocked {@link Execution}.
     *
     * <p>The mocked execution reports the given attempt ID and state. The mocked vertex
     * returns that execution as its current execution attempt and a freshly created
     * {@link JobVertexID} as its job vertex ID.
     *
     * @param attemptID the attempt ID the mocked execution attempt should report
     * @param state the execution state the mocked execution attempt should report
     * @return the mocked vertex
     */
    private static ExecutionVertex mockExecutionVertex(ExecutionAttemptID attemptID, ExecutionState state) {
        final Execution attempt = Mockito.mock(Execution.class);
        Mockito.when(attempt.getAttemptId()).thenReturn(attemptID);
        Mockito.when(attempt.getState()).thenReturn(state);

        final ExecutionVertex mockedVertex = Mockito.mock(ExecutionVertex.class);
        Mockito.when(mockedVertex.getJobvertexId()).thenReturn(new JobVertexID());
        Mockito.when(mockedVertex.getCurrentExecutionAttempt()).thenReturn(attempt);
        return mockedVertex;
    }
}

