/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;

public class ExecutionGraphTestUtils {
    /**
     * Blocks the calling thread, polling roughly every 2 ms, until the given execution graph
     * reports the expected job status.
     *
     * <p>An interrupt received while waiting does not abort the wait; the interrupt status is
     * restored on the calling thread before this method returns or throws.
     *
     * @param eg the execution graph to poll; must not be {@code null}
     * @param status the job status to wait for; must not be {@code null}
     * @param maxWaitMillis maximum time to wait in milliseconds; must be non-negative, and
     *     {@code 0} means wait without a time limit
     * @throws TimeoutException if the wait time elapsed and the status has still not been reached
     */
    public static void waitUntilJobStatus(ExecutionGraph eg, JobStatus status, long maxWaitMillis) throws TimeoutException {
        Preconditions.checkNotNull(eg);
        Preconditions.checkNotNull(status);
        Preconditions.checkArgument(maxWaitMillis >= 0L);

        final boolean unbounded = maxWaitMillis == 0L;
        // Saturate rather than overflow when converting very large millisecond values.
        final long maxWaitNanos = maxWaitMillis > Long.MAX_VALUE / 1000000L ? Long.MAX_VALUE : maxWaitMillis * 1000000L;
        final long startNanos = System.nanoTime();
        boolean interrupted = false;
        try {
            // The state is always checked first, so reaching it right at the deadline is a success.
            while (eg.getState() != status) {
                // Compare elapsed time (a difference), which stays correct if nanoTime wraps.
                if (!unbounded && System.nanoTime() - startNanos >= maxWaitNanos) {
                    throw new TimeoutException(String.format("The job did not reach status %s in time. Current status is %s.", status, eg.getState()));
                }
                try {
                    Thread.sleep(2L);
                }
                catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks the calling thread, polling roughly every 2 ms, until the given execution reports
     * the expected state.
     *
     * <p>An interrupt received while waiting does not abort the wait; the interrupt status is
     * restored on the calling thread before this method returns or throws.
     *
     * @param execution the execution to poll; must not be {@code null}
     * @param state the execution state to wait for; must not be {@code null}
     * @param maxWaitMillis maximum time to wait in milliseconds; must be non-negative, and
     *     {@code 0} means wait without a time limit
     * @throws TimeoutException if the wait time elapsed and the state has still not been reached
     */
    public static void waitUntilExecutionState(Execution execution, ExecutionState state, long maxWaitMillis) throws TimeoutException {
        Preconditions.checkNotNull(execution);
        Preconditions.checkNotNull(state);
        Preconditions.checkArgument(maxWaitMillis >= 0L);

        final boolean unbounded = maxWaitMillis == 0L;
        // Saturate rather than overflow when converting very large millisecond values.
        final long maxWaitNanos = maxWaitMillis > Long.MAX_VALUE / 1000000L ? Long.MAX_VALUE : maxWaitMillis * 1000000L;
        final long startNanos = System.nanoTime();
        boolean interrupted = false;
        try {
            // The state is always checked first, so reaching it right at the deadline is a success.
            while (execution.getState() != state) {
                // Compare elapsed time (a difference), which stays correct if nanoTime wraps.
                if (!unbounded && System.nanoTime() - startNanos >= maxWaitNanos) {
                    throw new TimeoutException(String.format("The execution did not reach state %s in time. Current state is %s.", state, execution.getState()));
                }
                try {
                    Thread.sleep(2L);
                }
                catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks the calling thread, polling roughly every 2 ms, until the current execution attempt
     * of the given vertex is non-null and reports the expected state.
     *
     * <p>The current execution attempt is re-read from the vertex on every poll. An interrupt
     * received while waiting does not abort the wait; the interrupt status is restored on the
     * calling thread before this method returns or throws.
     *
     * @param executionVertex the vertex whose current execution attempt is polled; must not be
     *     {@code null}
     * @param state the execution state to wait for; must not be {@code null}
     * @param maxWaitMillis maximum time to wait in milliseconds; must be non-negative, and
     *     {@code 0} means wait without a time limit
     * @throws TimeoutException if the wait time elapsed and either no current execution attempt
     *     was available or it had not reached the expected state
     */
    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState state, long maxWaitMillis) throws TimeoutException {
        Preconditions.checkNotNull(executionVertex);
        Preconditions.checkNotNull(state);
        Preconditions.checkArgument(maxWaitMillis >= 0L);

        final boolean unbounded = maxWaitMillis == 0L;
        // Saturate rather than overflow when converting very large millisecond values.
        final long maxWaitNanos = maxWaitMillis > Long.MAX_VALUE / 1000000L ? Long.MAX_VALUE : maxWaitMillis * 1000000L;
        final long startNanos = System.nanoTime();
        boolean interrupted = false;
        try {
            while (true) {
                final Execution execution = executionVertex.getCurrentExecutionAttempt();
                if (execution != null && execution.getState() == state) {
                    return;
                }
                // Timing out is the only other way out of the loop, so the method can never
                // return normally without the state having been observed.
                if (!unbounded && System.nanoTime() - startNanos >= maxWaitNanos) {
                    if (execution != null) {
                        throw new TimeoutException(String.format("The execution vertex did not reach state %s in time. Current state is %s.", state, execution.getState()));
                    }
                    throw new TimeoutException("Cannot get current execution attempt of " + executionVertex + '.');
                }
                try {
                    Thread.sleep(2L);
                }
                catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Blocks the calling thread, polling roughly every 2 ms, until every execution vertex of the
     * given graph has a current execution attempt that satisfies the given predicate.
     *
     * <p>The predicate is evaluated at least once, and again after every sleep, before the
     * deadline is consulted. The time limit is a {@link Deadline} created from
     * {@code maxWaitMillis}; a value of {@code 0} is not treated specially here.
     *
     * <p>An interrupt received while waiting does not abort the wait; the interrupt status is
     * restored on the calling thread before this method returns or throws. Restoring it only at
     * the end keeps subsequent sleeps from failing immediately and turning the loop into a busy
     * spin.
     *
     * @param executionGraph the graph whose executions are checked
     * @param executionPredicate the condition every current execution attempt must satisfy
     * @param maxWaitMillis maximum time to wait in milliseconds
     * @throws TimeoutException if the deadline passed while the predicate was still unsatisfied
     */
    public static void waitForAllExecutionsPredicate(ExecutionGraph executionGraph, Predicate<AccessExecution> executionPredicate, long maxWaitMillis) throws TimeoutException {
        final Predicate<AccessExecutionGraph> allExecutionsPredicate = ExecutionGraphTestUtils.allExecutionsPredicate(executionPredicate);
        final Deadline deadline = Deadline.fromNow(Duration.ofMillis(maxWaitMillis));
        boolean interrupted = false;
        try {
            while (!allExecutionsPredicate.test(executionGraph)) {
                if (!deadline.hasTimeLeft()) {
                    throw new TimeoutException("Not all executions fulfilled the predicate in time.");
                }
                try {
                    Thread.sleep(2L);
                }
                catch (InterruptedException e) {
                    // Keep waiting, but remember the interrupt so it is not lost.
                    interrupted = true;
                }
            }
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Lifts a predicate on a single execution to a predicate on a whole execution graph.
     *
     * <p>The returned predicate is {@code true} only if every execution vertex of the graph has a
     * non-null current execution attempt that satisfies {@code executionPredicate}. It stops at
     * the first vertex that fails, and is {@code true} for a graph without execution vertices.
     *
     * @param executionPredicate the condition to apply to each current execution attempt
     * @return a predicate over execution graphs
     */
    public static Predicate<AccessExecutionGraph> allExecutionsPredicate(Predicate<AccessExecution> executionPredicate) {
        return accessExecutionGraph -> {
            // Iterate the typed result directly instead of going through a raw Iterable local.
            for (AccessExecutionVertex executionVertex : accessExecutionGraph.getAllExecutionVertices()) {
                final AccessExecution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
                if (currentExecutionAttempt == null || !executionPredicate.test(currentExecutionAttempt)) {
                    return false;
                }
            }
            return true;
        };
    }

    /**
     * Creates a predicate that matches executions whose state is identical to the given one.
     *
     * @param executionState the state to compare against
     * @return a predicate that is {@code true} when {@code getState()} equals the given state
     */
    public static Predicate<AccessExecution> isInExecutionState(ExecutionState executionState) {
        return candidate -> {
            final ExecutionState actual = candidate.getState();
            return actual == executionState;
        };
    }

    /**
     * For every execution vertex of the graph, calls {@code switchToRecovering()} and then
     * {@code switchToRunning()} on its current execution attempt.
     *
     * @param eg the execution graph whose vertices are transitioned
     */
    public static void switchAllVerticesToRunning(ExecutionGraph eg) {
        eg.getAllExecutionVertices().forEach(executionVertex -> {
            executionVertex.getCurrentExecutionAttempt().switchToRecovering();
            executionVertex.getCurrentExecutionAttempt().switchToRunning();
        });
    }

    /**
     * Calls {@code completeCancelling()} on the current execution attempt of every execution
     * vertex of the graph.
     *
     * @param eg the execution graph whose vertices are processed
     */
    public static void completeCancellingForAllVertices(ExecutionGraph eg) {
        eg.getAllExecutionVertices().forEach(executionVertex -> executionVertex.getCurrentExecutionAttempt().completeCancelling());
    }

    /**
     * Calls {@code markFinished()} on the current execution attempt of every execution vertex of
     * the graph.
     *
     * @param eg the execution graph whose vertices are processed
     */
    public static void finishAllVertices(ExecutionGraph eg) {
        eg.getAllExecutionVertices().forEach(executionVertex -> executionVertex.getCurrentExecutionAttempt().markFinished());
    }

    /**
     * Switches the current execution attempt of every vertex in the graph to running.
     *
     * <p>Works in two passes: the first asserts (via the Java {@code assert} statement, so only
     * when assertions are enabled) that every current attempt is in state
     * {@link ExecutionState#DEPLOYING}; the second calls {@code switchToRunning()} on each
     * attempt. No attempt is switched before all of them have been checked.
     *
     * @param eg the execution graph whose vertices are transitioned
     */
    public static void switchToRunning(ExecutionGraph eg) {
        // Pass 1: validate every vertex before touching any of them.
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            final ExecutionState executionState = vertex.getCurrentExecutionAttempt().getState();
            assert (executionState == ExecutionState.DEPLOYING) : "Expected executionState to be DEPLOYING, was: " + executionState;
        }
        // Pass 2: perform the transition.
        for (ExecutionVertex vertex : eg.getAllExecutionVertices()) {
            vertex.getCurrentExecutionAttempt().switchToRunning();
        }
    }

    /**
     * Overwrites, via reflection, the {@code state} field of the vertex's current execution
     * attempt with the given value.
     *
     * @param vertex the vertex whose current execution attempt is modified
     * @param state the value written into the {@code state} field
     * @throws RuntimeException wrapping any exception raised while reading the attempt or
     *     accessing the field
     */
    public static void setVertexState(ExecutionVertex vertex, ExecutionState state) {
        try {
            final Execution currentAttempt = vertex.getCurrentExecutionAttempt();
            final Field stateField = Execution.class.getDeclaredField("state");
            // The field is not publicly accessible, hence the explicit override.
            stateField.setAccessible(true);
            stateField.set(currentAttempt, state);
        }
        catch (Exception cause) {
            throw new RuntimeException("Modifying the state failed", cause);
        }
    }

    /**
     * Assigns the given slot to the vertex's current execution attempt.
     *
     * @param vertex the vertex whose current execution attempt receives the slot
     * @param slot the slot passed to {@code tryAssignResource}
     * @throws RuntimeException if {@code tryAssignResource} returns {@code false}
     */
    public static void setVertexResource(ExecutionVertex vertex, LogicalSlot slot) {
        final boolean assigned = vertex.getCurrentExecutionAttempt().tryAssignResource(slot);
        if (assigned) {
            return;
        }
        throw new RuntimeException("Could not assign resource.");
    }

    /**
     * Creates a test execution graph consisting of a single no-op vertex with parallelism 10.
     *
     * @return the created execution graph
     * @throws Exception if building the graph fails
     */
    public static ExecutionGraph createSimpleTestGraph() throws Exception {
        return createSimpleTestGraph(createNoOpVertex(10));
    }

    /**
     * Creates a test execution graph from the given vertices, using
     * {@code TestingUtils.defaultExecutor()} as the executor.
     *
     * @param vertices the job vertices the graph is built from
     * @return the created execution graph
     * @throws Exception if building the graph fails
     */
    public static DefaultExecutionGraph createSimpleTestGraph(JobVertex ... vertices) throws Exception {
        final ScheduledExecutorService defaultExecutor = TestingUtils.defaultExecutor();
        return createExecutionGraph(defaultExecutor, vertices);
    }

    /**
     * Creates a test execution graph from the given vertices with a timeout of 10 seconds.
     *
     * @param executor the executor handed to the graph
     * @param vertices the job vertices the graph is built from
     * @return the created execution graph
     * @throws Exception if building the graph fails
     */
    public static DefaultExecutionGraph createExecutionGraph(ScheduledExecutorService executor, JobVertex ... vertices) throws Exception {
        final Time defaultTimeout = Time.seconds(10L);
        return createExecutionGraph(executor, defaultTimeout, vertices);
    }

    /**
     * Builds a {@link DefaultExecutionGraph} for a streaming job graph made of the given vertices
     * and starts it with {@code ComponentMainThreadExecutorServiceAdapter.forMainThread()}.
     *
     * <p>The same executor is used as both the future executor and the I/O executor.
     *
     * @param executor the executor handed to the graph builder
     * @param timeout the RPC timeout handed to the graph builder; must not be {@code null}
     * @param vertices the job vertices the graph is built from; must not be {@code null}
     * @return the started execution graph
     * @throws Exception if building the graph fails
     */
    public static DefaultExecutionGraph createExecutionGraph(ScheduledExecutorService executor, Time timeout, JobVertex ... vertices) throws Exception {
        Preconditions.checkNotNull(vertices);
        Preconditions.checkNotNull(timeout);

        final DefaultExecutionGraph graph =
                TestingDefaultExecutionGraphBuilder.newBuilder()
                        .setJobGraph(JobGraphTestUtils.streamingJobGraph(vertices))
                        .setFutureExecutor(executor)
                        .setIoExecutor(executor)
                        .setRpcTimeout(timeout)
                        .build();
        graph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        return graph;
    }

    /**
     * Creates a no-op job vertex named {@code "vertex"} with the given parallelism.
     *
     * @param parallelism the parallelism to set on the vertex
     * @return the created job vertex
     */
    public static JobVertex createNoOpVertex(int parallelism) {
        final String defaultName = "vertex";
        return createNoOpVertex(defaultName, parallelism);
    }

    /**
     * Creates a no-op job vertex with the given name and parallelism, passing {@code -1} as the
     * max parallelism.
     *
     * @param name the vertex name
     * @param parallelism the parallelism to set on the vertex
     * @return the created job vertex
     */
    public static JobVertex createNoOpVertex(String name, int parallelism) {
        final int maxParallelism = -1;
        return createNoOpVertex(name, parallelism, maxParallelism);
    }

    /**
     * Creates a job vertex that uses {@link NoOpInvokable} as its invokable class.
     *
     * @param name the vertex name
     * @param parallelism the value passed to {@code setParallelism}
     * @param maxParallelism the value passed to {@code setMaxParallelism}
     * @return the configured job vertex
     */
    public static JobVertex createNoOpVertex(String name, int parallelism, int maxParallelism) {
        final JobVertex noOpVertex = new JobVertex(name);

        // Configure the freshly created vertex in the same order as before.
        noOpVertex.setInvokableClass(NoOpInvokable.class);
        noOpVertex.setParallelism(parallelism);
        noOpVertex.setMaxParallelism(maxParallelism);

        return noOpVertex;
    }

    /**
     * Creates an execution vertex ID from a new {@link JobVertexID} and a random non-negative
     * subtask index below {@link Integer#MAX_VALUE}.
     *
     * @return the created execution vertex ID
     */
    public static ExecutionVertexID createRandomExecutionVertexId() {
        final JobVertexID jobVertexId = new JobVertexID();
        final int subtaskIndex = new Random().nextInt(Integer.MAX_VALUE);
        return new ExecutionVertexID(jobVertexId, subtaskIndex);
    }

    /**
     * Creates a job vertex with the given name, parallelism and invokable class.
     *
     * @param task1 the vertex name
     * @param numTasks the value passed to {@code setParallelism}
     * @param invokable the invokable class set on the vertex
     * @return the configured job vertex
     */
    public static JobVertex createJobVertex(String task1, int numTasks, Class<NoOpInvokable> invokable) {
        final JobVertex jobVertex = new JobVertex(task1);

        jobVertex.setInvokableClass(invokable);
        jobVertex.setParallelism(numTasks);

        return jobVertex;
    }

    /**
     * Creates an execution job vertex for the given ID with parallelism 1 and no slot sharing
     * group.
     *
     * @param id the job vertex ID
     * @param executor the executor used when building the scheduler
     * @return the execution job vertex
     * @throws Exception if the vertex cannot be created
     */
    public static ExecutionJobVertex getExecutionJobVertex(JobVertexID id, ScheduledExecutorService executor) throws Exception {
        final int parallelism = 1;
        return getExecutionJobVertex(id, parallelism, null, executor);
    }

    /**
     * Creates a job vertex named {@code "TestVertex"} with the given ID and parallelism, using
     * {@link AbstractInvokable} as invokable class, and returns its execution job vertex.
     *
     * @param id the job vertex ID
     * @param parallelism the value passed to {@code setParallelism}
     * @param slotSharingGroup the slot sharing group to set on the vertex, or {@code null} to
     *     leave it untouched
     * @param executor the executor used when building the scheduler
     * @return the execution job vertex
     * @throws Exception if the vertex cannot be created
     */
    public static ExecutionJobVertex getExecutionJobVertex(JobVertexID id, int parallelism, @Nullable SlotSharingGroup slotSharingGroup, ScheduledExecutorService executor) throws Exception {
        final JobVertex testVertex = new JobVertex("TestVertex", id);
        testVertex.setInvokableClass(AbstractInvokable.class);
        testVertex.setParallelism(parallelism);

        final boolean hasSlotSharingGroup = slotSharingGroup != null;
        if (hasSlotSharingGroup) {
            testVertex.setSlotSharingGroup(slotSharingGroup);
        }

        return getExecutionJobVertex(testVertex, executor);
    }

    /**
     * Returns the execution job vertex for the given job vertex, using a new
     * {@link DirectScheduledExecutorService} as executor.
     *
     * @param jobVertex the job vertex to create the execution job vertex for
     * @return the execution job vertex
     * @throws Exception if the vertex cannot be created
     */
    public static ExecutionJobVertex getExecutionJobVertex(JobVertex jobVertex) throws Exception {
        final ScheduledExecutorService directExecutor = new DirectScheduledExecutorService();
        return getExecutionJobVertex(jobVertex, directExecutor);
    }

    /**
     * Builds a scheduler for a batch job graph containing only the given vertex and returns the
     * execution job vertex the scheduler holds for it.
     *
     * <p>The same executor is used as both the I/O executor and the future executor.
     *
     * @param jobVertex the job vertex to create the execution job vertex for
     * @param executor the executor handed to the scheduler builder
     * @return the execution job vertex looked up by the vertex's ID
     * @throws Exception if building the scheduler fails
     */
    public static ExecutionJobVertex getExecutionJobVertex(JobVertex jobVertex, ScheduledExecutorService executor) throws Exception {
        final JobGraph batchGraph = JobGraphTestUtils.batchJobGraph(jobVertex);
        final DefaultScheduler defaultScheduler =
                SchedulerTestingUtils.newSchedulerBuilder(batchGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread())
                        .setIoExecutor(executor)
                        .setFutureExecutor(executor)
                        .build();
        final JobVertexID vertexId = jobVertex.getID();
        return defaultScheduler.getExecutionJobVertex(vertexId);
    }

    /**
     * Creates an execution job vertex for the given ID, using a new
     * {@link DirectScheduledExecutorService} as executor.
     *
     * @param id the job vertex ID
     * @return the execution job vertex
     * @throws Exception if the vertex cannot be created
     */
    public static ExecutionJobVertex getExecutionJobVertex(JobVertexID id) throws Exception {
        final ScheduledExecutorService directExecutor = new DirectScheduledExecutorService();
        return getExecutionJobVertex(id, directExecutor);
    }

    /**
     * Creates an execution job vertex for a new {@link JobVertexID} and returns its first task
     * vertex.
     *
     * @return the task vertex at index 0
     * @throws Exception if the execution job vertex cannot be created
     */
    public static ExecutionVertex getExecutionVertex() throws Exception {
        final ScheduledExecutorService directExecutor = new DirectScheduledExecutorService();
        final ExecutionJobVertex executionJobVertex = getExecutionJobVertex(new JobVertexID(), directExecutor);
        return executionJobVertex.getTaskVertices()[0];
    }

    /**
     * Creates an execution job vertex for a new {@link JobVertexID} and returns the current
     * execution attempt of its first task vertex.
     *
     * @return the current execution attempt of the task vertex at index 0
     * @throws Exception if the execution job vertex cannot be created
     */
    public static Execution getExecution() throws Exception {
        final ExecutionVertex firstTaskVertex = getExecutionJobVertex(new JobVertexID()).getTaskVertices()[0];
        return firstTaskVertex.getCurrentExecutionAttempt();
    }

    /**
     * Creates an execution job vertex with the given ID, parallelism and slot sharing group, and
     * returns the current execution attempt of the task vertex at the given index.
     *
     * @param jid the job vertex ID
     * @param subtaskIndex index into the task vertex array
     * @param numTasks the parallelism of the created vertex
     * @param slotSharingGroup the slot sharing group passed on to the vertex creation
     * @return the current execution attempt of the selected task vertex
     * @throws Exception if the execution job vertex cannot be created
     */
    public static Execution getExecution(JobVertexID jid, int subtaskIndex, int numTasks, SlotSharingGroup slotSharingGroup) throws Exception {
        final ExecutionVertex[] taskVertices =
                getExecutionJobVertex(jid, numTasks, slotSharingGroup, new DirectScheduledExecutorService()).getTaskVertices();
        return taskVertices[subtaskIndex].getCurrentExecutionAttempt();
    }

    /**
     * Asserts that the execution graph contains an execution job vertex that was generated
     * correctly from the given job vertex.
     *
     * <p>Checked are: the vertex's parallelism, job ID, vertex ID and originating job vertex; the
     * number, IDs and per-data-set partition count of its produced data sets; and, for every task
     * vertex, its job ID, vertex ID, subtask count, subtask index, number of inputs, and that
     * each consumed partition group has the size of the corresponding input's parallelism with
     * partition numbers counting up from 0.
     *
     * @param executionGraph the execution graph to look the vertex up in
     * @param originJobVertex the job vertex the execution job vertex was generated from
     * @param inputJobVertices the expected input vertices, or {@code null} if no inputs are
     *     expected
     * @param outputJobVertices the expected output vertices, or {@code null} if no produced data
     *     sets are expected
     */
    public static void verifyGeneratedExecutionJobVertex(ExecutionGraph executionGraph, JobVertex originJobVertex, @Nullable List<JobVertex> inputJobVertices, @Nullable List<JobVertex> outputJobVertices) {
        ExecutionJobVertex ejv = (ExecutionJobVertex)executionGraph.getAllVertices().get(originJobVertex.getID());
        Assert.assertNotNull(ejv);

        // Basic properties of the execution job vertex.
        Assert.assertEquals(originJobVertex.getParallelism(), ejv.getParallelism());
        Assert.assertEquals(executionGraph.getJobID(), ejv.getJobId());
        Assert.assertEquals(originJobVertex.getID(), ejv.getJobVertexId());
        Assert.assertEquals(originJobVertex, ejv.getJobVertex());

        // Produced data sets.
        if (outputJobVertices == null) {
            Assert.assertEquals(0L, ejv.getProducedDataSets().length);
        } else {
            Assert.assertEquals(outputJobVertices.size(), ejv.getProducedDataSets().length);
            for (int i = 0; i < outputJobVertices.size(); ++i) {
                Assert.assertEquals(originJobVertex.getProducedDataSets().get(i).getId(), ejv.getProducedDataSets()[i].getId());
                // Check the partition count of the i-th data set, not only of the first one.
                Assert.assertEquals(originJobVertex.getParallelism(), ejv.getProducedDataSets()[i].getPartitions().length);
            }
        }

        // Task vertices and their inputs.
        Assert.assertEquals(originJobVertex.getParallelism(), ejv.getTaskVertices().length);
        int subtaskIndex = 0;
        for (ExecutionVertex ev : ejv.getTaskVertices()) {
            Assert.assertEquals(executionGraph.getJobID(), ev.getJobId());
            Assert.assertEquals(originJobVertex.getID(), ev.getJobvertexId());
            Assert.assertEquals(originJobVertex.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
            Assert.assertEquals(subtaskIndex, ev.getParallelSubtaskIndex());
            if (inputJobVertices == null) {
                Assert.assertEquals(0L, ev.getNumberOfInputs());
            } else {
                Assert.assertEquals(inputJobVertices.size(), ev.getNumberOfInputs());
                for (int i = 0; i < inputJobVertices.size(); ++i) {
                    ConsumedPartitionGroup consumedPartitionGroup = ev.getConsumedPartitionGroup(i);
                    Assert.assertEquals(inputJobVertices.get(i).getParallelism(), consumedPartitionGroup.size());
                    int expectedPartitionNum = 0;
                    for (IntermediateResultPartitionID consumedPartitionId : consumedPartitionGroup) {
                        Assert.assertEquals(expectedPartitionNum, consumedPartitionId.getPartitionNumber());
                        ++expectedPartitionNum;
                    }
                }
            }
            ++subtaskIndex;
        }
    }
}

