/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for deploying an {@link ExecutionVertex} to a {@link LogicalSlot}.
 *
 * <p>Each test obtains a fresh vertex from {@link ExecutionGraphTestUtils#getExecutionVertex()},
 * moves its current execution attempt to {@link ExecutionState#SCHEDULED}, deploys it to a
 * {@link TestingLogicalSlot} and then verifies the resulting execution state, failure info and
 * state timestamps.
 *
 * <p>Unexpected exceptions are propagated to the test framework instead of being converted into
 * {@code fail(e.getMessage())}, so the original stack trace and cause are preserved.
 */
class ExecutionVertexDeploymentTest {
    /** Message of the exception used by {@link SubmitFailingSimpleAckingTaskManagerGateway}. */
    private static final String ERROR_MESSAGE = "test_failure_error_message";

    /** Upper bound on the number of polls while waiting for a vertex to fail. */
    private static final int MAX_POLL_ATTEMPTS = 100;

    /** Pause between two polls of the vertex state, in milliseconds. */
    private static final long POLL_INTERVAL_MILLIS = 10L;

    /**
     * Deploys a scheduled vertex to a default testing slot and checks that it ends up in
     * {@code DEPLOYING}, that a second deployment is rejected and that no failure is recorded.
     */
    @Test
    void testDeployCall() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();

        vertex.deployToSlot(slot);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);

        assertRedeploymentRejected(vertex, slot);

        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertTimestampSet(vertex, ExecutionState.CREATED);
        assertTimestampSet(vertex, ExecutionState.DEPLOYING);
    }

    /**
     * Same flow as {@link #testDeployCall()}, additionally checking that no {@code RUNNING}
     * timestamp has been recorded after the deployment.
     */
    @Test
    void testDeployWithSynchronousAnswer() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();

        vertex.deployToSlot(slot);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);

        assertRedeploymentRejected(vertex, slot);

        Assertions.assertThat(vertex.getFailureInfo()).isNotPresent();
        assertTimestampSet(vertex, ExecutionState.CREATED);
        assertTimestampSet(vertex, ExecutionState.DEPLOYING);
        Assertions.assertThat(vertex.getStateTimestamp(ExecutionState.RUNNING)).isZero();
    }

    /**
     * Deploys a scheduled vertex and checks that repeated deployment attempts are rejected both
     * before and after the {@code DEPLOYING} state has been asserted, and that no {@code RUNNING}
     * timestamp has been recorded.
     */
    @Test
    void testDeployWithAsynchronousAnswer() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        LogicalSlot slot = new TestingLogicalSlotBuilder().createTestingLogicalSlot();

        vertex.deployToSlot(slot);
        assertRedeploymentRejected(vertex, slot);

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);
        assertRedeploymentRejected(vertex, slot);

        assertTimestampSet(vertex, ExecutionState.CREATED);
        assertTimestampSet(vertex, ExecutionState.DEPLOYING);
        Assertions.assertThat(vertex.getStateTimestamp(ExecutionState.RUNNING)).isZero();
    }

    /**
     * Deploys to a slot whose gateway fails the task submission and expects the vertex to be
     * {@code FAILED} immediately after {@code deployToSlot} returns, carrying
     * {@link #ERROR_MESSAGE} in its failure info.
     */
    @Test
    void testDeployFailedSynchronous() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        LogicalSlot slot = createSlotWithFailingSubmission();

        vertex.deployToSlot(slot);

        assertFailedWithErrorMessage(vertex);
    }

    /**
     * Deploys to a slot whose gateway fails the task submission and waits (bounded polling) until
     * the vertex reports {@code FAILED} with failure info before verifying the outcome.
     */
    @Test
    void testDeployFailedAsynchronously() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        LogicalSlot slot = createSlotWithFailingSubmission();

        vertex.deployToSlot(slot);
        awaitFailure(vertex);

        assertFailedWithErrorMessage(vertex);
    }

    /**
     * Fails the vertex via {@link ExecutionVertex#fail(Throwable)} while the task submission
     * future is still pending and checks that the supplied error is the recorded failure cause.
     */
    @Test
    void testFailExternallyDuringDeploy() throws Exception {
        ExecutionVertex vertex = createScheduledVertex();
        LogicalSlot slot =
                new TestingLogicalSlotBuilder()
                        .setTaskManagerGateway(new SubmitBlockingSimpleAckingTaskManagerGateway())
                        .createTestingLogicalSlot();

        vertex.deployToSlot(slot);
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.DEPLOYING);

        Exception testError = new Exception("test error");
        vertex.fail(testError);

        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
        // Guard the Optional.get() below so that a missing failure info is reported as an
        // assertion failure rather than a NoSuchElementException.
        Assertions.assertThat(vertex.getFailureInfo()).isPresent();
        Assertions.assertThat(
                        vertex.getFailureInfo()
                                .map(ErrorInfo::getException)
                                .get()
                                .deserializeError(ClassLoader.getSystemClassLoader()))
                .isEqualTo(testError);
        assertTimestampSet(vertex, ExecutionState.CREATED);
        assertTimestampSet(vertex, ExecutionState.DEPLOYING);
        assertTimestampSet(vertex, ExecutionState.FAILED);
    }

    /**
     * Creates a vertex, verifies that it starts in {@code CREATED} and transitions its current
     * execution attempt to {@code SCHEDULED}.
     */
    private static ExecutionVertex createScheduledVertex() throws Exception {
        ExecutionVertex vertex = ExecutionGraphTestUtils.getExecutionVertex();
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.CREATED);
        vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        return vertex;
    }

    /** Builds a testing slot backed by {@link SubmitFailingSimpleAckingTaskManagerGateway}. */
    private static LogicalSlot createSlotWithFailingSubmission() {
        return new TestingLogicalSlotBuilder()
                .setTaskManagerGateway(new SubmitFailingSimpleAckingTaskManagerGateway())
                .createTestingLogicalSlot();
    }

    /**
     * Asserts that deploying the given vertex again is rejected with an
     * {@link IllegalStateException}.
     */
    private static void assertRedeploymentRejected(ExecutionVertex vertex, LogicalSlot slot) {
        Assertions.assertThatThrownBy(() -> vertex.deployToSlot(slot))
                .isInstanceOf(IllegalStateException.class);
    }

    /** Asserts that a timestamp greater than zero is recorded for the given state. */
    private static void assertTimestampSet(ExecutionVertex vertex, ExecutionState state) {
        Assertions.assertThat(vertex.getStateTimestamp(state)).isGreaterThan(0L);
    }

    /**
     * Asserts that the vertex is {@code FAILED}, that its failure info contains
     * {@link #ERROR_MESSAGE} and that the {@code CREATED}, {@code DEPLOYING} and {@code FAILED}
     * timestamps are set.
     */
    private static void assertFailedWithErrorMessage(ExecutionVertex vertex) {
        Assertions.assertThat(vertex.getExecutionState()).isEqualTo(ExecutionState.FAILED);
        Assertions.assertThat(vertex.getFailureInfo()).isPresent();
        Assertions.assertThat(vertex.getFailureInfo().map(ErrorInfo::getExceptionAsString).get())
                .contains(ERROR_MESSAGE);
        assertTimestampSet(vertex, ExecutionState.CREATED);
        assertTimestampSet(vertex, ExecutionState.DEPLOYING);
        assertTimestampSet(vertex, ExecutionState.FAILED);
    }

    /**
     * Polls the vertex until it is {@code FAILED} with failure info present, giving up silently
     * after {@link #MAX_POLL_ATTEMPTS} polls; the caller's assertions report the final state.
     */
    private static void awaitFailure(ExecutionVertex vertex) throws InterruptedException {
        for (int attempt = 0; attempt < MAX_POLL_ATTEMPTS && !hasFailedWithInfo(vertex); attempt++) {
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Returns whether the vertex is {@code FAILED} and exposes failure info. */
    private static boolean hasFailedWithInfo(ExecutionVertex vertex) {
        return vertex.getExecutionState() == ExecutionState.FAILED
                && vertex.getFailureInfo().isPresent();
    }

    /**
     * Gateway whose {@code submitTask} returns a new future that this class never completes, so
     * the submission stays pending for the duration of the test.
     */
    private static class SubmitBlockingSimpleAckingTaskManagerGateway
            extends SimpleAckingTaskManagerGateway {
        @Override
        public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
            return new CompletableFuture<>();
        }
    }

    /**
     * Gateway whose {@code submitTask} returns a future that is already completed exceptionally
     * with an {@link Exception} carrying {@link ExecutionVertexDeploymentTest#ERROR_MESSAGE}.
     */
    public static class SubmitFailingSimpleAckingTaskManagerGateway
            extends SimpleAckingTaskManagerGateway {
        @Override
        public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, Time timeout) {
            CompletableFuture<Acknowledge> future = new CompletableFuture<>();
            future.completeExceptionally(new Exception(ERROR_MESSAGE));
            return future;
        }
    }
}

