/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.taskmanager.ForwardingActorGateway;
import org.apache.flink.runtime.taskmanager.OneShotLatch;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.shaded.com.google.common.collect.Maps;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import scala.concurrent.duration.FiniteDuration;

public class TaskTest {
    private static OneShotLatch awaitLatch;
    private static OneShotLatch triggerLatch;
    private ActorGateway taskManagerGateway;
    private ActorGateway jobManagerGateway;
    private ActorGateway listenerGateway;
    private BlockingQueue<Object> taskManagerMessages;
    private BlockingQueue<Object> jobManagerMessages;
    private BlockingQueue<Object> listenerMessages;

    @Before
    public void createQueuesAndActors() {
        this.taskManagerMessages = new LinkedBlockingQueue<Object>();
        this.jobManagerMessages = new LinkedBlockingQueue<Object>();
        this.listenerMessages = new LinkedBlockingQueue<Object>();
        this.taskManagerGateway = new ForwardingActorGateway(this.taskManagerMessages);
        this.jobManagerGateway = new ForwardingActorGateway(this.jobManagerMessages);
        this.listenerGateway = new ForwardingActorGateway(this.listenerMessages);
        awaitLatch = new OneShotLatch();
        triggerLatch = new OneShotLatch();
    }

    @After
    public void clearActorsAndMessages() {
        this.jobManagerMessages = null;
        this.taskManagerMessages = null;
        this.listenerMessages = null;
        this.taskManagerGateway = null;
        this.jobManagerGateway = null;
        this.listenerGateway = null;
    }

