/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/**
 * Tests how an {@link ExecutionGraph} reacts to failures, restarts and cancellation.
 *
 * <p>Every test runs a single {@link JobVertex} using {@link Tasks.NoOpInvokable} with a
 * parallelism of {@link #NUM_TASKS} against one test {@link Instance} registered at a
 * {@link Scheduler}.
 */
public class ExecutionGraphRestartTest {
    /**
     * Parallelism of the job vertex under test; the same value is handed to
     * {@code ExecutionGraphTestUtils.getInstance} (presumably as the instance's slot count).
     */
    private static final int NUM_TASKS = 31;

    /** Pause, in milliseconds, between two polls of the execution graph while waiting. */
    private static final long POLL_INTERVAL_MILLIS = 100L;

    /**
     * With zero retries left, a failed job must end up in {@code FAILED} and an explicit
     * {@code restart()} call must not move it out of that state.
     */
    @Test
    public void testNotRestartManually() throws Exception {
        Instance instance = createInstance();
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobGraph jobGraph = createJobGraph("Pointwise job", "Task");
        ExecutionGraph eg = createExecutionGraph("test job");
        eg.setNumberOfRetriesLeft(0);
        attachAndSchedule(eg, jobGraph, scheduler);

        eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
        completeCancellingOfAllVertices(eg);
        Assert.assertEquals(JobStatus.FAILED, eg.getState());

        eg.restart();
        Assert.assertEquals(JobStatus.FAILED, eg.getState());
    }

    /**
     * With one retry left, a failed job must go through {@code FAILING}, come back to
     * {@code RUNNING} on its own, and reach {@code FINISHED} once every execution attempt has
     * been marked finished.
     */
    @Test
    public void testRestartAutomatically() throws Exception {
        Instance instance = createInstance();
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        scheduler.newInstanceAvailable(instance);

        JobGraph jobGraph = createJobGraph("Pointwise job", "Task");
        ExecutionGraph eg = createExecutionGraph("Test job");
        eg.setNumberOfRetriesLeft(1);
        attachAndSchedule(eg, jobGraph, scheduler);

        eg.getAllExecutionVertices().iterator().next().fail(new Exception("Test Exception"));
        Assert.assertEquals(JobStatus.FAILING, eg.getState());
        completeCancellingOfAllVertices(eg);

        FiniteDuration timeout = new FiniteDuration(2L, TimeUnit.MINUTES);

        // The restart is not observed synchronously, so poll until the job is RUNNING again.
        Deadline deadline = timeout.fromNow();
        while (deadline.hasTimeLeft() && eg.getState() != JobStatus.RUNNING) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        Assert.assertEquals(JobStatus.RUNNING, eg.getState());

        // Poll until every current execution attempt has a resource assigned.
        deadline = timeout.fromNow();
        boolean allAssigned = false;
        while (deadline.hasTimeLeft() && !allAssigned) {
            allAssigned = allAttemptsHaveResource(eg);
            if (!allAssigned) {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        }

        // Decide on the polled result rather than on the remaining time: the deadline may lapse
        // right after the final, successful poll, which must not fail the test.
        if (!allAssigned) {
            Assert.fail("Failed to wait until all execution attempts left the state DEPLOYING.");
        }

        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().markFinished();
        }
        Assert.assertEquals(JobStatus.FINISHED, eg.getState());
    }

    /**
     * Cancelling a job that sits in {@code RESTARTING} must move it to {@code CANCELED}, and a
     * later {@code restart()} call must leave it there.
     */
    @Test
    public void testCancelWhileRestarting() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        Instance instance = createInstance();
        scheduler.newInstanceAvailable(instance);

        ExecutionGraph executionGraph = createExecutionGraph("TestJob");
        JobGraph jobGraph = createJobGraph("TestJob", "NoOpInvokable");

        // Maximum retry count and delay, so the graph is still RESTARTING when it is cancelled.
        executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
        executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
        attachAndSchedule(executionGraph, jobGraph, scheduler);

