/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.io.IOException;
import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteChannelStateChecker;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptor;
import org.apache.flink.runtime.shuffle.PartitionDescriptorBuilder;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
import org.apache.flink.runtime.taskmanager.TestingTaskManagerActions;
import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.WrappingRuntimeException;
import org.apache.flink.util.concurrent.Executors;
import org.apache.flink.util.function.BiFunctionWithException;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class TaskTest
extends TestLogger {
    private static final String RESTORE_EXCEPTION_MSG = "TestExceptionInRestore";
    private static OneShotLatch awaitLatch;
    private static OneShotLatch triggerLatch;
    private ShuffleEnvironment<?, ?> shuffleEnvironment;
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER;
    private static boolean wasCleanedUp;

    /**
     * Re-creates the per-test fixtures: both static latches, a freshly built shuffle
     * environment, and a cleared {@code wasCleanedUp} flag.
     */
    @Before
    public void setup() {
        // Fresh latches so that a trigger from an earlier use cannot leak into this one.
        awaitLatch = new OneShotLatch();
        triggerLatch = new OneShotLatch();

        shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();

        wasCleanedUp = false;
    }

    /**
     * Closes the shuffle environment held by this test instance, if there is one.
     *
     * @throws Exception if closing the shuffle environment fails
     */
    @After
    public void teardown() throws Exception {
        if (shuffleEnvironment == null) {
            // Nothing was created, so there is nothing to release.
            return;
        }
        shuffleEnvironment.close();
    }

    /**
     * Runs a task built with {@code InvokableWithExceptionInRestore} and asserts that the
     * {@code wasCleanedUp} flag is set once {@code run()} has returned.
     */
    @Test
    public void testCleanupWhenRestoreFails() throws Exception {
        Task task = createTaskBuilder().setInvokable(InvokableWithExceptionInRestore.class).build();

        task.run();

        Assert.assertTrue(wasCleanedUp);
    }

    /**
     * Runs a task built with {@code InvokableWithExceptionInInvoke} and asserts that the
     * {@code wasCleanedUp} flag is set once {@code run()} has returned.
     */
    @Test
    public void testCleanupWhenInvokeFails() throws Exception {
        Task task = createTaskBuilder().setInvokable(InvokableWithExceptionInInvoke.class).build();

        task.run();

        Assert.assertTrue(wasCleanedUp);
    }

    /**
     * Starts a task built with {@code InvokableBlockingInRestore} on its own thread, waits on
     * {@code awaitLatch}, cancels the task, and asserts that {@code wasCleanedUp} is set after
     * the task thread has terminated.
     */
    @Test
    public void testCleanupWhenCancelledAfterRestore() throws Exception {
        Task task = createTaskBuilder().setInvokable(InvokableBlockingInRestore.class).build();

        task.startTaskThread();
        // Do not cancel before the latch has been triggered.
        awaitLatch.await();

        task.cancelExecution();
        task.getExecutingThread().join();

        Assert.assertTrue(wasCleanedUp);
    }

    /**
     * Runs a task built with {@code TestInvokableCorrect} and asserts that the
     * {@code wasCleanedUp} flag is set once {@code run()} has returned.
     */
    @Test
    public void testCleanupWhenAfterInvokeSucceeded() throws Exception {
        Task task = createTaskBuilder().setInvokable(TestInvokableCorrect.class).build();

        task.run();

        Assert.assertTrue(wasCleanedUp);
    }

    /**
     * Runs a task built with {@code TestInvokableCorrect} and checks its state before and after:
     * CREATED without failure cause before the run, FINISHED without failure cause or invokable
     * afterwards. The listener messages INITIALIZING, RUNNING and FINISHED are then validated in
     * that order.
     */
    @Test
    public void testRegularExecution() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(TestInvokableCorrect.class)
                .setTaskManagerActions(actions)
                .build();

        // State of a task that has not been run yet.
        Assert.assertEquals(ExecutionState.CREATED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        task.run();

        // State after the run has returned.
        Assert.assertEquals(ExecutionState.FINISHED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());
        Assert.assertNull(task.getInvokable());

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FINISHED, task, null);
    }

    /**
     * Cancels a freshly built task before running it. The state is expected to be CANCELING
     * right after the cancel call, and CANCELED with no invokable once {@code run()} returns.
     */
    @Test
    public void testCancelRightAway() throws Exception {
        Task task = createTaskBuilder().build();

        task.cancelExecution();
        Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());

        task.run();
        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertNull(task.getInvokable());
    }

    /**
     * Fails a freshly built task externally before running it. The state is expected to be
     * FAILED both right after the call and after a subsequent {@code run()}.
     */
    @Test
    public void testFailExternallyRightAway() throws Exception {
        Task task = createTaskBuilder().build();

        task.failExternally(new Exception("fail externally"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // Running afterwards must leave the state untouched.
        task.run();
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
    }

    /**
     * Builds a task whose class loader handle throws an {@link IOException} from its
     * get-or-resolve function, runs it, and asserts that the task ends FAILED with exactly that
     * exception as failure cause, no invokable, and a FAILED listener message.
     */
    @Test
    public void testLibraryCacheRegistrationFailed() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        IOException testException = new IOException("Could not load classloader");

        // Resolver that always fails with the exception instance asserted on below.
        BiFunctionWithException<Collection<PermanentBlobKey>, Collection<URL>, UserCodeClassLoader, IOException> failingResolver =
                (permanentBlobKeys, urls) -> {
                    throw testException;
                };
        LibraryCacheManager.ClassLoaderHandle classLoaderHandle =
                (LibraryCacheManager.ClassLoaderHandle) TestingClassLoaderLease.newBuilder()
                        .setGetOrResolveClassLoaderFunction(failingResolver)
                        .build();

        Task task = createTaskBuilder()
                .setTaskManagerActions(actions)
                .setClassLoaderHandle(classLoaderHandle)
                .build();

        // State of a task that has not been run yet.
        Assert.assertEquals(ExecutionState.CREATED, task.getExecutionState());
        Assert.assertFalse(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertThat(task.getFailureCause(), CoreMatchers.is(testException));
        Assert.assertNull(task.getInvokable());

        actions.validateListenerMessage(ExecutionState.FAILED, task, testException);
    }

    /**
     * Runs the shared network-registration failure scenario with a single dummy result
     * partition and no input gates.
     */
    @Test
    public void testExecutionFailsInNetworkRegistrationForPartitions() throws Exception {
        ResultPartitionDeploymentDescriptor dummyPartition =
                new ResultPartitionDeploymentDescriptor(
                        PartitionDescriptorBuilder.newBuilder().build(),
                        NettyShuffleDescriptorBuilder.newBuilder().buildLocal(),
                        1,
                        false);

        testExecutionFailsInNetworkRegistration(
                Collections.singletonList(dummyPartition), Collections.emptyList());
    }

    /**
     * Runs the shared network-registration failure scenario with no result partitions and a
     * single dummy input gate that has one remote channel.
     */
    @Test
    public void testExecutionFailsInNetworkRegistrationForGates() throws Exception {
        ShuffleDescriptor[] channels = {NettyShuffleDescriptorBuilder.newBuilder().buildRemote()};
        InputGateDeploymentDescriptor dummyGate =
                new InputGateDeploymentDescriptor(
                        new IntermediateDataSetID(), ResultPartitionType.PIPELINED, 0, channels);

        testExecutionFailsInNetworkRegistration(
                Collections.emptyList(), Collections.singletonList(dummyGate));
    }

    /**
     * Shared scenario: builds a task on the test's shuffle environment, closes that environment,
     * then runs the task. The task is expected to end FAILED with a failure cause whose message
     * contains the "buffer pool destroyed" text, and a matching FAILED listener message.
     *
     * @param resultPartitions result partition descriptors handed to the task builder
     * @param inputGates input gate descriptors handed to the task builder
     * @throws Exception if building or running the task throws
     */
    private void testExecutionFailsInNetworkRegistration(List<ResultPartitionDeploymentDescriptor> resultPartitions, List<InputGateDeploymentDescriptor> inputGates) throws Exception {
        String errorMessage = "Network buffer pool has already been destroyed.";

        NoOpResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
        PartitionProducerStateChecker partitionProducerStateChecker = Mockito.mock(PartitionProducerStateChecker.class);
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();

        Task task = new TestTaskBuilder(shuffleEnvironment)
                .setTaskManagerActions(actions)
                .setConsumableNotifier(consumableNotifier)
                .setPartitionProducerStateChecker(partitionProducerStateChecker)
                .setResultPartitions(resultPartitions)
                .setInputGates(inputGates)
                .build();

        // Close the environment before the run so that the run operates on a closed environment.
        shuffleEnvironment.close();
        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains(errorMessage));

        actions.validateListenerMessage(ExecutionState.FAILED, task, new IllegalStateException(errorMessage));
    }

    /**
     * Runs a task built with {@code InvokableNonInstantiable} and asserts that it ends FAILED
     * with a failure message containing "instantiate", plus a matching FAILED listener message.
     */
    @Test
    public void testInvokableInstantiationFailed() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setTaskManagerActions(actions)
                .setInvokable(InvokableNonInstantiable.class)
                .build();

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("instantiate"));

        actions.validateListenerMessage(
                ExecutionState.FAILED,
                task,
                new FlinkException("Could not instantiate the task's invokable class."));
    }

    /**
     * Runs a task built with {@code InvokableWithExceptionInRestore} and asserts that it ends
     * FAILED with a failure message containing {@code RESTORE_EXCEPTION_MSG}. The listener
     * messages INITIALIZING and FAILED are then validated in that order.
     */
    @Test
    public void testExecutionFailsInRestore() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableWithExceptionInRestore.class)
                .setTaskManagerActions(actions)
                .build();

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNotNull(task.getFailureCause());
        Assert.assertNotNull(task.getFailureCause().getMessage());
        Assert.assertThat(
                task.getFailureCause().getMessage(),
                CoreMatchers.containsString(RESTORE_EXCEPTION_MSG));

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception(RESTORE_EXCEPTION_MSG));
    }

    /**
     * Runs a task built with {@code InvokableWithExceptionInInvoke} and asserts that it ends
     * FAILED with a failure message containing "test". The listener messages INITIALIZING,
     * RUNNING and FAILED are then validated in that order.
     */
    @Test
    public void testExecutionFailsInInvoke() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableWithExceptionInInvoke.class)
                .setTaskManagerActions(actions)
                .build();

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNotNull(task.getFailureCause());
        Assert.assertNotNull(task.getFailureCause().getMessage());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("test"));

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Runs a task built with {@code FailingInvokableWithChainedException} and asserts that it
     * ends FAILED with a failure cause that is an {@link IOException}. The listener messages
     * INITIALIZING, RUNNING and FAILED are then validated in that order.
     */
    @Test
    public void testFailWithWrappedException() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(FailingInvokableWithChainedException.class)
                .setTaskManagerActions(actions)
                .build();

        task.run();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        // The reported cause must be of the IOException type itself.
        Assert.assertTrue(task.getFailureCause() instanceof IOException);

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new IOException("test"));
    }

    /**
     * Starts a task built with {@code InvokableBlockingInRestore}, waits on {@code awaitLatch},
     * and cancels it. The state must be CANCELING or CANCELED right after the cancel call and
     * CANCELED, without a failure cause, once the task thread has terminated. The listener
     * messages INITIALIZING and CANCELED are then validated in that order.
     */
    @Test
    public void testCancelDuringRestore() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableBlockingInRestore.class)
                .setTaskManagerActions(actions)
                .build();

        task.startTaskThread();
        awaitLatch.await();

        task.cancelExecution();
        // The task thread runs concurrently, so either state is acceptable at this point.
        boolean cancelingOrCanceled =
                task.getExecutionState() == ExecutionState.CANCELING
                        || task.getExecutionState() == ExecutionState.CANCELED;
        Assert.assertTrue(cancelingOrCanceled);

        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    /**
     * Starts a task built with {@code InvokableBlockingInInvoke}, waits on {@code awaitLatch},
     * and cancels it. The state must be CANCELING or CANCELED right after the cancel call and
     * CANCELED, without a failure cause, once the task thread has terminated. The listener
     * messages INITIALIZING, RUNNING and CANCELED are then validated in that order.
     */
    @Test
    public void testCancelDuringInvoke() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableBlockingInInvoke.class)
                .setTaskManagerActions(actions)
                .build();

        task.startTaskThread();
        awaitLatch.await();

        task.cancelExecution();
        // The task thread runs concurrently, so either state is acceptable at this point.
        boolean cancelingOrCanceled =
                task.getExecutionState() == ExecutionState.CANCELING
                        || task.getExecutionState() == ExecutionState.CANCELED;
        Assert.assertTrue(cancelingOrCanceled);

        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    /**
     * Starts a task built with {@code InvokableBlockingInRestore}, waits on {@code awaitLatch},
     * and fails it externally with {@code RESTORE_EXCEPTION_MSG}. After the task thread has
     * terminated, the task must be FAILED with that message in its failure cause. The listener
     * messages INITIALIZING and FAILED are then validated in that order.
     */
    @Test
    public void testFailExternallyDuringRestore() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableBlockingInRestore.class)
                .setTaskManagerActions(actions)
                .build();

        task.startTaskThread();
        awaitLatch.await();

        task.failExternally(new Exception(RESTORE_EXCEPTION_MSG));
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertThat(
                task.getFailureCause().getMessage(),
                CoreMatchers.containsString(RESTORE_EXCEPTION_MSG));

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception(RESTORE_EXCEPTION_MSG));
    }

    /**
     * Starts a task built with {@code InvokableBlockingInInvoke}, waits on {@code awaitLatch},
     * and fails it externally with the message "test". After the task thread has terminated,
     * the task must be FAILED with that message in its failure cause. The listener messages
     * INITIALIZING, RUNNING and FAILED are then validated in that order.
     */
    @Test
    public void testFailExternallyDuringInvoke() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableBlockingInInvoke.class)
                .setTaskManagerActions(actions)
                .build();

        task.startTaskThread();
        awaitLatch.await();

        task.failExternally(new Exception("test"));
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("test"));

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Runs a task built with {@code InvokableWithExceptionInInvoke} to completion and cancels it
     * afterwards. The task must still be FAILED with "test" in its failure message. The listener
     * messages INITIALIZING, RUNNING and FAILED are then validated in that order.
     */
    @Test
    public void testCanceledAfterExecutionFailedInInvoke() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableWithExceptionInInvoke.class)
                .setTaskManagerActions(actions)
                .build();

        task.run();
        // Cancelling after the run must not change the outcome asserted below.
        task.cancelExecution();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("test"));

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("test"));
    }

    /**
     * Starts a task built with {@code InvokableWithExceptionOnTrigger}, waits on
     * {@code awaitLatch}, cancels the task (state CANCELING), and only then fires
     * {@code triggerLatch}. After the task thread has terminated, the task must be CANCELED
     * without a failure cause. The listener messages INITIALIZING, RUNNING and CANCELED are then
     * validated in that order.
     */
    @Test
    public void testExecutionFailsAfterCanceling() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableWithExceptionOnTrigger.class)
                .setTaskManagerActions(actions)
                .build();

        task.startTaskThread();
        awaitLatch.await();

        task.cancelExecution();
        Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());

        // Release the invokable only after cancellation has been requested.
        triggerLatch.trigger();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertNull(task.getFailureCause());

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.CANCELED, task, null);
    }

    /**
     * Starts a task built with {@code InvokableWithExceptionOnTrigger}, waits on
     * {@code awaitLatch}, fails the task externally with the message "external" (state FAILED),
     * and only then fires {@code triggerLatch}. After the task thread has terminated, the task
     * must still be FAILED with "external" in its failure message. The listener messages
     * INITIALIZING, RUNNING and FAILED are then validated in that order.
     */
    @Test
    public void testExecutionFailsAfterTaskMarkedFailed() throws Exception {
        QueuedNoOpTaskManagerActions actions = new QueuedNoOpTaskManagerActions();
        Task task = createTaskBuilder()
                .setInvokable(InvokableWithExceptionOnTrigger.class)
                .setTaskManagerActions(actions)
                .build();

        task.startTaskThread();
        awaitLatch.await();

        task.failExternally(new Exception("external"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // Release the invokable only after the external failure has been recorded.
        triggerLatch.trigger();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("external"));

        actions.validateListenerMessage(ExecutionState.INITIALIZING, task, null);
        actions.validateListenerMessage(ExecutionState.RUNNING, task, null);
        actions.validateListenerMessage(ExecutionState.FAILED, task, new Exception("external"));
    }

    /**
     * Fires {@code triggerLatch} up front, runs a task built with
     * {@code InvokableWithCancelTaskExceptionInInvoke}, and asserts that it ends CANCELED.
     */
    @Test
    public void testCancelTaskException() throws Exception {
        Task task = createTaskBuilder()
                .setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
                .build();

        // Trigger before running: run() executes on this thread, so nothing could trigger later.
        triggerLatch.trigger();
        task.run();

        Assert.assertEquals(ExecutionState.CANCELED, task.getExecutionState());
    }

    /**
     * Starts a task built with {@code InvokableWithCancelTaskExceptionInInvoke}, waits on
     * {@code awaitLatch}, fails the task externally with the message "external" (state FAILED),
     * and only then fires {@code triggerLatch}. After the task thread has terminated, the task
     * must still be FAILED with "external" in its failure message.
     */
    @Test
    public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
        Task task = createTaskBuilder()
                .setInvokable(InvokableWithCancelTaskExceptionInInvoke.class)
                .build();

        task.startTaskThread();
        awaitLatch.await();

        task.failExternally(new Exception("external"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // Release the invokable only after the external failure has been recorded.
        triggerLatch.trigger();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());
        Assert.assertTrue(task.isCanceledOrFailed());
        Assert.assertTrue(task.getFailureCause().getMessage().contains("external"));
    }

    /** Runs the partition-state-update scenario with RUNNING as the task's initial state. */
    @Test
    public void testOnPartitionStateUpdateWhileRunning() throws Exception {
        testOnPartitionStateUpdate(ExecutionState.RUNNING);
    }

    /** Runs the partition-state-update scenario with DEPLOYING as the task's initial state. */
    @Test
    public void testOnPartitionStateUpdateWhileDeploying() throws Exception {
        testOnPartitionStateUpdate(ExecutionState.DEPLOYING);
    }

    /**
     * Shared scenario for the partition-state-update tests.
     *
     * <p>For every {@link ExecutionState} value, taken as the reported producer state, the task
     * is reset to {@code initialTaskState}, a response handle carrying that producer state is
     * passed to a {@link RemoteChannelStateChecker}, and the task's resulting state is compared
     * against a table of expected states. The number of producer states for which the checker
     * returns {@code true} is asserted at the end.
     *
     * @param initialTaskState state the task is forced into before each check
     * @throws Exception if building the task or forcing its state throws
     */
    public void testOnPartitionStateUpdate(ExecutionState initialTaskState) throws Exception {
        ResultPartitionID partitionId = new ResultPartitionID();
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).build();
        RemoteChannelStateChecker checker = new RemoteChannelStateChecker(partitionId, "test task");
        // Expected task state per producer state: start with FAILED for every producer state ...
        HashMap<ExecutionState, ExecutionState> expected = new HashMap<ExecutionState, ExecutionState>(ExecutionState.values().length);
        for (ExecutionState state : ExecutionState.values()) {
            expected.put(state, ExecutionState.FAILED);
        }
        // ... then override: these five producer states leave the task in its initial state ...
        expected.put(ExecutionState.INITIALIZING, initialTaskState);
        expected.put(ExecutionState.RUNNING, initialTaskState);
        expected.put(ExecutionState.SCHEDULED, initialTaskState);
        expected.put(ExecutionState.DEPLOYING, initialTaskState);
        expected.put(ExecutionState.FINISHED, initialTaskState);
        // ... and these three are expected to move the task to CANCELING.
        expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
        expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
        expected.put(ExecutionState.FAILED, ExecutionState.CANCELING);
        int producingStateCounter = 0;
        for (ExecutionState state : ExecutionState.values()) {
            // Reset the task so that each producer state is checked against the same start state.
            TestTaskBuilder.setTaskState(task, initialTaskState);
            // NOTE(review): the alias plus getClass() call looks like decompiler output for an
            // inner-class instance creation (implicit null check on the outer instance) - confirm.
            Task task2 = task;
            task2.getClass();
            if (checker.isProducerReadyOrAbortConsumption((PartitionProducerStateProvider.ResponseHandle)new Task.PartitionProducerStateResponseHandle(task2, state, null))) {
                ++producingStateCounter;
            }
            ExecutionState newTaskState = task.getExecutionState();
            Assert.assertEquals(expected.get(state), (Object)newTaskState);
        }
        // Five "ready" results are expected; presumably these are the five producer states mapped
        // to initialTaskState above - verify against RemoteChannelStateChecker.
        Assert.assertEquals((long)5L, (long)producingStateCounter);
    }

    /**
     * Exercises {@code Task.requestPartitionProducerState} for four outcomes of the future
     * returned by the mocked {@link PartitionProducerStateChecker}:
     *
     * <ol>
     *   <li>completed exceptionally with {@link PartitionProducerDisposedException}: the task is
     *       expected to be CANCELING;
     *   <li>completed exceptionally with any other {@link RuntimeException}: the task is
     *       expected to be FAILED;
     *   <li>completed exceptionally with {@link TimeoutException}: the task is expected to stay
     *       RUNNING and the checker to report "ready" exactly once;
     *   <li>completed normally with RUNNING: the task is expected to stay RUNNING and the
     *       checker to report "ready" exactly once.
     * </ol>
     *
     * <p>The fixtures are reset between cases. {@code teardown()} is called before each
     * {@code setup()} because {@code setup()} replaces the shuffle environment without closing
     * the previous one, which would otherwise leak one environment per case.
     *
     * <p>NOTE: this method was recovered by a decompiler, which reported
     * "Removed try catching itself - possible behaviour change." for it.
     */
    @Test
    public void testTriggerPartitionStateUpdate() throws Exception {
        IntermediateDataSetID resultId = new IntermediateDataSetID();
        ResultPartitionID partitionId = new ResultPartitionID();
        PartitionProducerStateChecker partitionChecker = Mockito.mock(PartitionProducerStateChecker.class);
        NoOpResultPartitionConsumableNotifier consumableNotifier = new NoOpResultPartitionConsumableNotifier();
        AtomicInteger callCount = new AtomicInteger(0);
        RemoteChannelStateChecker remoteChannelStateChecker = new RemoteChannelStateChecker(partitionId, "test task");

        // Case 1: producer disposed -> consumer task is cancelled.
        this.teardown();
        this.setup();
        Task task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setConsumableNotifier(consumableNotifier).setPartitionProducerStateChecker(partitionChecker).setExecutor(Executors.directExecutor()).build();
        TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
        CompletableFuture<ExecutionState> promise = new CompletableFuture<>();
        Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)ArgumentMatchers.eq((Object)task.getJobID()), (IntermediateDataSetID)ArgumentMatchers.eq((Object)resultId), (ResultPartitionID)ArgumentMatchers.eq((Object)partitionId))).thenReturn(promise);
        task.requestPartitionProducerState(resultId, partitionId, checkResult -> Assert.assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), CoreMatchers.is(false)));
        promise.completeExceptionally(new PartitionProducerDisposedException(partitionId));
        Assert.assertEquals(ExecutionState.CANCELING, task.getExecutionState());

        // Case 2: any other failure of the request -> consumer task fails.
        this.teardown();
        this.setup();
        task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setConsumableNotifier(consumableNotifier).setPartitionProducerStateChecker(partitionChecker).setExecutor(Executors.directExecutor()).build();
        TestTaskBuilder.setTaskState(task, ExecutionState.RUNNING);
        promise = new CompletableFuture<>();
        Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)ArgumentMatchers.eq((Object)task.getJobID()), (IntermediateDataSetID)ArgumentMatchers.eq((Object)resultId), (ResultPartitionID)ArgumentMatchers.eq((Object)partitionId))).thenReturn(promise);
        task.requestPartitionProducerState(resultId, partitionId, checkResult -> Assert.assertThat(remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult), CoreMatchers.is(false)));
        promise.completeExceptionally(new RuntimeException("Any other exception"));
        Assert.assertEquals(ExecutionState.FAILED, task.getExecutionState());

        // Case 3: request times out -> task keeps running, checker reports "ready" once.
        callCount.set(0);
        this.teardown();
        this.setup();
        task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setConsumableNotifier(consumableNotifier).setPartitionProducerStateChecker(partitionChecker).setExecutor(Executors.directExecutor()).build();
        try {
            task.startTaskThread();
            awaitLatch.await();
            promise = new CompletableFuture<>();
            Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)ArgumentMatchers.eq((Object)task.getJobID()), (IntermediateDataSetID)ArgumentMatchers.eq((Object)resultId), (ResultPartitionID)ArgumentMatchers.eq((Object)partitionId))).thenReturn(promise);
            task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
                if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
                    callCount.incrementAndGet();
                }
            });
            promise.completeExceptionally(new TimeoutException());
            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
            Assert.assertEquals(1L, callCount.get());
        }
        finally {
            // Always stop the task thread, even when an assertion above failed.
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }

        // Case 4: producer reported as RUNNING -> task keeps running, checker reports "ready" once.
        callCount.set(0);
        this.teardown();
        this.setup();
        task = this.createTaskBuilder().setInvokable(InvokableBlockingInInvoke.class).setConsumableNotifier(consumableNotifier).setPartitionProducerStateChecker(partitionChecker).setExecutor(Executors.directExecutor()).build();
        try {
            task.startTaskThread();
            awaitLatch.await();
            promise = new CompletableFuture<>();
            Mockito.when((Object)partitionChecker.requestPartitionProducerState((JobID)ArgumentMatchers.eq((Object)task.getJobID()), (IntermediateDataSetID)ArgumentMatchers.eq((Object)resultId), (ResultPartitionID)ArgumentMatchers.eq((Object)partitionId))).thenReturn(promise);
            task.requestPartitionProducerState(resultId, partitionId, checkResult -> {
                if (remoteChannelStateChecker.isProducerReadyOrAbortConsumption(checkResult)) {
                    callCount.incrementAndGet();
                }
            });
            promise.complete(ExecutionState.RUNNING);
            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());
            Assert.assertEquals(1L, callCount.get());
        }
        finally {
            // Always stop the task thread, even when an assertion above failed.
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    /**
     * Starts a task built with {@code InvokableBlockingInCancel}, using a cancellation interval
     * of 5 and a cancellation timeout of 60000, waits on {@code awaitLatch}, cancels the task,
     * and waits for the task thread to terminate.
     *
     * <p>The test has no explicit assertions: it passes when the join returns. The task manager
     * actions are a {@code ProhibitFatalErrorTaskManagerActions} (defined elsewhere; presumably
     * it fails the test if a fatal error is reported - verify).
     *
     * <p>The options are set through the typed {@code ConfigOption} overload of
     * {@code setLong}, matching {@code testInterruptibleSharedLockInInvokeAndCancel}, instead of
     * through the raw string key.
     */
    @Test
    public void testWatchDogInterruptsTask() throws Exception {
        ProhibitFatalErrorTaskManagerActions taskManagerActions = new ProhibitFatalErrorTaskManagerActions();

        Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5L);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 60000L);

        Task task = this.createTaskBuilder()
                .setInvokable(InvokableBlockingInCancel.class)
                .setTaskManagerConfig(config)
                .setTaskManagerActions(taskManagerActions)
                .build();

        task.startTaskThread();
        awaitLatch.await();

        task.cancelExecution();
        task.getExecutingThread().join();
    }

    /**
     * Cancels a running task that uses {@link InvokableInterruptibleSharedLockInInvokeAndCancel}
     * and waits for the executing thread to terminate.
     *
     * <p>The task runs with a cancellation interval of 5 and a cancellation timeout of 50. Any
     * fatal-error notification makes {@link ProhibitFatalErrorTaskManagerActions} throw.
     *
     * @throws Exception if waiting for the latch or joining the task thread fails
     */
    @Test
    public void testInterruptibleSharedLockInInvokeAndCancel() throws Exception {
        final ProhibitFatalErrorTaskManagerActions fatalErrorGuard =
                new ProhibitFatalErrorTaskManagerActions();

        final Configuration cancellationSettings = new Configuration();
        cancellationSettings.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5L);
        cancellationSettings.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50L);

        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableInterruptibleSharedLockInInvokeAndCancel.class)
                        .setTaskManagerConfig(cancellationSettings)
                        .setTaskManagerActions(fatalErrorGuard)
                        .build();

        task.startTaskThread();

        // The invokable signals this latch once it is inside invoke().
        awaitLatch.await();

        task.cancelExecution();
        task.getExecutingThread().join();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that cancelling a task whose invokable swallows interrupts ends in a fatal-error
     * notification.
     *
     * <p>The task uses {@link InvokableUnInterruptibleBlockingInvoke} and a cancellation timeout of
     * 10. The test blocks until the fatal-error callback has been invoked and asserts that the
     * reported throwable is not {@code null}. The {@code finally} block releases the invokable via
     * {@code triggerLatch} so that the task thread can terminate.
     *
     * @throws Exception if waiting for the latch or joining the task thread fails
     */
    @Test
    public void testFatalErrorAfterUnInterruptibleInvoke() throws Exception {
        // Typed future (instead of a raw CompletableFuture) so no unchecked cast is needed on join().
        final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();
        final TestingTaskManagerActions taskManagerActions =
                TestingTaskManagerActions.newBuilder()
                        .setNotifyFatalErrorConsumer(
                                (message, throwable) -> fatalErrorFuture.complete(throwable))
                        .build();

        final Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 10L);

        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableUnInterruptibleBlockingInvoke.class)
                        .setTaskManagerConfig(config)
                        .setTaskManagerActions(taskManagerActions)
                        .build();

        try {
            task.startTaskThread();
            awaitLatch.await();

            task.cancelExecution();

            // Blocks until notifyFatalError has been called.
            final Throwable fatalError = fatalErrorFuture.join();
            Assert.assertThat(fatalError, CoreMatchers.is(CoreMatchers.notNullValue()));
        } finally {
            // The invokable only leaves its loop once triggerLatch is triggered; the interrupt
            // wakes it from wait() so that it re-checks the latch.
            triggerLatch.trigger();
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that an {@link OutOfMemoryError} thrown while cancelling is reported as a fatal
     * error.
     *
     * <p>The task is wrapped in a Mockito spy whose {@code cancelOrFailAndCancelInvokableInternal}
     * throws {@code OutOfMemoryError} when called with {@code CANCELING} and a {@code null} cause.
     * The test blocks until the fatal-error callback fires and asserts the type of the reported
     * throwable.
     *
     * @throws Exception if waiting for the latch fails
     */
    @Test
    public void testFatalErrorOnCanceling() throws Exception {
        // Typed future (instead of a raw CompletableFuture) so no unchecked cast is needed on join().
        final CompletableFuture<Throwable> fatalErrorFuture = new CompletableFuture<>();
        final TestingTaskManagerActions taskManagerActions =
                TestingTaskManagerActions.newBuilder()
                        .setNotifyFatalErrorConsumer(
                                (message, throwable) -> fatalErrorFuture.complete(throwable))
                        .build();

        final Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, 5L);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, 50L);

        final Task task =
                Mockito.spy(
                        createTaskBuilder()
                                .setInvokable(InvokableBlockingWithTrigger.class)
                                .setTaskManagerConfig(config)
                                .setTaskManagerActions(taskManagerActions)
                                .build());

        final Class<OutOfMemoryError> fatalErrorType = OutOfMemoryError.class;
        Mockito.doThrow(fatalErrorType)
                .when(task)
                .cancelOrFailAndCancelInvokableInternal(
                        ArgumentMatchers.eq(ExecutionState.CANCELING),
                        ArgumentMatchers.<Throwable>eq(null));

        try {
            task.startTaskThread();
            awaitLatch.await();

            task.cancelExecution();

            // Blocks until notifyFatalError has been called.
            final Throwable fatalError = fatalErrorFuture.join();
            Assert.assertThat(fatalError, CoreMatchers.instanceOf(fatalErrorType));
        } finally {
            // Releases the invokable, which is blocked on triggerLatch inside invoke().
            triggerLatch.trigger();
        }
    }

    /**
     * Checks which configuration supplies the task's cancellation interval and timeout.
     *
     * <p>Before the task thread is started, the getters are expected to return the values from the
     * task manager {@link Configuration}. Once the invokable is running, they are expected to
     * return the values set on the {@link ExecutionConfig}.
     *
     * @throws Exception if waiting for the latch or joining the task thread fails
     */
    @Test
    public void testTaskConfig() throws Exception {
        final long interval = 28218123L;
        final long timeout = interval + 19292L;

        final Configuration config = new Configuration();
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, interval);
        config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, timeout);

        // Deliberately different from the task manager values so the two sources can be told apart.
        final ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setTaskCancellationInterval(interval + 1337L);
        executionConfig.setTaskCancellationTimeout(timeout - 1337L);

        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInInvoke.class)
                        .setTaskManagerConfig(config)
                        .setExecutionConfig(executionConfig)
                        .build();

        Assert.assertEquals(interval, task.getTaskCancellationInterval());
        Assert.assertEquals(timeout, task.getTaskCancellationTimeout());

        task.startTaskThread();
        try {
            awaitLatch.await();

            Assert.assertEquals(
                    executionConfig.getTaskCancellationInterval(),
                    task.getTaskCancellationInterval());
            Assert.assertEquals(
                    executionConfig.getTaskCancellationTimeout(),
                    task.getTaskCancellationTimeout());
        } finally {
            // The invokable waits forever; always interrupt and join so that a failing assertion
            // does not leave the task thread running behind the test.
            task.getExecutingThread().interrupt();
            task.getExecutingThread().join();
        }
    }

    /**
     * Checks that the termination future is not done while the invokable is still blocked and
     * holds {@code FINISHED} after the invokable has been released and the task thread has ended.
     *
     * @throws Exception if waiting for the latch or joining the task thread fails
     */
    @Test
    public void testTerminationFutureCompletesOnNormalExecution() throws Exception {
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingWithTrigger.class)
                        .setTaskManagerActions(new NoOpTaskManagerActions())
                        .build();

        task.startTaskThread();
        awaitLatch.await();

        // invoke() is still parked on triggerLatch at this point.
        Assert.assertFalse(task.getTerminationFuture().isDone());

        triggerLatch.trigger();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
    }

    /**
     * Cancels a task before its thread is started and checks that the termination future is still
     * open after the cancel call and holds {@code CANCELED} once the task thread has run and ended.
     *
     * @throws Exception if joining the task thread fails
     */
    @Test
    public void testTerminationFutureCompletesOnImmediateCancellation() throws Exception {
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableBlockingInInvoke.class)
                        .setTaskManagerActions(new NoOpTaskManagerActions())
                        .build();

        // Cancel first; the task thread has not been started yet.
        task.cancelExecution();
        Assert.assertFalse(task.getTerminationFuture().isDone());

        task.startTaskThread();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.CANCELED, task.getTerminationFuture().getNow(null));
    }

    /**
     * Runs a task whose {@code invoke()} throws and checks that the termination future holds
     * {@code FAILED} after the task thread has ended.
     *
     * @throws Exception if joining the task thread fails
     */
    @Test
    public void testTerminationFutureCompletesOnErrorInInvoke() throws Exception {
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableWithExceptionInInvoke.class)
                        .setTaskManagerActions(new NoOpTaskManagerActions())
                        .build();

        task.startTaskThread();
        task.getExecutingThread().join();

        Assert.assertEquals(ExecutionState.FAILED, task.getTerminationFuture().getNow(null));
    }

    /**
     * Checks that a freshly built task that was never started does not report back pressure.
     *
     * @throws Exception if building the task fails
     */
    @Test
    public void testNoBackPressureIfTaskNotStarted() throws Exception {
        final Task notStartedTask = createTaskBuilder().build();

        Assert.assertFalse(notStartedTask.isBackPressured());
    }

    /**
     * Exercises the different ways a checkpoint trigger can be declined.
     *
     * <ul>
     *   <li>Checkpoint 1 is triggered before the task thread is started and is expected to be
     *       declined with {@code CHECKPOINT_DECLINED_TASK_NOT_READY}.
     *   <li>Checkpoint 2 makes {@link InvokableDeclingingCheckpoints} throw a {@code
     *       RejectedExecutionException} and is expected to be declined with {@code
     *       CHECKPOINT_DECLINED_TASK_CLOSING}.
     *   <li>Checkpoint 3 (future completed exceptionally) and checkpoint 4 (future completed with
     *       {@code false}) are expected to be declined with {@code TASK_FAILURE}.
     * </ul>
     *
     * <p>Afterwards the invokable is released and the task is expected to end as {@code FINISHED}.
     *
     * @throws Exception if waiting for the latch or joining the task thread fails
     */
    @Test
    public void testDeclineCheckpoint() throws Exception {
        final TestCheckpointResponder responder = new TestCheckpointResponder();
        final Task task =
                createTaskBuilder()
                        .setInvokable(InvokableDeclingingCheckpoints.class)
                        .setCheckpointResponder(responder)
                        .build();

        // The task thread has not been started yet.
        assertCheckpointDeclined(
                task, responder, 1L, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY);

        task.startTaskThread();
        try {
            awaitLatch.await();
            Assert.assertEquals(ExecutionState.RUNNING, task.getExecutionState());

            assertCheckpointDeclined(
                    task, responder, 2L, CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_CLOSING);
            assertCheckpointDeclined(task, responder, 3L, CheckpointFailureReason.TASK_FAILURE);
            assertCheckpointDeclined(task, responder, 4L, CheckpointFailureReason.TASK_FAILURE);
        } finally {
            // Let invoke() return so that the task thread can end.
            triggerLatch.trigger();
            task.getExecutingThread().join();
        }

        Assert.assertEquals(ExecutionState.FINISHED, task.getTerminationFuture().getNow(null));
    }

    /**
     * Triggers a checkpoint barrier on the task and asserts that exactly one decline report with
     * the given checkpoint id and failure reason was recorded. The responder is cleared afterwards
     * so that the next call starts from an empty report list.
     *
     * @param task task on which the checkpoint barrier is triggered
     * @param testCheckpointResponder responder that records the decline reports
     * @param checkpointId id of the checkpoint to trigger and expected id of the decline report
     * @param failureReason expected failure reason of the decline report
     */
    private void assertCheckpointDeclined(
            Task task,
            TestCheckpointResponder testCheckpointResponder,
            long checkpointId,
            CheckpointFailureReason failureReason) {
        task.triggerCheckpointBarrier(
                checkpointId,
                1L,
                CheckpointOptions.alignedNoTimeout(
                        CheckpointType.CHECKPOINT,
                        CheckpointStorageLocationReference.getDefault()));

        Assert.assertEquals(1, testCheckpointResponder.getDeclineReports().size());
        Assert.assertEquals(
                checkpointId,
                testCheckpointResponder.getDeclineReports().get(0).getCheckpointId());
        Assert.assertEquals(
                failureReason,
                testCheckpointResponder
                        .getDeclineReports()
                        .get(0)
                        .getCause()
                        .getCheckpointFailureReason());

        testCheckpointResponder.clear();
    }

    /**
     * Creates a new {@link TestTaskBuilder} that uses this test's shuffle environment.
     *
     * @return a fresh builder instance
     */
    private TestTaskBuilder createTaskBuilder() {
        final TestTaskBuilder builder = new TestTaskBuilder(shuffleEnvironment);
        return builder;
    }

    // Static initializer for the class-level test state.
    static {
        // Temporary folder instance shared by the tests of this class.
        TEMPORARY_FOLDER = new TemporaryFolder();
        // Flag that the cleanUp(Throwable) overrides of the test invokables below set to true.
        wasCleanedUp = false;
    }

    /**
     * {@link WrappingRuntimeException} subtype used by the tests to throw a failure that carries a
     * chained cause.
     */
    private static class TestWrappedException extends WrappingRuntimeException {
        private static final long serialVersionUID = 1L;

        /**
         * @param cause the throwable to wrap
         */
        TestWrappedException(@Nonnull final Throwable cause) {
            super(cause);
        }
    }

    /**
     * Invokable whose {@code invoke()} cannot be ended by interrupting it: it keeps waiting on its
     * own monitor until {@code triggerLatch} has been triggered, and {@code cancel()} completes
     * immediately without releasing it.
     */
    public static final class InvokableUnInterruptibleBlockingInvoke extends AbstractInvokable {
        public InvokableUnInterruptibleBlockingInvoke(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch} and waits on this object's monitor, looping until {@code
         * triggerLatch} is triggered.
         */
        public void invoke() {
            for (;;) {
                if (triggerLatch.isTriggered()) {
                    return;
                }
                try {
                    synchronized (this) {
                        awaitLatch.trigger();
                        wait();
                    }
                } catch (InterruptedException ignored) {
                    // Swallowed on purpose: an interrupt only causes the latch to be re-checked.
                }
            }
        }

        /** Returns an already completed future without touching the running {@code invoke()}. */
        public Future<Void> cancel() {
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Invokable whose {@code invoke()} and {@code cancel()} synchronize on the same lock object.
     *
     * <p>{@code invoke()} blocks interruptibly while holding the shared lock, so {@code cancel()}
     * cannot enter its critical section until the executing thread has been interrupted and has
     * left {@code invoke()}.
     */
    public static final class InvokableInterruptibleSharedLockInInvokeAndCancel
            extends AbstractInvokable {
        /** Lock shared between {@link #invoke()} and {@link #cancel()}. */
        private final Object lock = new Object();

        public InvokableInterruptibleSharedLockInInvokeAndCancel(Environment environment) {
            super(environment);
        }

        /**
         * Acquires the shared lock, signals {@code awaitLatch} and then blocks until the thread is
         * interrupted.
         *
         * @throws Exception the {@link InterruptedException} raised when the wait is interrupted
         */
        public void invoke() throws Exception {
            synchronized (lock) {
                awaitLatch.trigger();
                // Object.wait() requires the calling thread to own the monitor of the object it
                // waits on. Waiting on "this" while only holding "lock" throws
                // IllegalMonitorStateException right away instead of blocking. Taking this
                // object's monitor for the wait keeps "lock" held while the thread is parked,
                // because wait() releases only the monitor of the object being waited on.
                synchronized (this) {
                    wait();
                }
            }
        }

        /**
         * Triggers {@code triggerLatch} under the shared lock and returns a completed future.
         */
        public Future<Void> cancel() {
            synchronized (lock) {
                triggerLatch.trigger();
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Invokable whose {@code cancel()} blocks: it waits on this object's monitor and is only woken
     * by the {@code notifyAll()} that {@code invoke()} issues after being interrupted.
     */
    public static final class InvokableBlockingInCancel extends AbstractInvokable {
        public InvokableBlockingInCancel(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch}, waits for {@code triggerLatch} and then waits on this
         * object's monitor. When interrupted at either point, it notifies all threads waiting on
         * this object and returns.
         */
        public void invoke() {
            awaitLatch.trigger();
            try {
                triggerLatch.await();
                synchronized (this) {
                    wait();
                }
            } catch (InterruptedException ignored) {
                // Wake up a cancel() call that is parked on this object's monitor.
                synchronized (this) {
                    notifyAll();
                }
            }
        }

        /**
         * Triggers {@code triggerLatch} and then waits on this object's monitor before returning a
         * completed future.
         *
         * @throws Exception the {@link InterruptedException} raised when the wait is interrupted
         */
        public Future<Void> cancel() throws Exception {
            synchronized (this) {
                triggerLatch.trigger();
                wait();
            }
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Invokable that signals {@code awaitLatch}, waits for {@code triggerLatch} and then always
     * throws a {@link CancelTaskException}.
     */
    public static final class InvokableWithCancelTaskExceptionInInvoke extends AbstractInvokable {
        public InvokableWithCancelTaskExceptionInInvoke(Environment environment) {
            super(environment);
        }

        /** Throws {@link CancelTaskException} once the wait on {@code triggerLatch} has ended. */
        public void invoke() {
            awaitLatch.trigger();

            try {
                triggerLatch.await();
            } catch (Throwable ignored) {
                // Whatever ends the wait, the CancelTaskException below is thrown regardless.
            }

            throw new CancelTaskException();
        }
    }

    /**
     * Invokable that signals {@code awaitLatch}, waits until {@code triggerLatch} is triggered
     * (ignoring interrupts) and then fails with a {@link RuntimeException}.
     */
    public static final class InvokableWithExceptionOnTrigger extends AbstractInvokable {
        public InvokableWithExceptionOnTrigger(Environment environment) {
            super(environment);
        }

        /** Throws a {@code RuntimeException("test")} after {@code triggerLatch} was triggered. */
        public void invoke() {
            awaitLatch.trigger();

            // An interrupt must not end the wait early, so retry until await() returns normally.
            boolean released = false;
            while (!released) {
                try {
                    triggerLatch.await();
                    released = true;
                } catch (InterruptedException ignored) {
                    // retry the wait
                }
            }

            throw new RuntimeException("test");
        }
    }

    /**
     * Invokable that blocks forever inside {@code restore()} and records in {@code wasCleanedUp}
     * that {@code cleanUp} was called.
     */
    private static final class InvokableBlockingInRestore extends AbstractInvokable {
        public InvokableBlockingInRestore(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch} and then waits on this object's monitor in an endless loop.
         *
         * @throws Exception the {@link InterruptedException} raised when the wait is interrupted
         */
        public void restore() throws Exception {
            awaitLatch.trigger();

            synchronized (this) {
                for (;;) {
                    wait();
                }
            }
        }

        /** Does nothing. */
        public void invoke() throws Exception {
        }

        /** Sets the {@code wasCleanedUp} flag and delegates to the parent implementation. */
        public void cleanUp(Throwable throwable) throws Exception {
            wasCleanedUp = true;
            super.cleanUp(throwable);
        }
    }

    /** Invokable that blocks forever inside {@code invoke()} until its thread is interrupted. */
    private static final class InvokableBlockingInInvoke extends AbstractInvokable {
        public InvokableBlockingInInvoke(Environment environment) {
            super(environment);
        }

        /**
         * Signals {@code awaitLatch} and then waits on this object's monitor in an endless loop.
         *
         * @throws Exception the {@link InterruptedException} raised when the wait is interrupted
         */
        public void invoke() throws Exception {
            awaitLatch.trigger();

            synchronized (this) {
                for (;;) {
                    wait();
                }
            }
        }
    }

    /**
     * Invokable that fails every checkpoint trigger in a way selected by the checkpoint id. The
     * three supported ids are exposed as constants; any other id is rejected with an {@link
     * UnsupportedOperationException}.
     */
    private static class InvokableDeclingingCheckpoints extends InvokableBlockingWithTrigger {
        /** Checkpoint id for which the trigger call throws {@link RejectedExecutionException}. */
        public static final int REJECTED_EXECUTION_CHECKPOINT_ID = 2;

        /** Checkpoint id for which the returned future is completed exceptionally. */
        public static final int THROWING_CHECKPOINT_ID = 3;

        /** Checkpoint id for which the returned future is completed with {@code false}. */
        public static final int TRIGGERING_FAILED_CHECKPOINT_ID = 4;

        public InvokableDeclingingCheckpoints(Environment environment) {
            super(environment);
        }

        /**
         * Fails the checkpoint trigger according to the id carried by the metadata.
         *
         * @param checkpointMetaData metadata whose checkpoint id selects the failure mode
         * @param checkpointOptions not used
         * @return a future completed exceptionally or with {@code false}, depending on the id
         * @throws RejectedExecutionException for {@link #REJECTED_EXECUTION_CHECKPOINT_ID}
         * @throws UnsupportedOperationException for any id that is not one of the three constants
         * @throws ArithmeticException if the checkpoint id does not fit into an {@code int}
         */
        public CompletableFuture<Boolean> triggerCheckpointAsync(
                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            final long checkpointId = checkpointMetaData.getCheckpointId();

            // Dispatch on the declared constants rather than repeating their literal values.
            switch (Math.toIntExact(checkpointId)) {
                case REJECTED_EXECUTION_CHECKPOINT_ID: {
                    throw new RejectedExecutionException();
                }
                case THROWING_CHECKPOINT_ID: {
                    final CompletableFuture<Boolean> result = new CompletableFuture<>();
                    result.completeExceptionally(new ExpectedTestException());
                    return result;
                }
                case TRIGGERING_FAILED_CHECKPOINT_ID: {
                    return CompletableFuture.completedFuture(false);
                }
                default: {
                    throw new UnsupportedOperationException(
                            "Unsupported checkpointId: " + checkpointId);
                }
            }
        }
    }

    /**
     * Invokable that signals {@code awaitLatch} when {@code invoke()} is entered and then blocks
     * until {@code triggerLatch} is triggered.
     */
    private static class InvokableBlockingWithTrigger extends AbstractInvokable {
        public InvokableBlockingWithTrigger(final Environment environment) {
            super(environment);
        }

        /**
         * Announces that the invokable is running, then waits for the release signal.
         *
         * @throws Exception if the wait on {@code triggerLatch} is interrupted
         */
        public void invoke() throws Exception {
            awaitLatch.trigger();
            triggerLatch.await();
        }
    }

    /**
     * Invokable whose {@code invoke()} fails with a {@link TestWrappedException} that wraps an
     * {@link IOException}.
     */
    private static final class FailingInvokableWithChainedException extends AbstractInvokable {
        public FailingInvokableWithChainedException(final Environment environment) {
            super(environment);
        }

        /** Always throws a {@code TestWrappedException} whose cause is {@code IOException("test")}. */
        public void invoke() {
            final IOException rootCause = new IOException("test");
            throw new TestWrappedException(rootCause);
        }

        /** Returns an already completed future. */
        public Future<Void> cancel() {
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Invokable whose {@code restore()} always fails and that records in {@code wasCleanedUp} that
     * {@code cleanUp} was called.
     */
    static final class InvokableWithExceptionInRestore extends AbstractInvokable {
        public InvokableWithExceptionInRestore(final Environment environment) {
            super(environment);
        }

        /**
         * @throws Exception always, with {@code TaskTest.RESTORE_EXCEPTION_MSG} as its message
         */
        public void restore() throws Exception {
            throw new Exception(TaskTest.RESTORE_EXCEPTION_MSG);
        }

        /** Does nothing. */
        public void invoke() throws Exception {
        }

        /** Sets the {@code wasCleanedUp} flag and delegates to the parent implementation. */
        public void cleanUp(Throwable throwable) throws Exception {
            wasCleanedUp = true;
            super.cleanUp(throwable);
        }
    }

    /**
     * Invokable whose {@code invoke()} always fails and that records in {@code wasCleanedUp} that
     * {@code cleanUp} was called.
     */
    private static final class InvokableWithExceptionInInvoke extends AbstractInvokable {
        public InvokableWithExceptionInInvoke(final Environment environment) {
            super(environment);
        }

        /**
         * @throws Exception always, with the message {@code "test"}
         */
        public void invoke() throws Exception {
            throw new Exception("test");
        }

        /** Sets the {@code wasCleanedUp} flag and delegates to the parent implementation. */
        public void cleanUp(Throwable throwable) throws Exception {
            wasCleanedUp = true;
            super.cleanUp(throwable);
        }
    }

    /** Abstract invokable; being abstract, it cannot be instantiated directly. */
    private abstract static class InvokableNonInstantiable extends AbstractInvokable {
        public InvokableNonInstantiable(final Environment environment) {
            super(environment);
        }
    }

    /**
     * Invokable whose {@code invoke()} returns immediately. Calling {@code cancel()} fails the
     * test, and {@code cleanUp} records its invocation in {@code wasCleanedUp}.
     */
    private static final class TestInvokableCorrect extends AbstractInvokable {
        public TestInvokableCorrect(final Environment environment) {
            super(environment);
        }

        /** Does nothing. */
        public void invoke() {
        }

        /** Fails the test; the trailing {@code return} is only there to satisfy the compiler. */
        public Future<Void> cancel() {
            Assert.fail("This should not be called");
            return null;
        }

        /** Sets the {@code wasCleanedUp} flag and delegates to the parent implementation. */
        public void cleanUp(Throwable throwable) throws Exception {
            wasCleanedUp = true;
            super.cleanUp(throwable);
        }
    }

    /**
     * {@link NoOpTaskManagerActions} variant that throws a {@link RuntimeException} whenever a
     * fatal error is reported.
     */
    private static class ProhibitFatalErrorTaskManagerActions extends NoOpTaskManagerActions {
        private ProhibitFatalErrorTaskManagerActions() {
        }

        /**
         * Rejects the notification.
         *
         * @param message ignored
         * @param cause ignored
         * @throws RuntimeException always
         */
        @Override
        public void notifyFatalError(String message, Throwable cause) {
            final String complaint = "Unexpected FatalError notification";
            throw new RuntimeException(complaint);
        }
    }

    private static class QueuedNoOpTaskManagerActions
    extends NoOpTaskManagerActions {
        private final BlockingQueue<TaskExecutionState> queue = new LinkedBlockingDeque<TaskExecutionState>();

        private QueuedNoOpTaskManagerActions() {
        }

        @Override
        public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
            this.queue.offer(taskExecutionState);
        }

        private void validateListenerMessage(ExecutionState state, Task task, Throwable error) {
            try {
                TaskExecutionState taskState = this.queue.take();
                Assert.assertNotNull((String)"There is no additional listener message", (Object)state);
                Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState.getID());
                Assert.assertEquals((Object)state, (Object)taskState.getExecutionState());
                Throwable t = taskState.getError(this.getClass().getClassLoader());
                if (error == null) {
                    Assert.assertNull((Object)t);
                } else {
                    Assert.assertEquals((Object)error.toString(), (Object)t.toString());
                }
            }
            catch (InterruptedException e) {
                Assert.fail((String)"interrupted");
            }
        }
    }
}

