/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.DefaultExecutionGraph;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IOMetrics;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.executiongraph.TestingDefaultExecutionGraphBuilder;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.scheduler.DefaultScheduler;
import org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder;
import org.apache.flink.runtime.scheduler.ExecutionSlotAllocatorFactory;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlot;
import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider;
import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup;
import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.testutils.TestingUtils;
import org.apache.flink.testutils.executor.TestExecutorExtension;
import org.apache.flink.util.function.FunctionUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class DefaultExecutionGraphDeploymentTest {
    @RegisterExtension
    static final TestExecutorExtension<ScheduledExecutorService> EXECUTOR_EXTENSION = TestingUtils.defaultExecutorExtension();
    protected BlobWriter blobWriter = VoidBlobWriter.getInstance();
    protected PermanentBlobService blobCache = null;

    DefaultExecutionGraphDeploymentTest() {
    }

    protected void checkJobOffloaded(DefaultExecutionGraph eg) throws Exception {
        Assertions.assertThat((boolean)eg.getJobInformationOrBlobKey().isLeft()).isTrue();
    }

    protected void checkTaskOffloaded(ExecutionGraph eg, JobVertexID jobVertexId) throws Exception {
        Assertions.assertThat((boolean)eg.getJobVertex(jobVertexId).getTaskInformationOrBlobKey().isLeft()).isTrue();
    }

    @Test
    void testBuildDeploymentDescriptor() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertexID jid3 = new JobVertexID();
        JobVertexID jid4 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        JobVertex v3 = new JobVertex("v3", jid3);
        JobVertex v4 = new JobVertex("v4", jid4);
        v1.setParallelism(10);
        v2.setParallelism(10);
        v3.setParallelism(10);
        v4.setParallelism(10);
        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        v3.setInvokableClass(BatchTask.class);
        v4.setInvokableClass(BatchTask.class);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        JobGraph jobGraph = JobGraphTestUtils.batchJobGraph(v1, v2, v3, v4);
        JobID jobId = jobGraph.getJobID();
        DirectScheduledExecutorService executor = new DirectScheduledExecutorService();
        DefaultExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setBlobWriter(this.blobWriter).build(executor);
        eg.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
        this.checkJobOffloaded(eg);
        ExecutionJobVertex ejv = (ExecutionJobVertex)eg.getAllVertices().get(jid2);
        ExecutionVertex vertex = ejv.getTaskVertices()[3];
        SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
        CompletableFuture tdd = new CompletableFuture();
        taskManagerGateway.setSubmitConsumer(FunctionUtils.uncheckedConsumer(taskDeploymentDescriptor -> {
            taskDeploymentDescriptor.loadBigData(this.blobCache);
            tdd.complete(taskDeploymentDescriptor);
        }));
        TestingLogicalSlot slot = new TestingLogicalSlotBuilder().setTaskManagerGateway(taskManagerGateway).createTestingLogicalSlot();
        Assertions.assertThat((Comparable)vertex.getExecutionState()).isEqualTo((Object)ExecutionState.CREATED);
        vertex.getCurrentExecutionAttempt().transitionState(ExecutionState.SCHEDULED);
        vertex.getCurrentExecutionAttempt().registerProducedPartitions(slot.getTaskManagerLocation()).get();
        vertex.deployToSlot((LogicalSlot)slot);
        Assertions.assertThat((Comparable)vertex.getExecutionState()).isEqualTo((Object)ExecutionState.DEPLOYING);
        this.checkTaskOffloaded((ExecutionGraph)eg, vertex.getJobvertexId());
        TaskDeploymentDescriptor descr = (TaskDeploymentDescriptor)tdd.get();
        Assertions.assertThat((Object)descr).isNotNull();
        JobInformation jobInformation = (JobInformation)descr.getSerializedJobInformation().deserializeValue(this.getClass().getClassLoader());
        TaskInformation taskInformation = (TaskInformation)descr.getSerializedTaskInformation().deserializeValue(this.getClass().getClassLoader());
        Assertions.assertThat((Comparable)descr.getJobId()).isEqualTo((Object)jobId);
        Assertions.assertThat((Comparable)jobInformation.getJobId()).isEqualTo((Object)jobId);
        Assertions.assertThat((Comparable)taskInformation.getJobVertexId()).isEqualTo((Object)jid2);
        Assertions.assertThat((int)descr.getSubtaskIndex()).isEqualTo(3);
        Assertions.assertThat((int)taskInformation.getNumberOfSubtasks()).isEqualTo(10);
        Assertions.assertThat((String)taskInformation.getInvokableClassName()).isEqualTo(BatchTask.class.getName());
        Assertions.assertThat((String)taskInformation.getTaskName()).isEqualTo("v2");
        List producedPartitions = descr.getProducedPartitions();
        List consumedPartitions = descr.getInputGates();
        Assertions.assertThat((int)producedPartitions.size()).isEqualTo(2);
        Assertions.assertThat((int)consumedPartitions.size()).isEqualTo(1);
        Iterator iteratorProducedPartitions = producedPartitions.iterator();
        Iterator iteratorConsumedPartitions = consumedPartitions.iterator();
        Assertions.assertThat((int)((ResultPartitionDeploymentDescriptor)iteratorProducedPartitions.next()).getNumberOfSubpartitions()).isEqualTo(10);
        Assertions.assertThat((int)((ResultPartitionDeploymentDescriptor)iteratorProducedPartitions.next()).getNumberOfSubpartitions()).isEqualTo(10);
        ShuffleDescriptor[] shuffleDescriptors = ((InputGateDeploymentDescriptor)iteratorConsumedPartitions.next()).getShuffleDescriptors();
        Assertions.assertThat((int)shuffleDescriptors.length).isEqualTo(10);
        Iterator iteratorConsumedPartitionGroup = vertex.getAllConsumedPartitionGroups().iterator();
        int idx = 0;
        for (IntermediateResultPartitionID partitionId : (ConsumedPartitionGroup)iteratorConsumedPartitionGroup.next()) {
            Assertions.assertThat((Object)shuffleDescriptors[idx++].getResultPartitionID().getPartitionId()).isEqualTo((Object)partitionId);
        }
    }

    @Test
    void testRegistrationOfExecutionsFinishing() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        SchedulerBase scheduler = this.setupScheduler(v1, 7650, v2, 2350);
        ArrayList executions = new ArrayList(scheduler.getExecutionGraph().getRegisteredExecutions().values());
        for (Execution e : executions) {
            e.markFinished();
        }
        Assertions.assertThat((Map)scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
    }

    @Test
    void testRegistrationOfExecutionsFailing() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        SchedulerBase scheduler = this.setupScheduler(v1, 7, v2, 6);
        ArrayList executions = new ArrayList(scheduler.getExecutionGraph().getRegisteredExecutions().values());
        for (Execution e : executions) {
            e.markFailed(null);
        }
        Assertions.assertThat((Map)scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
    }

    @Test
    void testRegistrationOfExecutionsFailedExternally() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        SchedulerBase scheduler = this.setupScheduler(v1, 7, v2, 6);
        ArrayList executions = new ArrayList(scheduler.getExecutionGraph().getRegisteredExecutions().values());
        for (Execution e : executions) {
            e.fail(null);
        }
        Assertions.assertThat((Map)scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
    }

    @Test
    void testAccumulatorsAndMetricsForwarding() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        SchedulerBase scheduler = this.setupScheduler(v1, 1, v2, 1);
        ExecutionGraph graph = scheduler.getExecutionGraph();
        Map executions = graph.getRegisteredExecutions();
        Execution execution1 = (Execution)executions.values().iterator().next();
        IOMetrics ioMetrics = new IOMetrics(0L, 0L, 0L, 0L, 0L, 0.0, 0L);
        HashMap<String, IntCounter> accumulators = new HashMap<String, IntCounter>();
        accumulators.put("acc", new IntCounter(4));
        AccumulatorSnapshot accumulatorSnapshot = new AccumulatorSnapshot(graph.getJobID(), execution1.getAttemptId(), accumulators);
        TaskExecutionState state = new TaskExecutionState(execution1.getAttemptId(), ExecutionState.CANCELED, null, accumulatorSnapshot, ioMetrics);
        scheduler.updateTaskExecutionState(state);
        this.assertIOMetricsEqual(execution1.getIOMetrics(), ioMetrics);
        Assertions.assertThat((Map)execution1.getUserAccumulators()).isNotNull();
        Assertions.assertThat((Object)((Accumulator)execution1.getUserAccumulators().get("acc")).getLocalValue()).isEqualTo((Object)4);
        Execution execution2 = (Execution)executions.values().iterator().next();
        IOMetrics ioMetrics2 = new IOMetrics(0L, 0L, 0L, 0L, 0L, 0.0, 0L);
        HashMap<String, IntCounter> accumulators2 = new HashMap<String, IntCounter>();
        accumulators2.put("acc", new IntCounter(8));
        AccumulatorSnapshot accumulatorSnapshot2 = new AccumulatorSnapshot(graph.getJobID(), execution2.getAttemptId(), accumulators2);
        TaskExecutionState state2 = new TaskExecutionState(execution2.getAttemptId(), ExecutionState.FAILED, null, accumulatorSnapshot2, ioMetrics2);
        scheduler.updateTaskExecutionState(state2);
        this.assertIOMetricsEqual(execution2.getIOMetrics(), ioMetrics2);
        Assertions.assertThat((Map)execution2.getUserAccumulators()).isNotNull();
        Assertions.assertThat((Object)((Accumulator)execution2.getUserAccumulators().get("acc")).getLocalValue()).isEqualTo((Object)8);
    }

    @Test
    void testAccumulatorsAndMetricsStorage() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        SchedulerBase scheduler = this.setupScheduler(v1, 1, v2, 1);
        Map executions = scheduler.getExecutionGraph().getRegisteredExecutions();
        IOMetrics ioMetrics = new IOMetrics(0L, 0L, 0L, 0L, 0L, 0.0, 0L);
        Map accumulators = Collections.emptyMap();
        Execution execution1 = (Execution)executions.values().iterator().next();
        execution1.cancel();
        execution1.completeCancelling(accumulators, ioMetrics, false);
        this.assertIOMetricsEqual(execution1.getIOMetrics(), ioMetrics);
        Assertions.assertThat((Map)execution1.getUserAccumulators()).isEqualTo(accumulators);
        Execution execution2 = (Execution)executions.values().iterator().next();
        execution2.markFailed(new Throwable(), false, accumulators, ioMetrics, false, true);
        this.assertIOMetricsEqual(execution2.getIOMetrics(), ioMetrics);
        Assertions.assertThat((Map)execution2.getUserAccumulators()).isEqualTo(accumulators);
    }

    @Test
    void testRegistrationOfExecutionsCanceled() throws Exception {
        JobVertexID jid1 = new JobVertexID();
        JobVertexID jid2 = new JobVertexID();
        JobVertex v1 = new JobVertex("v1", jid1);
        JobVertex v2 = new JobVertex("v2", jid2);
        SchedulerBase scheduler = this.setupScheduler(v1, 19, v2, 37);
        ArrayList executions = new ArrayList(scheduler.getExecutionGraph().getRegisteredExecutions().values());
        for (Execution e : executions) {
            e.cancel();
            e.completeCancelling();
        }
        Assertions.assertThat((Map)scheduler.getExecutionGraph().getRegisteredExecutions()).isEmpty();
    }

    @Test
    void testNoResourceAvailableFailure() throws Exception {
        JobVertex v1 = new JobVertex("source");
        JobVertex v2 = new JobVertex("sink");
        int dop1 = 2;
        int dop2 = 2;
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);
        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        JobGraph graph = JobGraphTestUtils.batchJobGraph(v1, v2);
        DirectScheduledExecutorService directExecutor = new DirectScheduledExecutorService();
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(graph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor()).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(TestingPhysicalSlotProvider.createWithLimitedAmountOfPhysicalSlots(1))).setFutureExecutor(directExecutor).setBlobWriter(this.blobWriter).build();
        ExecutionGraph eg = scheduler.getExecutionGraph();
        this.checkJobOffloaded((DefaultExecutionGraph)eg);
        scheduler.startScheduling();
        ExecutionAttemptID attemptID = eg.getJobVertex(v1.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptID, ExecutionState.RUNNING));
        scheduler.updateTaskExecutionState(new TaskExecutionState(attemptID, ExecutionState.FINISHED, null));
        Assertions.assertThat((Comparable)eg.getState()).isEqualTo((Object)JobStatus.FAILED);
    }

    @Test
    void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
        Configuration jobManagerConfig = new Configuration();
        ExecutionGraph eg = this.createExecutionGraph(jobManagerConfig);
        Assertions.assertThat((int)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()).isEqualTo(((Integer)CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()).intValue());
    }

    private SchedulerBase setupScheduler(JobVertex v1, int dop1, JobVertex v2, int dop2) throws Exception {
        v1.setParallelism(dop1);
        v2.setParallelism(dop2);
        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        DirectScheduledExecutorService executorService = new DirectScheduledExecutorService();
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(JobGraphTestUtils.streamingJobGraph(v1, v2), ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor()).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory()).setFutureExecutor(executorService).setBlobWriter(this.blobWriter).build();
        ExecutionGraph eg = scheduler.getExecutionGraph();
        this.checkJobOffloaded((DefaultExecutionGraph)eg);
        scheduler.startScheduling();
        Map executions = eg.getRegisteredExecutions();
        Assertions.assertThat((int)executions.size()).isEqualTo(dop1 + dop2);
        return scheduler;
    }

    @Test
    void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
        int negativeMaxNumberOfCheckpointsToRetain = -10;
        Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.setInteger(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS, -10);
        ExecutionGraph eg = this.createExecutionGraph(jobManagerConfig);
        Assertions.assertThat((int)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()).isNotEqualTo(-10);
        Assertions.assertThat((int)eg.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints()).isEqualTo(((Integer)CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.defaultValue()).intValue());
    }

    /*
     * WARNING - void declaration
     */
    @Test
    void testExecutionGraphIsDeployedInTopologicalOrder() throws Exception {
        void var17_19;
        int sourceParallelism = 2;
        boolean sinkParallelism = true;
        JobVertex sourceVertex = new JobVertex("source");
        sourceVertex.setInvokableClass(NoOpInvokable.class);
        sourceVertex.setParallelism(2);
        JobVertex sinkVertex = new JobVertex("sink");
        sinkVertex.setInvokableClass(NoOpInvokable.class);
        sinkVertex.setParallelism(1);
        sinkVertex.connectNewDataSetAsInput(sourceVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        int numberTasks = 3;
        ArrayBlockingQueue submittedTasksQueue = new ArrayBlockingQueue(3);
        TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        testingTaskExecutorGatewayBuilder.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
            submittedTasksQueue.offer(taskDeploymentDescriptor.getExecutionAttemptId());
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        TestingTaskExecutorGateway taskExecutorGateway = testingTaskExecutorGatewayBuilder.createTestingTaskExecutorGateway();
        RpcTaskManagerGateway taskManagerGateway = new RpcTaskManagerGateway((TaskExecutorGateway)taskExecutorGateway, JobMasterId.generate());
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(sourceVertex, sinkVertex);
        TestingPhysicalSlotProvider physicalSlotProvider = TestingPhysicalSlotProvider.createWithoutImmediatePhysicalSlotCreation();
        DefaultScheduler scheduler = new DefaultSchedulerBuilder(jobGraph, ComponentMainThreadExecutorServiceAdapter.forMainThread(), (ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor()).setExecutionSlotAllocatorFactory((ExecutionSlotAllocatorFactory)SchedulerTestingUtils.newSlotSharingExecutionSlotAllocatorFactory(physicalSlotProvider)).setFutureExecutor(new DirectScheduledExecutorService()).build();
        ExecutionGraph executionGraph = scheduler.getExecutionGraph();
        scheduler.startScheduling();
        ArrayList<CompletableFuture<TestingPhysicalSlot>> shuffledFutures = new ArrayList<CompletableFuture<TestingPhysicalSlot>>(physicalSlotProvider.getResponses().values());
        Collections.shuffle(shuffledFutures);
        for (CompletableFuture completableFuture : shuffledFutures) {
            completableFuture.complete(TestingPhysicalSlot.builder().withTaskManagerLocation(taskManagerLocation).withTaskManagerGateway((TaskManagerGateway)taskManagerGateway).build());
        }
        ArrayList<ExecutionAttemptID> submittedTasks = new ArrayList<ExecutionAttemptID>(3);
        boolean bl = false;
        while (var17_19 < 3) {
            submittedTasks.add((ExecutionAttemptID)submittedTasksQueue.take());
            ++var17_19;
        }
        ArrayList<ExecutionAttemptID> arrayList = new ArrayList<ExecutionAttemptID>(2);
        for (ExecutionVertex taskVertex : executionGraph.getJobVertex(sourceVertex.getID()).getTaskVertices()) {
            arrayList.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
        }
        ArrayList<ExecutionAttemptID> secondStage = new ArrayList<ExecutionAttemptID>(1);
        for (ExecutionVertex taskVertex : executionGraph.getJobVertex(sinkVertex.getID()).getTaskVertices()) {
            secondStage.add(taskVertex.getCurrentExecutionAttempt().getAttemptId());
        }
        Assertions.assertThat((boolean)DefaultExecutionGraphDeploymentTest.isDeployedInTopologicalOrder(submittedTasks, Arrays.asList(arrayList, secondStage))).isTrue();
    }

    private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
        JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
        jobGraph.setSnapshotSettings(new JobCheckpointingSettings(new CheckpointCoordinatorConfiguration(100L, 600000L, 0L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, false, 0, 0L), null));
        return TestingDefaultExecutionGraphBuilder.newBuilder().setJobGraph(jobGraph).setJobMasterConfig(configuration).setBlobWriter(this.blobWriter).build((ScheduledExecutorService)EXECUTOR_EXTENSION.getExecutor());
    }

    private static boolean isDeployedInTopologicalOrder(List<ExecutionAttemptID> submissionOrder, List<Collection<ExecutionAttemptID>> executionStages) {
        Iterator<ExecutionAttemptID> submissionIterator = submissionOrder.iterator();
        for (Collection<ExecutionAttemptID> stage : executionStages) {
            ArrayList<ExecutionAttemptID> currentStage = new ArrayList<ExecutionAttemptID>(stage);
            while (!currentStage.isEmpty() && submissionIterator.hasNext()) {
                if (currentStage.remove(submissionIterator.next())) continue;
                return false;
            }
            if (currentStage.isEmpty()) continue;
            return false;
        }
        return !submissionIterator.hasNext();
    }

    private void assertIOMetricsEqual(IOMetrics ioMetrics1, IOMetrics ioMetrics2) {
        Assertions.assertThat((long)ioMetrics1.numBytesIn).isEqualTo(ioMetrics2.numBytesIn);
        Assertions.assertThat((long)ioMetrics1.numBytesOut).isEqualTo(ioMetrics2.numBytesOut);
        Assertions.assertThat((long)ioMetrics1.numRecordsIn).isEqualTo(ioMetrics2.numRecordsIn);
        Assertions.assertThat((long)ioMetrics1.numRecordsOut).isEqualTo(ioMetrics2.numRecordsOut);
        Assertions.assertThat((long)ioMetrics1.accumulateIdleTime).isEqualTo(ioMetrics2.accumulateIdleTime);
        Assertions.assertThat((double)ioMetrics1.accumulateBusyTime).isEqualTo(ioMetrics2.accumulateBusyTime);
        Assertions.assertThat((long)ioMetrics1.accumulateBackPressuredTime).isEqualTo(ioMetrics2.accumulateBackPressuredTime);
        Assertions.assertThat((Map)ioMetrics1.resultPartitionBytes).isEqualTo((Object)ioMetrics2.resultPartitionBytes);
    }
}