        // Marking the only instance dead is what triggers the failure in this test.
        instance.markDead();

        Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        while (deadline.hasTimeLeft() && executionGraph.getState() != JobStatus.RESTARTING) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
        Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());

        executionGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());

        executionGraph.restart();
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
    }

    /**
     * Cancelling a job that sits in {@code FAILING} must move it to {@code CANCELLING} and, once
     * {@code jobVertexInFinalState()} is actually executed, to {@code CANCELED}.
     */
    @Test
    public void testCancelWhileFailing() throws Exception {
        Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        Instance instance = createInstance();
        scheduler.newInstanceAvailable(instance);

        // Spy on the graph and stub out jobVertexInFinalState(); presumably this is what keeps
        // the graph in FAILING until the real method is re-enabled below.
        ExecutionGraph executionGraph = Mockito.spy(createExecutionGraph("TestJob"));
        Mockito.doNothing().when(executionGraph).jobVertexInFinalState();

        JobGraph jobGraph = createJobGraph("TestJob", "NoOpInvokable");
        executionGraph.setNumberOfRetriesLeft(Integer.MAX_VALUE);
        executionGraph.setDelayBeforeRetrying(Integer.MAX_VALUE);
        attachAndSchedule(executionGraph, jobGraph, scheduler);

        instance.markDead();

        // Poll until every execution vertex reports FAILED.
        Deadline deadline = TestingUtils.TESTING_DURATION().fromNow();
        boolean allFailed = false;
        while (deadline.hasTimeLeft() && !allFailed) {
            allFailed = allVerticesFailed(executionGraph);
            if (!allFailed) {
                Thread.sleep(POLL_INTERVAL_MILLIS);
            }
        }
        Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());

        executionGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, executionGraph.getState());

        // Restore the real implementation and invoke it by hand to finish the cancellation.
        Mockito.doCallRealMethod().when(executionGraph).jobVertexInFinalState();
        executionGraph.jobVertexInFinalState();
        Assert.assertEquals(JobStatus.CANCELED, executionGraph.getState());
    }

    /** Creates the test instance, backed by a {@code SimpleActorGateway} on the direct execution context. */
    private static Instance createInstance() throws Exception {
        return ExecutionGraphTestUtils.getInstance(
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext()),
                NUM_TASKS);
    }

    /** Builds a job graph holding one no-op vertex with a parallelism of {@link #NUM_TASKS}. */
    private static JobGraph createJobGraph(String jobName, String vertexName) {
        JobVertex vertex = new JobVertex(vertexName);
        vertex.setInvokableClass(Tasks.NoOpInvokable.class);
        vertex.setParallelism(NUM_TASKS);
        return new JobGraph(jobName, new JobVertex[]{vertex});
    }

    /** Creates an execution graph with a fresh job id, an empty configuration and the default timeout. */
    private static ExecutionGraph createExecutionGraph(String jobName) throws Exception {
        return new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                new JobID(),
                jobName,
                new Configuration(),
                AkkaUtils.getDefaultTimeout());
    }

    /** Attaches the job graph and schedules it, asserting the CREATED and then RUNNING status. */
    private static void attachAndSchedule(ExecutionGraph graph, JobGraph jobGraph, Scheduler scheduler)
            throws Exception {
        graph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
        Assert.assertEquals(JobStatus.CREATED, graph.getState());
        graph.scheduleForExecution(scheduler);
        Assert.assertEquals(JobStatus.RUNNING, graph.getState());
    }

    /** Calls {@code cancelingComplete()} on the current execution attempt of every vertex. */
    private static void completeCancellingOfAllVertices(ExecutionGraph graph) {
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().cancelingComplete();
        }
    }

    /** Returns {@code true} if no current execution attempt has a {@code null} assigned resource. */
    private static boolean allAttemptsHaveResource(ExecutionGraph graph) {
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            if (vertex.getCurrentExecutionAttempt().getAssignedResource() == null) {
                return false;
            }
        }
        return true;
    }

    /** Returns {@code true} if every execution vertex is in state {@link ExecutionState#FAILED}. */
    private static boolean allVerticesFailed(ExecutionGraph graph) {
        for (ExecutionVertex vertex : graph.getAllExecutionVertices()) {
            if (vertex.getExecutionState() != ExecutionState.FAILED) {
                return false;
            }
        }
        return true;
    }
}

