/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryTestUtils;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestOperatorEvent;
import org.apache.flink.runtime.operators.coordination.TestingCoordinationRequestHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TaskExecutorSubmissionTest;
import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.testutils.CancelableInvokable;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.TriFunction;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

/**
 * Tests how a task reacts to failures while operator events and coordination requests are
 * exchanged between a running task and the other side of the TaskExecutor / JobMaster gateways.
 *
 * <p>Every test deploys a single task through a {@link TaskSubmissionTestEnvironment}, waits
 * until that task is reported as {@link ExecutionState#RUNNING}, provokes a failure and then
 * checks that the task ends up in {@link ExecutionState#FAILED}.
 */
class TaskExecutorOperatorEventHandlingTest {

    @RegisterExtension
    private static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION =
            TestingUtils.defaultExecutorExtension();

    private MetricRegistryImpl metricRegistry;
    private TestingRpcService rpcService;

    /** Creates a fresh RPC service and metric registry (with query service) for each test. */
    @BeforeEach
    void setup() {
        rpcService = new TestingRpcService();
        metricRegistry =
                new MetricRegistryImpl(
                        MetricRegistryTestUtils.defaultMetricRegistryConfiguration());
        metricRegistry.startQueryService(rpcService, new ResourceID("mqs"));
    }

    /**
     * Shuts down the RPC service and the metric registry.
     *
     * <p>The registry is closed in a {@code finally} block so that a failure while closing the
     * RPC service cannot leave the registry running for subsequent tests.
     */
    @AfterEach
    void teardown() throws ExecutionException, InterruptedException {
        try {
            if (rpcService != null) {
                rpcService.closeAsync().get();
            }
        } finally {
            if (metricRegistry != null) {
                metricRegistry.closeAsync().get();
            }
        }
    }

    /**
     * Sends an operator event to a task whose invokable throws a {@link FlinkException} when the
     * event is dispatched. Expects the returned future to fail with that exception as the cause
     * and the task to be {@code FAILED} afterwards.
     */
    @Test
    void eventHandlingInTaskFailureFailsTask() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID eid =
                ExecutionGraphTestUtils.createExecutionAttemptId(new JobVertexID(), 3, 0);

