/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.lang.reflect.Field;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

public class TerminalStateDeadlockTest {
    private final Field stateField;
    private final Field resourceField;
    private final Field execGraphStateField;
    private final Field execGraphSchedulerField;
    private final SimpleSlot resource;

    public TerminalStateDeadlockTest() {
        try {
            this.stateField = Execution.class.getDeclaredField("state");
            this.stateField.setAccessible(true);
            this.resourceField = Execution.class.getDeclaredField("assignedResource");
            this.resourceField.setAccessible(true);
            this.execGraphStateField = ExecutionGraph.class.getDeclaredField("state");
            this.execGraphStateField.setAccessible(true);
            this.execGraphSchedulerField = ExecutionGraph.class.getDeclaredField("scheduler");
            this.execGraphSchedulerField.setAccessible(true);
            InetAddress address = InetAddress.getByName("127.0.0.1");
            InstanceConnectionInfo ci = new InstanceConnectionInfo(address, 12345);
            HardwareDescription resources = new HardwareDescription(4, 4000000L, 3000000L, 2000000L);
            Instance instance = new Instance((ActorGateway)DummyActorGateway.INSTANCE, ci, new InstanceID(), resources, 4);
            this.resource = instance.allocateSimpleSlot(new JobID());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
            throw new RuntimeException();
        }
    }

    @Test
    public void testProvokeDeadlock() {
        try {
            JobID jobId = this.resource.getJobID();
            JobVertexID vid1 = new JobVertexID();
            JobVertexID vid2 = new JobVertexID();
            Configuration jobConfig = new Configuration();
            JobVertex v1 = new JobVertex("v1", vid1);
            JobVertex v2 = new JobVertex("v2", vid2);
            v1.setParallelism(1);
            v2.setParallelism(1);
            v1.setInvokableClass(DummyInvokable.class);
            v2.setInvokableClass(DummyInvokable.class);
            List<JobVertex> vertices = Arrays.asList(v1, v2);
            Scheduler scheduler = new Scheduler((ExecutionContext)TestingUtils.defaultExecutionContext());
            ExecutorService executor = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 20000; ++i) {
                TestExecGraph eg = new TestExecGraph(jobId);
                eg.attachJobGraph(vertices);
                eg.setDelayBeforeRetrying(0L);
                eg.setNumberOfRetriesLeft(1);
                final Execution e1 = eg.getJobVertex(vid1).getTaskVertices()[0].getCurrentExecutionAttempt();
                final Execution e2 = eg.getJobVertex(vid2).getTaskVertices()[0].getCurrentExecutionAttempt();
                this.initializeExecution(e1);
                this.initializeExecution(e2);
                this.execGraphStateField.set((Object)eg, JobStatus.FAILING);
                this.execGraphSchedulerField.set((Object)eg, scheduler);
                Runnable r1 = new Runnable(){

                    @Override
                    public void run() {
                        e1.cancelingComplete();
                    }
                };
                Runnable r2 = new Runnable(){

                    @Override
                    public void run() {
                        e2.cancelingComplete();
                    }
                };
                executor.execute(r1);
                executor.execute(r2);
                eg.waitTillDone();
            }
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
    }

    private void initializeExecution(Execution exec) throws IllegalAccessException {
        this.stateField.set(exec, ExecutionState.CANCELING);
        this.resourceField.set(exec, this.resource);
    }

    static class TestExecGraph
    extends ExecutionGraph {
        private static final long serialVersionUID = -7606144898417942044L;
        private static final Configuration EMPTY_CONFIG = new Configuration();
        private static final FiniteDuration TIMEOUT = new FiniteDuration(30L, TimeUnit.SECONDS);
        private volatile boolean done;

        TestExecGraph(JobID jobId) {
            super((ExecutionContext)TestingUtils.defaultExecutionContext(), jobId, "test graph", EMPTY_CONFIG, TIMEOUT);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void scheduleForExecution(Scheduler scheduler) {
            TestExecGraph testExecGraph = this;
            synchronized (testExecGraph) {
                this.done = true;
                ((Object)((Object)this)).notifyAll();
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void waitTillDone() {
            try {
                TestExecGraph testExecGraph = this;
                synchronized (testExecGraph) {
                    while (!this.done) {
                        ((Object)((Object)this)).wait();
                    }
                }
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

