/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import javax.annotation.Nonnull;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/**
 * Tests for {@link Execution}, driven through a {@link DefaultScheduler} whose slots come from a
 * {@link TestingPhysicalSlotProvider}.
 */
public class ExecutionTest extends TestLogger {

    /** Class-level JUnit resource that supplies the main-thread test executor used below. */
    @ClassRule
    public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE =
            new TestingComponentMainThreadExecutor.Resource();

    private final TestingComponentMainThreadExecutor testMainThreadUtil =
            EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

    /**
     * Cancels a scheduled vertex and asserts that, at the moment the termination future returned
     * by {@link ExecutionVertex#cancel()} completes, the provider's first slot response future is
     * already done.
     */
    @Test
    public void testTerminationFutureIsCompletedAfterSlotRelease() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();

        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();

        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];

        scheduler.startScheduling();

        Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
        CompletableFuture<TestingPhysicalSlot> returnedSlotFuture =
                physicalSlotProvider.getFirstResponseOrFail();
        CompletableFuture<?> terminationFuture = executionVertex.cancel();

        currentExecutionAttempt.completeCancelling();

        // The assertion runs inside the continuation so that it is evaluated when the
        // termination future completes; a failure surfaces through get() below.
        CompletableFuture<Boolean> restartFuture =
                terminationFuture.thenApply(
                        ignored -> {
                            Assert.assertTrue(returnedSlotFuture.isDone());
                            return true;
                        });

        restartFuture.get();
    }

    /**
     * Sets an initial {@link JobManagerTaskRestore} on an execution and asserts that it is
     * non-null before {@code startScheduling()} and null afterwards.
     */
    @Test
    public void testTaskRestoreStateIsNulledAfterDeployment() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();

        DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        TestingPhysicalSlotProvider
                                                .createWithLimitedAmountOfPhysicalSlots(1)))
                        .build();

        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        Execution execution = executionVertex.getCurrentExecutionAttempt();

        JobManagerTaskRestore taskRestoreState =
                new JobManagerTaskRestore(1L, new TaskStateSnapshot());
        execution.setInitialState(taskRestoreState);

        Assert.assertThat(execution.getTaskRestore(), Matchers.is(Matchers.notNullValue()));

        scheduler.startScheduling();

        Assert.assertThat(execution.getTaskRestore(), Matchers.is(Matchers.nullValue()));
    }

    /**
     * Schedules and then cancels an execution on the test main-thread executor and asserts that
     * the set of slot requests recorded by the provider equals the set of recorded cancellations.
     */
    @Test
    public void testCanceledExecutionReturnsSlot() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();
        JobVertexID jobVertexId = jobVertex.getID();

        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.create(
                        resourceProfile ->
                                CompletableFuture.completedFuture(
                                        TestingPhysicalSlot.builder()
                                                .withTaskManagerGateway(taskManagerGateway)
                                                .build()));
        DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                testMainThreadUtil.getMainThreadExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();

        ExecutionJobVertex executionJobVertex = scheduler.getExecutionJobVertex(jobVertexId);
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        Execution execution = executionVertex.getCurrentExecutionAttempt();

        // Complete the cancellation as soon as the gateway is asked to cancel this attempt;
        // cancel calls for any other attempt id are ignored.
        taskManagerGateway.setCancelConsumer(
                executionAttemptID -> {
                    if (execution.getAttemptId().equals(executionAttemptID)) {
                        execution.completeCancelling();
                    }
                });

        testMainThreadUtil.execute(scheduler::startScheduling);
        testMainThreadUtil.execute(execution::cancel);

        Assert.assertThat(
                physicalSlotProvider.getRequests().keySet(),
                Matchers.is(physicalSlotProvider.getCancellations().keySet()));
    }

    /**
     * Releases the payload of the physical slot assigned to an execution and asserts, within the
     * same main-thread action, that the execution's release future is already done.
     */
    @Test
    public void testSlotReleaseAtomicallyReleasesExecution() throws Exception {
        JobVertex jobVertex = createNoOpJobVertex();

        TestingPhysicalSlotProvider physicalSlotProvider =
                TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1);
        DefaultScheduler scheduler =
                SchedulerTestingUtils.newSchedulerBuilder(
                                JobGraphTestUtils.streamingJobGraph(jobVertex),
                                testMainThreadUtil.getMainThreadExecutor())
                        .setExecutionSlotAllocatorFactory(
                                SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(
                                        physicalSlotProvider))
                        .build();

        Execution execution =
                scheduler
                        .getExecutionJobVertex(jobVertex.getID())
                        .getTaskVertices()[0]
                        .getCurrentExecutionAttempt();

        testMainThreadUtil.execute(scheduler::startScheduling);

        // Wait for the provider to have seen the slot request before fetching its response.
        physicalSlotProvider.awaitAllSlotRequests();
        TestingPhysicalSlot physicalSlot = physicalSlotProvider.getFirstResponseOrFail().get();

        // Release and check in a single main-thread action so that no other main-thread work
        // can run between the release and the assertion.
        testMainThreadUtil.execute(
                () -> {
                    Assert.assertThat(
                            execution.getAssignedAllocationID(),
                            Matchers.is(physicalSlot.getAllocationId()));

                    physicalSlot.releasePayload(new FlinkException("Test exception"));

                    Assert.assertThat(execution.getReleaseFuture().isDone(), Matchers.is(true));
                });
    }

    /**
     * Creates a job vertex named "Test vertex" with a fresh {@link JobVertexID} and {@link
     * NoOpInvokable} as its invokable class.
     *
     * @return the newly created job vertex
     */
    @Nonnull
    private JobVertex createNoOpJobVertex() {
        JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID());
        jobVertex.setInvokableClass(NoOpInvokable.class);
        return jobVertex;
    }
}