        try (TaskSubmissionTestEnvironment env =
                createExecutorWithRunningTask(jobId, eid, OperatorEventFailingInvokable.class)) {
            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();

            final CompletableFuture<Acknowledge> resultFuture =
                    tmGateway.sendOperatorEventToTask(
                            eid, new OperatorID(), new SerializedValue<>(new TestOperatorEvent()));

            Assertions.assertThat(resultFuture)
                    .failsWithin(Duration.ofSeconds(10L))
                    .withThrowableOfType(ExecutionException.class)
                    .withCauseInstanceOf(FlinkException.class);
            Assertions.assertThat(env.getTaskSlotTable().getTask(eid).getExecutionState())
                    .isEqualTo(ExecutionState.FAILED);
        }
    }

    /**
     * Runs an invokable that sends an operator event towards the coordinator. The job master
     * gateway used by the environment throws on delivery, so the task is expected to fail.
     */
    @Test
    void eventToCoordinatorDeliveryFailureFailsTask() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID eid =
                ExecutionGraphTestUtils.createExecutionAttemptId(new JobVertexID(), 3, 0);

        try (TaskSubmissionTestEnvironment env =
                createExecutorWithRunningTask(jobId, eid, OperatorEventSendingInvokable.class)) {
            final Task task = env.getTaskSlotTable().getTask(eid);

            // Wait (at most 10 s) for the task thread to finish before inspecting the state.
            task.getExecutingThread().join(10000L);

            Assertions.assertThat(task.getExecutionState()).isEqualTo(ExecutionState.FAILED);
        }
    }

    /**
     * Runs an invokable that sends a coordination request towards the coordinator. The job master
     * gateway used by the environment throws on delivery, so the task is expected to fail.
     */
    @Test
    void requestToCoordinatorDeliveryFailureFailsTask() throws Exception {
        final JobID jobId = new JobID();
        final ExecutionAttemptID eid =
                ExecutionGraphTestUtils.createExecutionAttemptId(new JobVertexID(), 3, 0);

        try (TaskSubmissionTestEnvironment env =
                createExecutorWithRunningTask(
                        jobId, eid, CoordinationRequestSendingInvokable.class)) {
            final Task task = env.getTaskSlotTable().getTask(eid);

            // Wait (at most 10 s) for the task thread to finish before inspecting the state.
            task.getExecutingThread().join(10000L);

            Assertions.assertThat(task.getExecutionState()).isEqualTo(ExecutionState.FAILED);
        }
    }

    /**
     * Builds a test environment, allocates slot 0, submits a task running {@code invokableClass}
     * and blocks until the task has been reported as {@code RUNNING}.
     *
     * <p>The job master gateway installed in the environment throws a {@link RuntimeException}
     * from both its operator-event sender and its coordination-request function.
     *
     * <p>If any step after the environment has been built fails, the environment is closed
     * before the failure is propagated; otherwise the caller owns it and must close it.
     *
     * @param jobId job the task belongs to
     * @param executionAttemptId execution attempt of the task to deploy
     * @param invokableClass invokable the task should run
     * @return the environment with the running task
     * @throws Exception if building the environment, allocating the slot or submitting the task
     *     fails
     */
    private TaskSubmissionTestEnvironment createExecutorWithRunningTask(
            JobID jobId,
            ExecutionAttemptID executionAttemptId,
            Class<? extends AbstractInvokable> invokableClass)
            throws Exception {
        final TaskDeploymentDescriptor tdd =
                createTaskDeploymentDescriptor(jobId, executionAttemptId, invokableClass);
        final CompletableFuture<Void> taskRunningFuture = new CompletableFuture<>();
        final JobMasterId token = JobMasterId.generate();

        final TaskSubmissionTestEnvironment env =
                new TaskSubmissionTestEnvironment.Builder(jobId)
                        .setJobMasterId(token)
                        .setSlotSize(1)
                        .addTaskManagerActionListener(
                                executionAttemptId, ExecutionState.RUNNING, taskRunningFuture)
                        .setMetricQueryServiceAddress(
                                metricRegistry.getMetricQueryServiceGatewayRpcAddress())
                        .setJobMasterGateway(
                                new TestingJobMasterGatewayBuilder()
                                        .setFencingTokenSupplier(() -> token)
                                        .setOperatorEventSender(
                                                (attemptId, operatorId, event) -> {
                                                    throw new RuntimeException();
                                                })
                                        .setDeliverCoordinationRequestFunction(
                                                (operatorId, request) -> {
                                                    throw new RuntimeException();
                                                })
                                        .build())
                        .build(EXECUTOR_EXTENSION.getExecutor());

        try {
            env.getTaskSlotTable()
                    .allocateSlot(0, jobId, tdd.getAllocationId(), Duration.ofSeconds(60L));

            final TaskExecutorGateway tmGateway = env.getTaskExecutorGateway();
            tmGateway.submitTask(tdd, env.getJobMasterId(), Time.seconds(10L)).get();

            taskRunningFuture.get();
            return env;
        } catch (Throwable failure) {
            // The caller never receives the environment on this path, so release it here.
            try {
                env.close();
            } catch (Exception closeFailure) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Creates a deployment descriptor for a task named "test task" of job "test job" that runs
     * the given invokable, using a default {@link ExecutionConfig}, empty configurations and
     * empty collections for all list-valued arguments.
     */
    private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(
            JobID jobId,
            ExecutionAttemptID executionAttemptId,
            Class<? extends AbstractInvokable> invokableClass)
            throws IOException {
        return TaskExecutorSubmissionTest.createTaskDeploymentDescriptor(
                jobId,
                "test job",
                executionAttemptId,
                new SerializedValue<>(new ExecutionConfig()),
                "test task",
                64,
                17,
                new Configuration(),
                new Configuration(),
                invokableClass.getName(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList());
    }

    /** Invokable that sends a coordination request to the coordinator and then waits. */
    public static final class CoordinationRequestSendingInvokable extends CancelableInvokable {

        public CoordinationRequestSendingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        protected void doInvoke() throws Exception {
            getEnvironment()
                    .getOperatorCoordinatorEventGateway()
                    .sendRequestToCoordinator(
                            new OperatorID(),
                            new SerializedValue<>(
                                    new TestingCoordinationRequestHandler.Request<Long>(0L)));
            waitUntilCancelled();
        }
    }

    /** Invokable that sends an operator event to the coordinator and then waits. */
    public static final class OperatorEventSendingInvokable extends CancelableInvokable {

        public OperatorEventSendingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public void doInvoke() throws Exception {
            getEnvironment()
                    .getOperatorCoordinatorEventGateway()
                    .sendOperatorEventToCoordinator(
                            new OperatorID(), new SerializedValue<>(new TestOperatorEvent()));
            waitUntilCancelled();
        }
    }

    /** Invokable that idles until cancelled and fails every operator event dispatched to it. */
    public static final class OperatorEventFailingInvokable extends CancelableInvokable {

        public OperatorEventFailingInvokable(Environment environment) {
            super(environment);
        }

        @Override
        public void doInvoke() throws InterruptedException {
            waitUntilCancelled();
        }

        // The test relies on this replacing the inherited event hook; @Override lets the
        // compiler verify that the signature still matches the supertype.
        @Override
        public void dispatchOperatorEvent(OperatorID operator, SerializedValue<OperatorEvent> event)
                throws FlinkException {
            throw new FlinkException("test exception");
        }
    }
}