    @Test
    public void testRegularExecution() {
        try {
            Task task = this.createTask(TestInvokableCorrect.class);
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)task.getExecutionState());
            Assert.assertFalse((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            task.registerExecutionListener(this.listenerGateway);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FINISHED, (Object)task.getExecutionState());
            Assert.assertFalse((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FINISHED, task, false);
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelRightAway() {
        try {
            Task task = this.createTask(TestInvokableCorrect.class);
            task.cancelExecution();
            Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
            task.run();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
            this.validateUnregisterTask(task.getExecutionId());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFailExternallyRightAway() {
        try {
            Task task = this.createTask(TestInvokableCorrect.class);
            task.failExternally((Throwable)new Exception("fail externally"));
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            this.validateUnregisterTask(task.getExecutionId());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testLibraryCacheRegistrationFailed() {
        try {
            Task task = this.createTask(TestInvokableCorrect.class, (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class));
            Assert.assertEquals((Object)ExecutionState.CREATED, (Object)task.getExecutionState());
            Assert.assertFalse((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            task.registerExecutionListener(this.listenerGateway);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertNotNull((Object)task.getFailureCause());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("classloader"));
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
            this.validateUnregisterTask(task.getExecutionId());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsInNetworkRegistration() {
        try {
            LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
            Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)this.getClass().getClassLoader());
            ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
            ResultPartitionConsumableNotifier consumableNotifier = (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class);
            NetworkEnvironment network = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
            Mockito.when((Object)network.getPartitionManager()).thenReturn((Object)partitionManager);
            Mockito.when((Object)network.getPartitionConsumableNotifier()).thenReturn((Object)consumableNotifier);
            Mockito.when((Object)network.getDefaultIOMode()).thenReturn((Object)IOManager.IOMode.SYNC);
            ((NetworkEnvironment)Mockito.doThrow((Throwable)new RuntimeException("buffers")).when((Object)network)).registerTask((Task)Matchers.any(Task.class));
            Task task = this.createTask(TestInvokableCorrect.class, libCache, network);
            task.registerExecutionListener(this.listenerGateway);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("buffers"));
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testInvokableInstantiationFailed() {
        try {
            Task task = this.createTask(InvokableNonInstantiable.class);
            task.registerExecutionListener(this.listenerGateway);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("instantiate"));
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsInRegisterInputOutput() {
        try {
            Task task = this.createTask(InvokableWithExceptionInRegisterInOut.class);
            task.registerExecutionListener(this.listenerGateway);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("registerInputOutput"));
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsInInvoke() {
        try {
            Task task = this.createTask(InvokableWithExceptionInInvoke.class);
            task.registerExecutionListener(this.listenerGateway);
            task.run();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelDuringRegisterInputOutput() {
        try {
            Task task = this.createTask(InvokableBlockingInRegisterInOut.class);
            task.registerExecutionListener(this.listenerGateway);
            task.startTaskThread();
            awaitLatch.await();
            task.cancelExecution();
            Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
            triggerLatch.trigger();
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            this.validateUnregisterTask(task.getExecutionId());
            this.validateCancelingAndCanceledListenerMessage(task);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFailDuringRegisterInputOutput() {
        try {
            Task task = this.createTask(InvokableBlockingInRegisterInOut.class);
            task.registerExecutionListener(this.listenerGateway);
            task.startTaskThread();
            awaitLatch.await();
            task.failExternally((Throwable)new Exception("test"));
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            triggerLatch.trigger();
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelDuringInvoke() {
        try {
            Task task = this.createTask(InvokableBlockingInInvoke.class);
            task.registerExecutionListener(this.listenerGateway);
            task.startTaskThread();
            awaitLatch.await();
            task.cancelExecution();
            Assert.assertTrue((task.getExecutionState() == ExecutionState.CANCELING || task.getExecutionState() == ExecutionState.CANCELED ? 1 : 0) != 0);
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateCancelingAndCanceledListenerMessage(task);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testFailExternallyDuringInvoke() {
        try {
            Task task = this.createTask(InvokableBlockingInInvoke.class);
            task.registerExecutionListener(this.listenerGateway);
            task.startTaskThread();
            awaitLatch.await();
            task.failExternally((Throwable)new Exception("test"));
            Assert.assertTrue((task.getExecutionState() == ExecutionState.FAILED ? 1 : 0) != 0);
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCanceledAfterExecutionFailedInRegInOut() {
        try {
            Task task = this.createTask(InvokableWithExceptionInRegisterInOut.class);
            task.registerExecutionListener(this.listenerGateway);
            task.run();
            task.cancelExecution();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("registerInputOutput"));
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCanceledAfterExecutionFailedInInvoke() {
        try {
            Task task = this.createTask(InvokableWithExceptionInInvoke.class);
            task.registerExecutionListener(this.listenerGateway);
            task.run();
            task.cancelExecution();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("test"));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailesAfterCanceling() {
        try {
            Task task = this.createTask(InvokableWithExceptionOnTrigger.class);
            task.registerExecutionListener(this.listenerGateway);
            task.startTaskThread();
            awaitLatch.await();
            task.cancelExecution();
            Assert.assertEquals((Object)ExecutionState.CANCELING, (Object)task.getExecutionState());
            triggerLatch.trigger();
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertNull((Object)task.getFailureCause());
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateCancelingAndCanceledListenerMessage(task);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testExecutionFailsAfterTaskMarkedFailed() {
        try {
            Task task = this.createTask(InvokableWithExceptionOnTrigger.class);
            task.registerExecutionListener(this.listenerGateway);
            task.startTaskThread();
            awaitLatch.await();
            task.failExternally((Throwable)new Exception("external"));
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            triggerLatch.trigger();
            task.getExecutingThread().join();
            Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
            Assert.assertTrue((boolean)task.isCanceledOrFailed());
            Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("external"));
            this.validateTaskManagerStateChange(ExecutionState.RUNNING, task, false);
            this.validateUnregisterTask(task.getExecutionId());
            this.validateListenerMessage(ExecutionState.RUNNING, task, false);
            this.validateListenerMessage(ExecutionState.FAILED, task, true);
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    @Test
    public void testCancelTaskException() throws Exception {
        Task task = this.createTask(InvokableWithCancelTaskExceptionInInvoke.class);
        triggerLatch.trigger();
        task.run();
        Assert.assertEquals((Object)ExecutionState.CANCELED, (Object)task.getExecutionState());
    }

    @Test
    public void testCancelTaskExceptionAfterTaskMarkedFailed() throws Exception {
        Task task = this.createTask(InvokableWithCancelTaskExceptionInInvoke.class);
        task.startTaskThread();
        awaitLatch.await();
        task.failExternally((Throwable)new Exception("external"));
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        triggerLatch.trigger();
        task.getExecutingThread().join();
        Assert.assertEquals((Object)ExecutionState.FAILED, (Object)task.getExecutionState());
        Assert.assertTrue((boolean)task.isCanceledOrFailed());
        Assert.assertTrue((boolean)task.getFailureCause().getMessage().contains("external"));
    }

    @Test
    public void testOnPartitionStateUpdate() throws Exception {
        IntermediateDataSetID resultId = new IntermediateDataSetID();
        ResultPartitionID partitionId = new ResultPartitionID();
        SingleInputGate inputGate = (SingleInputGate)Mockito.mock(SingleInputGate.class);
        Mockito.when((Object)inputGate.getConsumedResultId()).thenReturn((Object)resultId);
        Task task = this.createTask(InvokableBlockingInInvoke.class);
        this.setInputGate(task, inputGate);
        HashMap expected = Maps.newHashMapWithExpectedSize((int)ExecutionState.values().length);
        for (ExecutionState state : ExecutionState.values()) {
            expected.put(state, ExecutionState.FAILED);
        }
        expected.put(ExecutionState.RUNNING, ExecutionState.RUNNING);
        expected.put(ExecutionState.CANCELED, ExecutionState.CANCELING);
        expected.put(ExecutionState.CANCELING, ExecutionState.CANCELING);
        expected.put(ExecutionState.FAILED, ExecutionState.CANCELING);
        for (ExecutionState state : ExecutionState.values()) {
            this.setState(task, ExecutionState.RUNNING);
            task.onPartitionStateUpdate(resultId, partitionId.getPartitionId(), state);
            ExecutionState newTaskState = task.getExecutionState();
            Assert.assertEquals(expected.get(state), (Object)newTaskState);
        }
        ((SingleInputGate)Mockito.verify((Object)inputGate, (VerificationMode)Mockito.times((int)1))).retriggerPartitionRequest((IntermediateResultPartitionID)Matchers.eq((Object)partitionId.getPartitionId()));
    }

    private void setInputGate(Task task, SingleInputGate inputGate) {
        try {
            Field f = Task.class.getDeclaredField("inputGates");
            f.setAccessible(true);
            f.set(task, new SingleInputGate[]{inputGate});
            HashMap byId = Maps.newHashMapWithExpectedSize((int)1);
            byId.put(inputGate.getConsumedResultId(), inputGate);
            f = Task.class.getDeclaredField("inputGatesById");
            f.setAccessible(true);
            f.set(task, byId);
        }
        catch (Exception e) {
            throw new RuntimeException("Modifying the task state failed", e);
        }
    }

    private void setState(Task task, ExecutionState state) {
        try {
            Field f = Task.class.getDeclaredField("executionState");
            f.setAccessible(true);
            f.set(task, state);
        }
        catch (Exception e) {
            throw new RuntimeException("Modifying the task state failed", e);
        }
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable) {
        LibraryCacheManager libCache = (LibraryCacheManager)Mockito.mock(LibraryCacheManager.class);
        Mockito.when((Object)libCache.getClassLoader((JobID)Matchers.any(JobID.class))).thenReturn((Object)this.getClass().getClassLoader());
        return this.createTask(invokable, libCache);
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable, LibraryCacheManager libCache) {
        ResultPartitionManager partitionManager = (ResultPartitionManager)Mockito.mock(ResultPartitionManager.class);
        ResultPartitionConsumableNotifier consumableNotifier = (ResultPartitionConsumableNotifier)Mockito.mock(ResultPartitionConsumableNotifier.class);
        NetworkEnvironment network = (NetworkEnvironment)Mockito.mock(NetworkEnvironment.class);
        Mockito.when((Object)network.getPartitionManager()).thenReturn((Object)partitionManager);
        Mockito.when((Object)network.getPartitionConsumableNotifier()).thenReturn((Object)consumableNotifier);
        Mockito.when((Object)network.getDefaultIOMode()).thenReturn((Object)IOManager.IOMode.SYNC);
        return this.createTask(invokable, libCache, network);
    }

    private Task createTask(Class<? extends AbstractInvokable> invokable, LibraryCacheManager libCache, NetworkEnvironment networkEnvironment) {
        TaskDeploymentDescriptor tdd = this.createTaskDeploymentDescriptor(invokable);
        return new Task(tdd, (MemoryManager)Mockito.mock(MemoryManager.class), (IOManager)Mockito.mock(IOManager.class), networkEnvironment, (BroadcastVariableManager)Mockito.mock(BroadcastVariableManager.class), this.taskManagerGateway, this.jobManagerGateway, new FiniteDuration(60L, TimeUnit.SECONDS), libCache, (FileCache)Mockito.mock(FileCache.class), new TaskManagerRuntimeInfo("localhost", new Configuration()));
    }

    private TaskDeploymentDescriptor createTaskDeploymentDescriptor(Class<? extends AbstractInvokable> invokable) {
        return new TaskDeploymentDescriptor(new JobID(), new JobVertexID(), new ExecutionAttemptID(), "Test Task", 0, 1, new Configuration(), new Configuration(), invokable.getName(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);
    }

    private void validateUnregisterTask(ExecutionAttemptID id) {
        try {
            Object rawMessage = this.taskManagerMessages.take();
            Assert.assertNotNull((String)"There is no additional TaskManager message", (Object)rawMessage);
            if (!(rawMessage instanceof TaskMessages.TaskInFinalState)) {
                Assert.fail((String)("TaskManager message is not 'UnregisterTask', but " + rawMessage.getClass()));
            }
            TaskMessages.TaskInFinalState message = (TaskMessages.TaskInFinalState)rawMessage;
            Assert.assertEquals((Object)id, (Object)message.executionID());
        }
        catch (InterruptedException e) {
            Assert.fail((String)"interrupted");
        }
    }

    private void validateTaskManagerStateChange(ExecutionState state, Task task, boolean hasError) {
        try {
            Object rawMessage = this.taskManagerMessages.take();
            Assert.assertNotNull((String)"There is no additional TaskManager message", (Object)rawMessage);
            if (!(rawMessage instanceof TaskMessages.UpdateTaskExecutionState)) {
                Assert.fail((String)("TaskManager message is not 'UpdateTaskExecutionState', but " + rawMessage.getClass()));
            }
            TaskMessages.UpdateTaskExecutionState message = (TaskMessages.UpdateTaskExecutionState)rawMessage;
            TaskExecutionState taskState = message.taskExecutionState();
            Assert.assertEquals((Object)task.getJobID(), (Object)taskState.getJobID());
            Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState.getID());
            Assert.assertEquals((Object)state, (Object)taskState.getExecutionState());
            if (hasError) {
                Assert.assertNotNull((Object)taskState.getError(this.getClass().getClassLoader()));
            } else {
                Assert.assertNull((Object)taskState.getError(this.getClass().getClassLoader()));
            }
        }
        catch (InterruptedException e) {
            Assert.fail((String)"interrupted");
        }
    }

    private void validateListenerMessage(ExecutionState state, Task task, boolean hasError) {
        try {
            TaskMessages.UpdateTaskExecutionState message = (TaskMessages.UpdateTaskExecutionState)this.listenerMessages.take();
            Assert.assertNotNull((String)"There is no additional listener message", (Object)message);
            TaskExecutionState taskState = message.taskExecutionState();
            Assert.assertEquals((Object)task.getJobID(), (Object)taskState.getJobID());
            Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState.getID());
            Assert.assertEquals((Object)state, (Object)taskState.getExecutionState());
            if (hasError) {
                Assert.assertNotNull((Object)taskState.getError(this.getClass().getClassLoader()));
            } else {
                Assert.assertNull((Object)taskState.getError(this.getClass().getClassLoader()));
            }
        }
        catch (InterruptedException e) {
            Assert.fail((String)"interrupted");
        }
    }

    private void validateCancelingAndCanceledListenerMessage(Task task) {
        try {
            TaskMessages.UpdateTaskExecutionState message1 = (TaskMessages.UpdateTaskExecutionState)this.listenerMessages.take();
            TaskMessages.UpdateTaskExecutionState message2 = (TaskMessages.UpdateTaskExecutionState)this.listenerMessages.take();
            Assert.assertNotNull((String)"There is no additional listener message", (Object)message1);
            Assert.assertNotNull((String)"There is no additional listener message", (Object)message2);
            TaskExecutionState taskState1 = message1.taskExecutionState();
            TaskExecutionState taskState2 = message2.taskExecutionState();
            Assert.assertEquals((Object)task.getJobID(), (Object)taskState1.getJobID());
            Assert.assertEquals((Object)task.getJobID(), (Object)taskState2.getJobID());
            Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState1.getID());
            Assert.assertEquals((Object)task.getExecutionId(), (Object)taskState2.getID());
            ExecutionState state1 = taskState1.getExecutionState();
            ExecutionState state2 = taskState2.getExecutionState();
            Assert.assertTrue((state1 == ExecutionState.CANCELING && state2 == ExecutionState.CANCELED || state2 == ExecutionState.CANCELING && state1 == ExecutionState.CANCELED ? 1 : 0) != 0);
        }
        catch (InterruptedException e) {
            Assert.fail((String)"interrupted");
        }
    }

    public static final class InvokableWithCancelTaskExceptionInInvoke
    extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() throws Exception {
            awaitLatch.trigger();
            try {
                triggerLatch.await();
            }
            catch (Throwable throwable) {
                // empty catch block
            }
            throw new CancelTaskException();
        }
    }

    public static final class InvokableBlockingInInvoke
    extends AbstractInvokable {
        public void registerInputOutput() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void invoke() throws Exception {
            awaitLatch.trigger();
            InvokableBlockingInInvoke invokableBlockingInInvoke = this;
            synchronized (invokableBlockingInInvoke) {
                ((Object)((Object)this)).wait();
            }
        }
    }

    public static final class InvokableBlockingInRegisterInOut
    extends AbstractInvokable {
        public void registerInputOutput() {
            awaitLatch.trigger();
            try {
                triggerLatch.await();
            }
            catch (InterruptedException e) {
                throw new RuntimeException();
            }
        }

        public void invoke() {
        }
    }

    public static abstract class InvokableNonInstantiable
    extends AbstractInvokable {
    }

    public static final class InvokableWithExceptionOnTrigger
    extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() {
            awaitLatch.trigger();
            while (true) {
                try {
                    triggerLatch.await();
                }
                catch (InterruptedException interruptedException) {
                    continue;
                }
                break;
            }
            throw new RuntimeException("test");
        }
    }

    public static final class InvokableWithExceptionInInvoke
    extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() throws Exception {
            throw new Exception("test");
        }
    }

    public static final class InvokableWithExceptionInRegisterInOut
    extends AbstractInvokable {
        public void registerInputOutput() {
            throw new RuntimeException("test");
        }

        public void invoke() {
        }
    }

    public static final class TestInvokableCorrect
    extends AbstractInvokable {
        public void registerInputOutput() {
        }

        public void invoke() {
        }

        public void cancel() throws Exception {
            Assert.fail((String)"This should not be called");
        }
    }
}

