/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptive;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.JobInfoImpl;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.failure.FailureEnricher;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.CheckpointScheduling;
import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
import org.apache.flink.runtime.checkpoint.CheckpointsCleaner;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.checkpoint.SubTaskInitializationMetrics;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.DefaultVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.JobStatusListener;
import org.apache.flink.runtime.executiongraph.MutableVertexAttemptNumberStore;
import org.apache.flink.runtime.executiongraph.TaskExecutionStateTransition;
import org.apache.flink.runtime.executiongraph.failover.ExecutionFailureHandler;
import org.apache.flink.runtime.executiongraph.failover.RestartBackoffTimeStrategy;
import org.apache.flink.runtime.failure.DefaultFailureEnricherContext;
import org.apache.flink.runtime.failure.FailureEnricherUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SerializedInputSplit;
import org.apache.flink.runtime.jobmaster.SlotInfo;
import org.apache.flink.runtime.jobmaster.slotpool.DeclarativeSlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismInfo;
import org.apache.flink.runtime.scheduler.DefaultVertexParallelismStore;
import org.apache.flink.runtime.scheduler.ExecutionGraphFactory;
import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.scheduler.JobStatusStore;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.SchedulerBase;
import org.apache.flink.runtime.scheduler.SchedulerNG;
import org.apache.flink.runtime.scheduler.SchedulerUtils;
import org.apache.flink.runtime.scheduler.UpdateSchedulerNgOnInternalFailuresListener;
import org.apache.flink.runtime.scheduler.VertexParallelismInformation;
import org.apache.flink.runtime.scheduler.VertexParallelismStore;
import org.apache.flink.runtime.scheduler.adaptive.BackgroundTask;
import org.apache.flink.runtime.scheduler.adaptive.Canceling;
import org.apache.flink.runtime.scheduler.adaptive.Created;
import org.apache.flink.runtime.scheduler.adaptive.CreatingExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.Executing;
import org.apache.flink.runtime.scheduler.adaptive.Failing;
import org.apache.flink.runtime.scheduler.adaptive.FailureResult;
import org.apache.flink.runtime.scheduler.adaptive.Finished;
import org.apache.flink.runtime.scheduler.adaptive.JobGraphJobInformation;
import org.apache.flink.runtime.scheduler.adaptive.JobSchedulingPlan;
import org.apache.flink.runtime.scheduler.adaptive.ResourceListener;
import org.apache.flink.runtime.scheduler.adaptive.Restarting;
import org.apache.flink.runtime.scheduler.adaptive.State;
import org.apache.flink.runtime.scheduler.adaptive.StateFactory;
import org.apache.flink.runtime.scheduler.adaptive.StateWithExecutionGraph;
import org.apache.flink.runtime.scheduler.adaptive.StopWithSavepoint;
import org.apache.flink.runtime.scheduler.adaptive.WaitingForResources;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobAllocationsInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.JobInformation;
import org.apache.flink.runtime.scheduler.adaptive.allocator.ReservedSlots;
import org.apache.flink.runtime.scheduler.adaptive.allocator.SlotAllocator;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceMinimalIncreaseRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.EnforceParallelismChangeRescalingController;
import org.apache.flink.runtime.scheduler.adaptive.scalingpolicy.RescalingController;
import org.apache.flink.runtime.scheduler.exceptionhistory.ExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.exceptionhistory.RootExceptionHistoryEntry;
import org.apache.flink.runtime.scheduler.metrics.DeploymentStateTimeMetrics;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.util.BoundedFIFOQueue;
import org.apache.flink.runtime.util.ResourceCounter;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedThrowable;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AdaptiveScheduler
implements SchedulerNG,
Created.Context,
WaitingForResources.Context,
CreatingExecutionGraph.Context,
Executing.Context,
Restarting.Context,
Failing.Context,
Finished.Context,
StopWithSavepoint.Context {
    /** Logger of this scheduler; it is also passed to the state objects this class creates. */
    private static final Logger LOG = LoggerFactory.getLogger(AdaptiveScheduler.class);
    private final Settings settings;
    private final JobGraph jobGraph;
    /** Job ID and name of {@link #jobGraph}, captured once in the constructor. */
    private final JobInfo jobInfo;
    /** Parallelism store computed in the constructor; used when building sparse archived graphs. */
    private final VertexParallelismStore initialParallelismStore;
    private final DeclarativeSlotPool declarativeSlotPool;
    private final long initializationTimestamp;
    private final Executor ioExecutor;
    private final ClassLoader userCodeClassLoader;
    private final CheckpointsCleaner checkpointsCleaner;
    private final CompletedCheckpointStore completedCheckpointStore;
    private final CheckpointIDCounter checkpointIdCounter;
    /** Returned by {@link #getJobTerminationFuture()} and read in {@link #closeAsync()}. */
    private final CompletableFuture<JobStatus> jobTerminationFuture = new CompletableFuture();
    private final RestartBackoffTimeStrategy restartBackoffTimeStrategy;
    private final ComponentMainThreadExecutor componentMainThreadExecutor;
    private final FatalErrorHandler fatalErrorHandler;
    private final Collection<FailureEnricher> failureEnrichers;
    /** Unmodifiable view over the listeners collected in the constructor. */
    private final Collection<JobStatusListener> jobStatusListeners;
    private final SlotAllocator slotAllocator;
    private final RescalingController rescalingController;
    private final RescalingController forceRescalingController;
    private final ExecutionGraphFactory executionGraphFactory;
    /** Current state of the scheduler; a freshly constructed scheduler starts in {@link Created}. */
    private State state = new Created(this, LOG);
    private boolean isTransitioningState = false;
    /** Restart counter; exposed through a gauge registered in the constructor. */
    private int numRestarts = 0;
    private final MutableVertexAttemptNumberStore vertexAttemptNumberStore = new DefaultVertexAttemptNumberStore();
    /** Starts out as an already finished task; aborted in {@link #closeAsync()}. */
    private BackgroundTask<ExecutionGraph> backgroundTask = BackgroundTask.finishedBackgroundTask();
    private final DeploymentStateTimeMetrics deploymentTimeMetrics;
    /** Bounded failure history; its capacity is read from {@code WebOptions.MAX_EXCEPTION_HISTORY_SIZE}. */
    private final BoundedFIFOQueue<RootExceptionHistoryEntry> exceptionHistory;
    /** Not final: replaced when the job resource requirements are updated. */
    private JobGraphJobInformation jobInformation;
    /** The resource requirements most recently declared to the slot pool; initially empty. */
    private ResourceCounter desiredResources = ResourceCounter.empty();
    private final JobManagerJobMetricGroup jobManagerJobMetricGroup;

    /**
     * Creates the scheduler in its initial {@link Created} state.
     *
     * <p>The constructor validates the job graph, derives the initial vertex parallelism store
     * (optionally overridden by {@code jobResourceRequirements}), creates the checkpoint store and
     * checkpoint ID counter, registers itself as new-slots listener on the slot pool and registers
     * the job metrics.
     *
     * @param jobResourceRequirements optional requirements applied on top of the computed
     *     parallelism store; ignored when {@code null}
     * @param jobStatusListener listener that must not be {@code null}
     * @throws JobExecutionException declared by the signature; presumably raised by the checkpoint
     *     service creation helpers (verify against {@code SchedulerUtils})
     */
    public AdaptiveScheduler(Settings settings, JobGraph jobGraph, @Nullable JobResourceRequirements jobResourceRequirements, Configuration configuration, DeclarativeSlotPool declarativeSlotPool, SlotAllocator slotAllocator, Executor ioExecutor, ClassLoader userCodeClassLoader, CheckpointsCleaner checkpointsCleaner, CheckpointRecoveryFactory checkpointRecoveryFactory, JobManagerJobMetricGroup jobManagerJobMetricGroup, RestartBackoffTimeStrategy restartBackoffTimeStrategy, long initializationTimestamp, ComponentMainThreadExecutor mainThreadExecutor, FatalErrorHandler fatalErrorHandler, JobStatusListener jobStatusListener, Collection<FailureEnricher> failureEnrichers, ExecutionGraphFactory executionGraphFactory) throws JobExecutionException {
        // Fail fast on job graphs this scheduler cannot handle.
        AdaptiveScheduler.assertPreconditions(jobGraph);
        this.settings = settings;
        this.jobGraph = jobGraph;
        this.jobInfo = new JobInfoImpl(jobGraph.getJobID(), jobGraph.getName());
        VertexParallelismStore vertexParallelismStore = AdaptiveScheduler.computeVertexParallelismStore(jobGraph, settings.getExecutionMode());
        if (jobResourceRequirements != null) {
            // Keep the computed store when applying the requirements yields no updated store.
            vertexParallelismStore = DefaultVertexParallelismStore.applyJobResourceRequirements(vertexParallelismStore, jobResourceRequirements).orElse(vertexParallelismStore);
        }
        this.initialParallelismStore = vertexParallelismStore;
        this.jobInformation = new JobGraphJobInformation(jobGraph, vertexParallelismStore);
        this.declarativeSlotPool = declarativeSlotPool;
        this.initializationTimestamp = initializationTimestamp;
        this.ioExecutor = ioExecutor;
        this.userCodeClassLoader = userCodeClassLoader;
        this.restartBackoffTimeStrategy = restartBackoffTimeStrategy;
        this.fatalErrorHandler = fatalErrorHandler;
        this.checkpointsCleaner = checkpointsCleaner;
        this.completedCheckpointStore = SchedulerUtils.createCompletedCheckpointStoreIfCheckpointingIsEnabled(jobGraph, configuration, checkpointRecoveryFactory, ioExecutor, LOG);
        this.checkpointIdCounter = SchedulerUtils.createCheckpointIDCounterIfCheckpointingIsEnabled(jobGraph, checkpointRecoveryFactory);
        this.slotAllocator = slotAllocator;
        // NOTE(review): "this" is handed out before construction has finished.
        declarativeSlotPool.registerNewSlotsListener(this::newResourcesAvailable);
        this.componentMainThreadExecutor = mainThreadExecutor;
        this.rescalingController = new EnforceMinimalIncreaseRescalingController(configuration);
        this.forceRescalingController = new EnforceParallelismChangeRescalingController();
        this.executionGraphFactory = executionGraphFactory;
        JobStatusStore jobStatusStore = new JobStatusStore(initializationTimestamp);
        // Collect listeners in a mutable list first: metric registration may add more of them.
        ArrayList<JobStatusListener> tmpJobStatusListeners = new ArrayList<JobStatusListener>();
        tmpJobStatusListeners.add(Preconditions.checkNotNull(jobStatusListener));
        tmpJobStatusListeners.add(jobStatusStore);
        MetricOptions.JobStatusMetricsSettings jobStatusMetricsSettings = MetricOptions.JobStatusMetricsSettings.fromConfiguration(configuration);
        this.deploymentTimeMetrics = new DeploymentStateTimeMetrics(jobGraph.getJobType(), jobStatusMetricsSettings);
        // The gauge reads numRestarts lazily, so it always reports the current value.
        SchedulerBase.registerJobMetrics(jobManagerJobMetricGroup, jobStatusStore, (Gauge<Long>)((Gauge)() -> this.numRestarts), this.deploymentTimeMetrics, tmpJobStatusListeners::add, initializationTimestamp, jobStatusMetricsSettings);
        // Freeze the listener list only after the metrics had the chance to register listeners.
        this.jobStatusListeners = Collections.unmodifiableCollection(tmpJobStatusListeners);
        this.failureEnrichers = failureEnrichers;
        this.exceptionHistory = new BoundedFIFOQueue(configuration.get(WebOptions.MAX_EXCEPTION_HISTORY_SIZE));
        this.jobManagerJobMetricGroup = jobManagerJobMetricGroup;
    }

    /**
     * Checks that the given job graph can be run by the adaptive scheduler.
     *
     * <p>The job must be a streaming job, every vertex must have a positive parallelism and every
     * input of every vertex must be produced by a pipelined (or pipelined-bounded) result.
     *
     * @param jobGraph the job graph to validate
     * @throws RuntimeException declared by the signature; the checks are delegated to
     *     {@code Preconditions.checkState}
     */
    private static void assertPreconditions(JobGraph jobGraph) throws RuntimeException {
        boolean isStreamingJob = jobGraph.getJobType() == JobType.STREAMING;
        Preconditions.checkState(isStreamingJob, "The adaptive scheduler only supports streaming jobs.");

        for (JobVertex jobVertex : jobGraph.getVertices()) {
            boolean hasParallelism = jobVertex.getParallelism() > 0;
            Preconditions.checkState(
                    hasParallelism,
                    "The adaptive scheduler expects the parallelism being set for each JobVertex (violated JobVertex: %s).",
                    jobVertex.getID());

            for (JobEdge inputEdge : jobVertex.getInputs()) {
                boolean isPipelined = inputEdge.getSource().getResultType().isPipelinedOrPipelinedBoundedResultPartition();
                Preconditions.checkState(
                        isPipelined,
                        "The adaptive scheduler supports pipelined data exchanges (violated by %s -> %s).",
                        inputEdge.getSource().getProducer(),
                        inputEdge.getTarget().getID());
            }
        }
    }

    /**
     * Builds the parallelism store used in reactive mode.
     *
     * <p>For each vertex the max parallelism is the vertex's own value, or the value supplied by
     * {@code defaultMaxParallelismFunc} when the vertex reports {@code -1}. Each stored entry
     * rejects any later attempt to lower the max parallelism.
     *
     * @param vertices vertices to create entries for
     * @param defaultMaxParallelismFunc supplies the max parallelism for vertices reporting {@code -1}
     * @param adjustParallelism if {@code true} the parallelism is set to the max parallelism,
     *     otherwise the vertex's configured parallelism is kept
     * @return the populated store
     */
    @VisibleForTesting
    static VertexParallelismStore computeReactiveModeVertexParallelismStore(Iterable<JobVertex> vertices, Function<JobVertex, Integer> defaultMaxParallelismFunc, boolean adjustParallelism) {
        DefaultVertexParallelismStore result = new DefaultVertexParallelismStore();
        for (JobVertex jobVertex : vertices) {
            final int effectiveMax;
            if (jobVertex.getMaxParallelism() == -1) {
                effectiveMax = defaultMaxParallelismFunc.apply(jobVertex).intValue();
            } else {
                effectiveMax = jobVertex.getMaxParallelism();
            }

            final int effectiveParallelism;
            if (adjustParallelism) {
                effectiveParallelism = effectiveMax;
            } else {
                effectiveParallelism = jobVertex.getParallelism();
            }

            result.setParallelismInfo(
                    jobVertex.getID(),
                    new DefaultVertexParallelismInfo(effectiveParallelism, effectiveMax, requestedMax -> {
                        if (requestedMax < effectiveMax) {
                            return Optional.of("Cannot lower max parallelism in Reactive mode.");
                        }
                        return Optional.empty();
                    }));
        }
        return result;
    }

    /**
     * Computes the initial parallelism store for the job.
     *
     * <p>In reactive mode the reactive store is built with parallelism adjustment enabled; in all
     * other modes the computation is delegated to {@code SchedulerBase}.
     *
     * @param jobGraph job whose vertices are inspected
     * @param executionMode the configured scheduler execution mode
     * @return the computed parallelism store
     */
    private static VertexParallelismStore computeVertexParallelismStore(JobGraph jobGraph, SchedulerExecutionMode executionMode) {
        if (executionMode != SchedulerExecutionMode.REACTIVE) {
            return SchedulerBase.computeVertexParallelismStore(jobGraph);
        }
        return computeReactiveModeVertexParallelismStore(
                jobGraph.getVertices(), SchedulerBase::getDefaultMaxParallelism, true);
    }

    /**
     * Computes the parallelism store used for an actual execution.
     *
     * <p>Unlike the initial computation, reactive mode does not adjust the parallelism here: the
     * reactive store is built with {@code adjustParallelism == false}.
     *
     * @param jobGraph job whose vertices are inspected
     * @param executionMode the configured scheduler execution mode
     * @param defaultMaxParallelismFunc supplies a default max parallelism per vertex
     * @return the computed parallelism store
     */
    @VisibleForTesting
    static VertexParallelismStore computeVertexParallelismStoreForExecution(JobGraph jobGraph, SchedulerExecutionMode executionMode, Function<JobVertex, Integer> defaultMaxParallelismFunc) {
        boolean reactive = executionMode == SchedulerExecutionMode.REACTIVE;
        return reactive
                ? computeReactiveModeVertexParallelismStore(jobGraph.getVertices(), defaultMaxParallelismFunc, false)
                : SchedulerBase.computeVertexParallelismStore(jobGraph.getVertices(), defaultMaxParallelismFunc);
    }

    /**
     * Callback registered on the slot pool for newly available slots.
     *
     * <p>The slots themselves are not inspected; the current state is merely notified via
     * {@code tryRun} if it is a {@link ResourceListener}.
     *
     * @param physicalSlots the new slots (unused)
     */
    private void newResourcesAvailable(Collection<? extends PhysicalSlot> physicalSlots) {
        state.tryRun(
                ResourceListener.class,
                ResourceListener::onNewResourcesAvailable,
                "newResourcesAvailable");
    }

    /**
     * Starts scheduling the job.
     *
     * @throws IllegalStateException if the scheduler is not in the {@link Created} state
     */
    @Override
    public void startScheduling() {
        checkIdleSlotTimeout();
        Created createdState = state.as(Created.class)
                .orElseThrow(() -> new IllegalStateException("Can only start scheduling when being in Created state."));
        createdState.startScheduling();
    }

    /**
     * Shuts the scheduler down.
     *
     * <p>The current state is suspended (after which the state must be {@link Finished}) and the
     * background task is aborted. Once the background task has terminated, the checkpoint
     * services are stopped on the main thread executor, and afterwards the checkpoints cleaner is
     * closed.
     *
     * @return future produced by chaining the steps above
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        LOG.debug("Closing the AdaptiveScheduler. Trying to suspend the current job execution.");
        state.suspend(new FlinkException("AdaptiveScheduler is being stopped."));
        Preconditions.checkState(state instanceof Finished, "Scheduler state should be finished after calling state.suspend.");
        backgroundTask.abort();

        CompletableFuture<?> checkpointServicesStopped = FutureUtils.runAfterwardsAsync(
                backgroundTask.getTerminationFuture(),
                () -> stopCheckpointServicesSafely(jobTerminationFuture.get()),
                (Executor) getMainThreadExecutor());
        return FutureUtils.composeAfterwards(checkpointServicesStopped, checkpointsCleaner::closeAsync);
    }

    /**
     * Shuts down the completed checkpoint store and the checkpoint ID counter.
     *
     * <p>Both shutdowns are always attempted. Failures do not propagate: they are combined via
     * {@code ExceptionUtils.firstOrSuppressed} and logged as a warning.
     *
     * @param terminalState job status passed to both shutdown calls
     */
    private void stopCheckpointServicesSafely(JobStatus terminalState) {
        LOG.debug("Stopping the checkpoint services with state {}.", (Object) terminalState);

        Exception collectedFailure = null;
        try {
            completedCheckpointStore.shutdown(terminalState, checkpointsCleaner);
        } catch (Exception storeFailure) {
            collectedFailure = storeFailure;
        }

        try {
            // Blocks until the counter has completed its shutdown.
            checkpointIdCounter.shutdown(terminalState).get();
        } catch (Exception counterFailure) {
            collectedFailure = ExceptionUtils.firstOrSuppressed(counterFailure, collectedFailure);
        }

        if (collectedFailure == null) {
            return;
        }
        LOG.warn("Failed to stop checkpoint services.", (Throwable) collectedFailure);
    }

    /** Delegates the cancellation request to the current state. */
    @Override
    public void cancel() {
        state.cancel();
    }

    /**
     * Exposes the scheduler's job termination future.
     *
     * @return the same future instance on every call
     */
    @Override
    public CompletableFuture<JobStatus> getJobTerminationFuture() {
        return jobTerminationFuture;
    }

    /**
     * Labels a global failure with the configured failure enrichers and hands the cause together
     * with the (future) labels to the current state.
     *
     * @param cause the global failure
     */
    @Override
    public void handleGlobalFailure(Throwable cause) {
        FailureEnricher.Context enricherContext = DefaultFailureEnricherContext.forGlobalFailure(
                jobInfo, jobManagerJobMetricGroup, ioExecutor, userCodeClassLoader);
        CompletableFuture<Map<String, String>> labelsFuture = FailureEnricherUtils.labelFailure(
                cause, enricherContext, (Executor) getMainThreadExecutor(), failureEnrichers);
        state.handleGlobalFailure(cause, labelsFuture);
    }

    /**
     * Computes failure labels for a task state transition.
     *
     * <p>Labels are only computed for transitions to {@code FAILED} and only if at least one
     * failure enricher is configured; otherwise the shared empty-labels future is returned.
     *
     * @param taskExecutionStateTransition the reported transition
     * @return future with the failure labels
     */
    private CompletableFuture<Map<String, String>> labelFailure(TaskExecutionStateTransition taskExecutionStateTransition) {
        boolean isFailure = taskExecutionStateTransition.getExecutionState() == ExecutionState.FAILED;
        if (!isFailure || failureEnrichers.isEmpty()) {
            return FailureEnricherUtils.EMPTY_FAILURE_LABELS;
        }

        Throwable failureCause = taskExecutionStateTransition.getError(userCodeClassLoader);
        FailureEnricher.Context enricherContext = DefaultFailureEnricherContext.forTaskFailure(
                jobInfo, jobManagerJobMetricGroup, ioExecutor, userCodeClassLoader);
        return FailureEnricherUtils.labelFailure(
                failureCause, enricherContext, (Executor) getMainThreadExecutor(), failureEnrichers);
    }

    /**
     * Forwards a task state update (plus its failure labels) to the current state if that state is
     * a {@link StateWithExecutionGraph}.
     *
     * @param taskExecutionState the reported transition
     * @return the state's answer, or {@code false} when {@code tryCall} yields no result
     */
    @Override
    public boolean updateTaskExecutionState(TaskExecutionStateTransition taskExecutionState) {
        return state.tryCall(
                        StateWithExecutionGraph.class,
                        graphState -> graphState.updateTaskExecutionState(taskExecutionState, labelFailure(taskExecutionState)),
                        "updateTaskExecutionState")
                .orElse(false);
    }

    /**
     * Requests the next input split from the current state.
     *
     * @param vertexID vertex requesting the split
     * @param executionAttempt requesting execution attempt
     * @return the serialized input split
     * @throws IOException if {@code tryCall} yields no result, i.e. the request was not handled
     */
    @Override
    public SerializedInputSplit requestNextInputSplit(JobVertexID vertexID, ExecutionAttemptID executionAttempt) throws IOException {
        return state.tryCall(
                        StateWithExecutionGraph.class,
                        graphState -> graphState.requestNextInputSplit(vertexID, executionAttempt),
                        "requestNextInputSplit")
                .orElseThrow(() -> new IOException("Scheduler is currently not executing the job."));
    }

    /**
     * Requests the execution state of a partition's producer from the current state.
     *
     * @param intermediateResultId ID of the intermediate data set
     * @param resultPartitionId ID of the partition
     * @return the producer's execution state
     * @throws PartitionProducerDisposedException if {@code tryCall} yields no result
     */
    @Override
    public ExecutionState requestPartitionState(IntermediateDataSetID intermediateResultId, ResultPartitionID resultPartitionId) throws PartitionProducerDisposedException {
        return state.tryCall(
                        StateWithExecutionGraph.class,
                        graphState -> graphState.requestPartitionState(intermediateResultId, resultPartitionId),
                        "requestPartitionState")
                .orElseThrow(() -> new PartitionProducerDisposedException(resultPartitionId));
    }

    /**
     * Bundles the current state's job view with a snapshot of the exception history.
     *
     * @return execution graph info for the job
     */
    @Override
    public ExecutionGraphInfo requestJob() {
        ArchivedExecutionGraph currentJob = state.getJob();
        return new ExecutionGraphInfo(currentJob, exceptionHistory.toArrayList());
    }

    /**
     * Returns the checkpoint statistics of the job as seen by the current state.
     *
     * @return the checkpoint stats snapshot of the current state's job view
     */
    @Override
    public CheckpointStatsSnapshot requestCheckpointStats() {
        ArchivedExecutionGraph currentJob = state.getJob();
        return currentJob.getCheckpointStatsSnapshot();
    }

    /**
     * Appends a failure to the bounded exception history.
     *
     * @param failure the entry to record
     */
    @Override
    public void archiveFailure(RootExceptionHistoryEntry failure) {
        exceptionHistory.add(failure);
    }

    /**
     * Reports the job status of the current state.
     *
     * @return the current state's job status
     */
    @Override
    public JobStatus requestJobStatus() {
        return state.getJobStatus();
    }

    /**
     * Looks up the location of a queryable state.
     *
     * @param jobId ID of the job
     * @param registrationName name the state was registered under
     * @return the location reported by the current state
     * @throws UnknownKvStateLocation if the current state is not a {@link StateWithExecutionGraph},
     *     or if the state itself reports it
     * @throws FlinkJobNotFoundException declared by the signature; raised by the delegated call
     */
    @Override
    public KvStateLocation requestKvStateLocation(JobID jobId, String registrationName) throws UnknownKvStateLocation, FlinkJobNotFoundException {
        StateWithExecutionGraph graphState = state.as(StateWithExecutionGraph.class)
                .orElseThrow(() -> new UnknownKvStateLocation(registrationName));
        return graphState.requestKvStateLocation(jobId, registrationName);
    }

    /**
     * Forwards a queryable-state registration to the current state via {@code tryRun} for
     * {@link StateWithExecutionGraph}.
     *
     * @throws FlinkJobNotFoundException declared by the signature; raised by the delegated call
     */
    @Override
    public void notifyKvStateRegistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName, KvStateID kvStateId, InetSocketAddress kvStateServerAddress) throws FlinkJobNotFoundException {
        state.tryRun(
                StateWithExecutionGraph.class,
                graphState -> graphState.notifyKvStateRegistered(
                        jobId, jobVertexId, keyGroupRange, registrationName, kvStateId, kvStateServerAddress),
                "notifyKvStateRegistered");
    }

    /**
     * Forwards a queryable-state unregistration to the current state via {@code tryRun} for
     * {@link StateWithExecutionGraph}.
     *
     * @throws FlinkJobNotFoundException declared by the signature; raised by the delegated call
     */
    @Override
    public void notifyKvStateUnregistered(JobID jobId, JobVertexID jobVertexId, KeyGroupRange keyGroupRange, String registrationName) throws FlinkJobNotFoundException {
        state.tryRun(
                StateWithExecutionGraph.class,
                graphState -> graphState.notifyKvStateUnregistered(jobId, jobVertexId, keyGroupRange, registrationName),
                "notifyKvStateUnregistered");
    }

    /**
     * Forwards an accumulator snapshot to the current state via {@code tryRun} for
     * {@link StateWithExecutionGraph}.
     *
     * @param accumulatorSnapshot the reported accumulators
     */
    @Override
    public void updateAccumulators(AccumulatorSnapshot accumulatorSnapshot) {
        state.tryRun(
                StateWithExecutionGraph.class,
                graphState -> graphState.updateAccumulators(accumulatorSnapshot),
                "updateAccumulators");
    }

    /**
     * Triggers a savepoint through the current state.
     *
     * @param targetDirectory savepoint target directory, may be {@code null}
     * @param cancelJob whether the job should be cancelled afterwards
     * @param formatType savepoint format
     * @return the future returned by the state, or an exceptionally completed future carrying a
     *     {@link CheckpointException} when {@code tryCall} yields no result
     */
    @Override
    public CompletableFuture<String> triggerSavepoint(@Nullable String targetDirectory, boolean cancelJob, SavepointFormatType formatType) {
        return this.state
                .tryCall(
                        StateWithExecutionGraph.class,
                        stateWithExecutionGraph -> stateWithExecutionGraph.triggerSavepoint(targetDirectory, cancelJob, formatType),
                        "triggerSavepoint")
                // Build the failure (exception + future) lazily, only when it is actually needed.
                .orElseGet(() -> FutureUtils.completedExceptionally(new CheckpointException("The Flink job is currently not executing.", CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
    }

    /**
     * Triggers a checkpoint through the current state.
     *
     * @param checkpointType type of checkpoint to trigger
     * @return the future returned by the state, or an exceptionally completed future carrying a
     *     {@link CheckpointException} when {@code tryCall} yields no result
     */
    @Override
    public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(CheckpointType checkpointType) {
        return this.state
                .tryCall(
                        StateWithExecutionGraph.class,
                        stateWithExecutionGraph -> stateWithExecutionGraph.triggerCheckpoint(checkpointType),
                        "triggerCheckpoint")
                // Build the failure (exception + future) lazily, only when it is actually needed.
                .orElseGet(() -> FutureUtils.completedExceptionally(new CheckpointException("The Flink job is currently not executing.", CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
    }

    /**
     * Forwards a checkpoint acknowledgement to the current state via {@code tryRun} for
     * {@link StateWithExecutionGraph}.
     */
    @Override
    public void acknowledgeCheckpoint(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics, TaskStateSnapshot checkpointState) {
        state.tryRun(
                StateWithExecutionGraph.class,
                graphState -> graphState.acknowledgeCheckpoint(
                        jobID, executionAttemptID, checkpointId, checkpointMetrics, checkpointState),
                "acknowledgeCheckpoint");
    }

    /**
     * Forwards an end-of-data notification to the current state via {@code tryRun} for
     * {@link StateWithExecutionGraph}.
     *
     * @param executionAttemptID the execution attempt that reached the end of its data
     */
    @Override
    public void notifyEndOfData(ExecutionAttemptID executionAttemptID) {
        state.tryRun(
                StateWithExecutionGraph.class,
                graphState -> graphState.notifyEndOfData(executionAttemptID),
                "notifyEndOfData");
    }

    /**
     * Forwards checkpoint metrics to the current state via {@code tryRun} for
     * {@link StateWithExecutionGraph}.
     *
     * <p>The {@code jobID} argument is not passed on to the state.
     */
    @Override
    public void reportCheckpointMetrics(JobID jobID, ExecutionAttemptID executionAttemptID, long checkpointId, CheckpointMetrics checkpointMetrics) {
        state.tryRun(
                StateWithExecutionGraph.class,
                graphState -> graphState.reportCheckpointMetrics(executionAttemptID, checkpointId, checkpointMetrics),
                "reportCheckpointMetrics");
    }

    /**
     * Forwards a declined-checkpoint message to the current state via {@code tryRun} for
     * {@link StateWithExecutionGraph}.
     *
     * @param decline the decline message
     */
    @Override
    public void declineCheckpoint(DeclineCheckpoint decline) {
        state.tryRun(
                StateWithExecutionGraph.class,
                graphState -> graphState.declineCheckpoint(decline),
                "declineCheckpoint");
    }

    /**
     * Forwards sub-task initialization metrics to the current state via {@code tryRun} for
     * {@link StateWithExecutionGraph}.
     *
     * <p>The {@code jobId} argument is not passed on to the state.
     *
     * @param jobId ID of the job (unused)
     * @param initializationMetrics the reported metrics
     */
    @Override
    public void reportInitializationMetrics(JobID jobId, SubTaskInitializationMetrics initializationMetrics) {
        // The label identifies this call to tryRun; it previously carried the copy-pasted name
        // "reportCheckpointMetrics", which made the two calls indistinguishable.
        this.state.tryRun(StateWithExecutionGraph.class, stateWithExecutionGraph -> stateWithExecutionGraph.reportInitializationMetrics(initializationMetrics), "reportInitializationMetrics");
    }

    /**
     * Stops the job with a savepoint; only handled when the current state is {@link Executing}.
     *
     * @param targetDirectory savepoint target directory, may be {@code null}
     * @param terminate whether the job should be terminated (as opposed to suspended)
     * @param formatType savepoint format
     * @return the future returned by the state, or an exceptionally completed future carrying a
     *     {@link CheckpointException} when {@code tryCall} yields no result
     */
    @Override
    public CompletableFuture<String> stopWithSavepoint(@Nullable String targetDirectory, boolean terminate, SavepointFormatType formatType) {
        return this.state
                .tryCall(
                        Executing.class,
                        executing -> executing.stopWithSavepoint(targetDirectory, terminate, formatType),
                        "stopWithSavepoint")
                // Build the failure (exception + future) lazily, only when it is actually needed.
                .orElseGet(() -> FutureUtils.completedExceptionally(new CheckpointException("The Flink job is currently not executing.", CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE)));
    }

    /**
     * Delivers an operator event to the coordinator of the given operator.
     *
     * @param taskExecution execution attempt that sent the event
     * @param operator target operator
     * @param evt the event
     * @throws FlinkException a {@link TaskNotRunningException} if the current state is not a
     *     {@link StateWithExecutionGraph}, or whatever the delegated call raises
     */
    @Override
    public void deliverOperatorEventToCoordinator(ExecutionAttemptID taskExecution, OperatorID operator, OperatorEvent evt) throws FlinkException {
        state.as(StateWithExecutionGraph.class)
                .orElseThrow(() -> new TaskNotRunningException("Task is not known or in state running on the JobManager."))
                .deliverOperatorEventToCoordinator(taskExecution, operator, evt);
    }

    /**
     * Delivers a coordination request to the coordinator of the given operator.
     *
     * @param operator target operator
     * @param request the request
     * @return the state's response future, or an exceptionally completed future when
     *     {@code tryCall} yields no result
     * @throws FlinkException declared by the signature; raised by the delegated call
     */
    @Override
    public CompletableFuture<CoordinationResponse> deliverCoordinationRequestToCoordinator(OperatorID operator, CoordinationRequest request) throws FlinkException {
        return state.tryCall(
                        StateWithExecutionGraph.class,
                        graphState -> graphState.deliverCoordinationRequestToCoordinator(operator, request),
                        "deliverCoordinationRequestToCoordinator")
                .orElseGet(() -> FutureUtils.completedExceptionally(
                        new FlinkException("Coordinator of operator " + operator + " does not exist")));
    }

    /**
     * Builds the job resource requirements from the current job information.
     *
     * <p>Each vertex contributes its min parallelism and its parallelism.
     *
     * @return the assembled requirements
     */
    @Override
    public JobResourceRequirements requestJobResourceRequirements() {
        JobResourceRequirements.Builder requirements = JobResourceRequirements.newBuilder();
        jobInformation.getVertices().forEach(vertexInfo ->
                requirements.setParallelismForJobVertex(
                        vertexInfo.getJobVertexID(), vertexInfo.getMinParallelism(), vertexInfo.getParallelism()));
        return requirements.build();
    }

    /**
     * Applies new job resource requirements.
     *
     * <p>If applying them yields an updated parallelism store, the job information is replaced,
     * the desired resources are re-declared and the current state is notified if it is a
     * {@link ResourceListener}. Otherwise nothing happens.
     *
     * @param jobResourceRequirements the new requirements
     * @throws UnsupportedOperationException if the scheduler runs in reactive mode
     */
    @Override
    public void updateJobResourceRequirements(JobResourceRequirements jobResourceRequirements) {
        if (settings.getExecutionMode() == SchedulerExecutionMode.REACTIVE) {
            throw new UnsupportedOperationException("Cannot change the parallelism of a job running in reactive mode.");
        }

        Optional<VertexParallelismStore> updatedStore = DefaultVertexParallelismStore.applyJobResourceRequirements(
                jobInformation.getVertexParallelismStore(), jobResourceRequirements);
        if (!updatedStore.isPresent()) {
            return;
        }

        jobInformation = new JobGraphJobInformation(jobGraph, updatedStore.get());
        declareDesiredResources();
        state.tryRun(
                ResourceListener.class,
                ResourceListener::onNewResourceRequirements,
                "Current state does not react to desired parallelism changes.");
    }

    /**
     * Checks whether the currently free slots cover the desired resources.
     *
     * @return result of the static check against the slot pool's free slots
     */
    @Override
    public boolean hasDesiredResources() {
        Collection<SlotInfo> currentlyFreeSlots =
                declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation();
        return hasDesiredResources(desiredResources, currentlyFreeSlots);
    }

    /**
     * Checks whether the given free slots cover the desired resources.
     *
     * <p>Slots are consumed one by one: a slot whose profile is still outstanding reduces that
     * profile by one, any other slot reduces {@code ResourceProfile.UNKNOWN} by one. Iteration
     * stops as soon as nothing is outstanding or no slots are left.
     *
     * @param desiredResources the resources that should be covered
     * @param freeSlots the slots available for covering them
     * @return {@code true} if no resources remain outstanding
     */
    @VisibleForTesting
    static boolean hasDesiredResources(ResourceCounter desiredResources, Collection<? extends SlotInfo> freeSlots) {
        ResourceCounter remaining = desiredResources;
        Iterator<? extends SlotInfo> slots = freeSlots.iterator();
        while (!remaining.isEmpty() && slots.hasNext()) {
            ResourceProfile slotProfile = slots.next().getResourceProfile();
            ResourceProfile profileToConsume =
                    remaining.containsResource(slotProfile) ? slotProfile : ResourceProfile.UNKNOWN;
            remaining = remaining.subtract(profileToConsume, 1);
        }
        return remaining.isEmpty();
    }

    /**
     * Checks whether the slot allocator can determine a parallelism for the job from all slots
     * known to the slot pool.
     *
     * @return {@code true} if the allocator returns a parallelism
     */
    @Override
    public boolean hasSufficientResources() {
        return slotAllocator
                .determineParallelism(jobInformation, declarativeSlotPool.getAllSlotsInformation())
                .isPresent();
    }

    /**
     * Asks the given allocator for a scheduling plan based on the currently free slots.
     *
     * <p>Note that the {@code slotAllocator} parameter, not the field of the same name, is used.
     *
     * @param slotAllocator allocator to query
     * @param previousExecutionGraph previous graph used to derive prior allocations, may be
     *     {@code null}
     * @return the scheduling plan
     * @throws NoResourceAvailableException if the allocator returns no plan
     */
    private JobSchedulingPlan determineParallelism(SlotAllocator slotAllocator, @Nullable ExecutionGraph previousExecutionGraph) throws NoResourceAvailableException {
        return slotAllocator
                .determineParallelismAndCalculateAssignment(
                        jobInformation,
                        declarativeSlotPool.getFreeSlotInfoTracker().getFreeSlotsInformation(),
                        JobAllocationsInformation.fromGraph(previousExecutionGraph))
                .orElseThrow(() -> new NoResourceAvailableException("Not enough resources available for scheduling."));
    }

    /**
     * Creates a sparse archived execution graph that includes the job's vertices.
     *
     * <p>The graph is built with the initial parallelism store, not the one currently held by the
     * job information.
     *
     * @param jobStatus status to record in the archived graph
     * @param cause failure cause to record, may be {@code null}
     * @return the archived execution graph
     */
    @Override
    public ArchivedExecutionGraph getArchivedExecutionGraph(JobStatus jobStatus, @Nullable Throwable cause) {
        return ArchivedExecutionGraph.createSparseArchivedExecutionGraphWithJobVertices(
                jobInformation.getJobID(),
                jobInformation.getName(),
                jobStatus,
                cause,
                jobInformation.getCheckpointingSettings(),
                initializationTimestamp,
                jobGraph.getVertices(),
                initialParallelismStore);
    }

    /**
     * Declares the desired resources and transitions to {@link WaitingForResources}.
     *
     * @param previousExecutionGraph previous execution graph handed to the new state, may be
     *     {@code null}
     */
    @Override
    public void goToWaitingForResources(@Nullable ExecutionGraph previousExecutionGraph) {
        declareDesiredResources();
        WaitingForResources.Factory waitingFactory = new WaitingForResources.Factory(
                this,
                LOG,
                settings.getInitialResourceAllocationTimeout(),
                settings.getResourceStabilizationTimeout(),
                previousExecutionGraph);
        transitionToState(waitingFactory);
    }

    /**
     * Recalculates the desired resources and, only if they differ from the ones currently stored,
     * stores them and announces them to the slot pool as resource requirements.
     */
    private void declareDesiredResources() {
        final ResourceCounter recalculated = calculateDesiredResources();
        if (recalculated.equals(desiredResources)) {
            // Nothing changed, so there is nothing to announce.
            return;
        }
        desiredResources = recalculated;
        declarativeSlotPool.setResourceRequirements(desiredResources);
    }

    /**
     * Lets the slot allocator calculate the slots required for the job's vertices.
     *
     * @return the required slots as computed by the slot allocator
     */
    private ResourceCounter calculateDesiredResources() {
        final ResourceCounter requiredSlots = slotAllocator.calculateRequiredSlots(jobInformation.getVertices());
        return requiredSlots;
    }

    /**
     * Transitions into the {@code Executing} state for the given execution graph, passing along
     * the minimum and maximum scaling intervals from the settings.
     *
     * @param executionGraph execution graph to execute
     * @param executionGraphHandler handler for the execution graph
     * @param operatorCoordinatorHandler handler for the operator coordinators
     * @param failureCollection failures collected so far
     */
    @Override
    public void goToExecuting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) {
        final Executing.Factory executingFactory =
                new Executing.Factory(
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        LOG,
                        this,
                        userCodeClassLoader,
                        failureCollection,
                        settings.getScalingIntervalMin(),
                        settings.getScalingIntervalMax());
        transitionToState(executingFactory);
    }

    /**
     * Transitions into the {@code Canceling} state for the given execution graph.
     *
     * @param executionGraph execution graph to cancel
     * @param executionGraphHandler handler for the execution graph
     * @param operatorCoordinatorHandler handler for the operator coordinators
     * @param failureCollection failures collected so far
     */
    @Override
    public void goToCanceling(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, List<ExceptionHistoryEntry> failureCollection) {
        final Canceling.Factory cancelingFactory =
                new Canceling.Factory(
                        this,
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        LOG,
                        userCodeClassLoader,
                        failureCollection);
        transitionToState(cancelingFactory);
    }

    /**
     * Records the next attempt number for every execution vertex, transitions into the
     * {@code Restarting} state and finally increments the restart counter.
     *
     * @param executionGraph execution graph that is being restarted
     * @param executionGraphHandler handler for the execution graph
     * @param operatorCoordinatorHandler handler for the operator coordinators
     * @param backoffTime backoff time handed to the restarting state
     * @param failureCollection failures collected so far
     */
    @Override
    public void goToRestarting(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Duration backoffTime, List<ExceptionHistoryEntry> failureCollection) {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            // Store "current attempt + 1" as the attempt count for this subtask.
            final int currentAttempt = vertex.getCurrentExecutionAttempt().getAttemptNumber();
            vertexAttemptNumberStore.setAttemptCount(
                    vertex.getJobvertexId(), vertex.getParallelSubtaskIndex(), currentAttempt + 1);
        }
        final Restarting.Factory restartingFactory =
                new Restarting.Factory(
                        this,
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        LOG,
                        backoffTime,
                        userCodeClassLoader,
                        failureCollection);
        transitionToState(restartingFactory);
        numRestarts++;
    }

    /**
     * Transitions into the {@code Failing} state for the given execution graph.
     *
     * @param executionGraph execution graph that is failing
     * @param executionGraphHandler handler for the execution graph
     * @param operatorCoordinatorHandler handler for the operator coordinators
     * @param failureCause cause handed to the failing state
     * @param failureCollection failures collected so far
     */
    @Override
    public void goToFailing(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, Throwable failureCause, List<ExceptionHistoryEntry> failureCollection) {
        final Failing.Factory failingFactory =
                new Failing.Factory(
                        this,
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        LOG,
                        failureCause,
                        userCodeClassLoader,
                        failureCollection);
        transitionToState(failingFactory);
    }

    /**
     * Transitions into the {@code StopWithSavepoint} state and returns that state's operation
     * future.
     *
     * @param executionGraph execution graph to stop
     * @param executionGraphHandler handler for the execution graph
     * @param operatorCoordinatorHandler handler for the operator coordinators
     * @param checkpointScheduling checkpoint scheduling handed to the new state
     * @param savepointFuture savepoint future handed to the new state
     * @param failureCollection failures collected so far
     * @return the operation future of the entered state
     */
    @Override
    public CompletableFuture<String> goToStopWithSavepoint(ExecutionGraph executionGraph, ExecutionGraphHandler executionGraphHandler, OperatorCoordinatorHandler operatorCoordinatorHandler, CheckpointScheduling checkpointScheduling, CompletableFuture<String> savepointFuture, List<ExceptionHistoryEntry> failureCollection) {
        final StopWithSavepoint.Factory stopFactory =
                new StopWithSavepoint.Factory(
                        this,
                        executionGraph,
                        executionGraphHandler,
                        operatorCoordinatorHandler,
                        checkpointScheduling,
                        LOG,
                        userCodeClassLoader,
                        savepointFuture,
                        failureCollection);
        final StopWithSavepoint enteredState = transitionToState(stopFactory);
        return enteredState.getOperationFuture();
    }

    /**
     * Transitions into the {@code Finished} state.
     *
     * @param archivedExecutionGraph archived execution graph handed to the finished state
     */
    @Override
    public void goToFinished(ArchivedExecutionGraph archivedExecutionGraph) {
        final Finished.Factory finishedFactory = new Finished.Factory(this, archivedExecutionGraph, LOG);
        transitionToState(finishedFactory);
    }

    /**
     * Starts the asynchronous creation of an execution graph for the available resources and
     * transitions into the {@code CreatingExecutionGraph} state with the resulting future.
     *
     * @param previousExecutionGraph execution graph of the previous attempt, may be {@code null}
     */
    @Override
    public void goToCreatingExecutionGraph(@Nullable ExecutionGraph previousExecutionGraph) {
        final CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> pendingGraph =
                createExecutionGraphWithAvailableResourcesAsync(previousExecutionGraph);
        final CreatingExecutionGraph.Factory creatingFactory =
                new CreatingExecutionGraph.Factory(this, pendingGraph, LOG, previousExecutionGraph);
        transitionToState(creatingFactory);
    }

    /**
     * Determines a scheduling plan, applies its per-vertex parallelism to a copy of the job graph,
     * derives the parallelism store for execution, and asynchronously creates the execution graph.
     *
     * <p>Any exception thrown while preparing the plan or the parallelism store is returned as an
     * exceptionally completed future instead of being thrown.
     *
     * @param previousExecutionGraph execution graph of the previous attempt, may be {@code null}
     * @return future with the created execution graph paired with its scheduling plan
     */
    private CompletableFuture<CreatingExecutionGraph.ExecutionGraphWithVertexParallelism> createExecutionGraphWithAvailableResourcesAsync(@Nullable ExecutionGraph previousExecutionGraph) {
        final JobSchedulingPlan plan;
        final VertexParallelismStore parallelismStoreForExecution;
        try {
            plan = determineParallelism(slotAllocator, previousExecutionGraph);
            final JobGraph graphCopy = jobInformation.copyJobGraph();
            for (JobVertex jobVertex : graphCopy.getVertices()) {
                final JobVertexID vertexId = jobVertex.getID();
                jobVertex.setParallelism(plan.getVertexParallelism().getParallelism(vertexId));
            }
            // The max parallelism is always taken from the initial parallelism store.
            parallelismStoreForExecution =
                    AdaptiveScheduler.computeVertexParallelismStoreForExecution(
                            graphCopy,
                            settings.getExecutionMode(),
                            vertex -> initialParallelismStore.getParallelismInfo(vertex.getID()).getMaxParallelism());
        } catch (Exception preparationFailure) {
            return FutureUtils.completedExceptionally(preparationFailure);
        }
        return createExecutionGraphAndRestoreStateAsync(parallelismStoreForExecution)
                .thenApply(createdGraph -> CreatingExecutionGraph.ExecutionGraphWithVertexParallelism.create(createdGraph, plan));
    }

    /**
     * Starts the execution graph, moves it to running, registers the internal failure listener and
     * then tries to reserve the slots of the scheduling plan and assign them to the graph.
     *
     * @param executionGraphWithVertexParallelism execution graph together with its scheduling plan
     * @return a successful assignment result if the slots could be reserved, otherwise a
     *     "not possible" result
     */
    @Override
    public CreatingExecutionGraph.AssignmentResult tryToAssignSlots(CreatingExecutionGraph.ExecutionGraphWithVertexParallelism executionGraphWithVertexParallelism) {
        final ExecutionGraph graph = executionGraphWithVertexParallelism.getExecutionGraph();
        graph.start(componentMainThreadExecutor);
        graph.transitionToRunning();
        graph.setInternalTaskFailuresListener(new UpdateSchedulerNgOnInternalFailuresListener(this));

        final JobSchedulingPlan plan = executionGraphWithVertexParallelism.getJobSchedulingPlan();
        return slotAllocator
                .tryReserveResources(plan)
                .map(reserved -> assignSlotsToExecutionGraph(graph, reserved))
                .map(CreatingExecutionGraph.AssignmentResult::success)
                .orElseGet(CreatingExecutionGraph.AssignmentResult::notPossible);
    }

    /**
     * Assigns the reserved slot of every execution vertex to that vertex.
     *
     * <p>For each vertex the produced partitions are registered against the slot's task manager
     * location first; that registration is required to have completed immediately.
     *
     * @param executionGraph execution graph whose vertices receive slots
     * @param reservedSlots reserved slots, looked up per execution vertex id
     * @return the same execution graph instance
     */
    @Nonnull
    private ExecutionGraph assignSlotsToExecutionGraph(ExecutionGraph executionGraph, ReservedSlots reservedSlots) {
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            final LogicalSlot slot = reservedSlots.getSlotFor(vertex.getID());
            final CompletableFuture<Void> partitionRegistration =
                    vertex.getCurrentExecutionAttempt().registerProducedPartitions(slot.getTaskManagerLocation());
            Preconditions.checkState(partitionRegistration.isDone(), "Partition registration must be completed immediately for reactive mode");
            vertex.tryAssignResource(slot);
        }
        return executionGraph;
    }

    /**
     * Aborts the current background task and chains a new one onto it that creates the execution
     * graph and restores its state on the I/O executor.
     *
     * @param adjustedParallelismStore parallelism store to create the execution graph with
     * @return the background task's result future, switched to the main thread executor
     */
    private CompletableFuture<ExecutionGraph> createExecutionGraphAndRestoreStateAsync(VertexParallelismStore adjustedParallelismStore) {
        backgroundTask.abort();
        backgroundTask =
                backgroundTask.runAfter(
                        () -> createExecutionGraphAndRestoreState(adjustedParallelismStore), ioExecutor);
        return FutureUtils.switchExecutor(
                backgroundTask.getResultFuture(), (Executor) getMainThreadExecutor());
    }

    /**
     * Creates an execution graph from a copy of the job graph and restores its state through the
     * execution graph factory.
     *
     * @param adjustedParallelismStore parallelism store to create the execution graph with
     * @return the created execution graph
     * @throws Exception if the factory fails to create or restore the execution graph
     */
    @Nonnull
    private ExecutionGraph createExecutionGraphAndRestoreState(VertexParallelismStore adjustedParallelismStore) throws Exception {
        return executionGraphFactory.createAndRestoreExecutionGraph(
                jobInformation.copyJobGraph(),
                completedCheckpointStore,
                checkpointsCleaner,
                checkpointIdCounter,
                TaskDeploymentDescriptorFactory.PartitionLocationConstraint.MUST_BE_KNOWN,
                initializationTimestamp,
                vertexAttemptNumberStore,
                adjustedParallelismStore,
                deploymentTimeMetrics,
                rp -> false,
                LOG);
    }

    /**
     * Decides whether the job should be rescaled given all slots currently known to the slot pool.
     *
     * @param executionGraph execution graph providing the current parallelism
     * @param forceRescale if {@code true} the force rescaling controller makes the decision,
     *     otherwise the regular rescaling controller
     * @return {@code true} if a new parallelism could be determined and the chosen controller
     *     approves rescaling to it
     */
    @Override
    public boolean shouldRescale(ExecutionGraph executionGraph, boolean forceRescale) {
        final Optional<VertexParallelism> candidate =
                slotAllocator.determineParallelism(jobInformation, declarativeSlotPool.getAllSlotsInformation());
        if (!candidate.isPresent()) {
            return false;
        }
        final RescalingController controller = forceRescale ? forceRescalingController : rescalingController;
        return controller.shouldRescale(AdaptiveScheduler.getCurrentParallelism(executionGraph), candidate.get());
    }

    /**
     * Collects the parallelism of every job vertex of the execution graph, keyed by job vertex id.
     *
     * @param executionGraph execution graph to read the parallelism from
     * @return the current per-vertex parallelism
     */
    private static VertexParallelism getCurrentParallelism(ExecutionGraph executionGraph) {
        final Map<JobVertexID, Integer> parallelismPerVertex =
                executionGraph.getAllVertices().values().stream()
                        .collect(Collectors.toMap(ExecutionJobVertex::getJobVertexId, ExecutionJobVertex::getParallelism));
        return new VertexParallelism(parallelismPerVertex);
    }

    /**
     * Logs the terminal state of the job (with the failure, if any, as the trailing log argument)
     * and completes the job termination future with that state.
     *
     * @param archivedExecutionGraph archived execution graph of the finished job
     */
    @Override
    public void onFinished(ArchivedExecutionGraph archivedExecutionGraph) {
        SerializedThrowable failure = null;
        if (archivedExecutionGraph.getFailureInfo() != null) {
            failure = archivedExecutionGraph.getFailureInfo().getException();
        }
        LOG.info("Job {} reached terminal state {}.", archivedExecutionGraph.getJobID(), archivedExecutionGraph.getState(), failure);
        jobTerminationFuture.complete(archivedExecutionGraph.getState());
    }

    /**
     * Decides whether the job can be restarted after the given failure.
     *
     * <p>Unrecoverable errors are rejected right away without notifying the restart strategy. Any
     * other failure is reported to the restart backoff time strategy, which then decides whether a
     * restart is allowed and with which backoff.
     *
     * @param failure failure to handle
     * @return a restartable result carrying the backoff time, or a non-restartable result wrapping
     *     the failure in a {@code JobException}
     */
    @Override
    public FailureResult howToHandleFailure(Throwable failure) {
        if (ExecutionFailureHandler.isUnrecoverableError(failure)) {
            return FailureResult.canNotRestart(new JobException("The failure is not recoverable", failure));
        }

        restartBackoffTimeStrategy.notifyFailure(failure);
        if (!restartBackoffTimeStrategy.canRestart()) {
            return FailureResult.canNotRestart(new JobException("Recovery is suppressed by " + restartBackoffTimeStrategy, failure));
        }

        final Duration backoff = Duration.ofMillis(restartBackoffTimeStrategy.getBackoffTime());
        return FailureResult.canRestart(failure, backoff);
    }

    /** Returns the executor stored in the {@code ioExecutor} field. */
    @Override
    public Executor getIOExecutor() {
        return ioExecutor;
    }

    /** Returns the main thread executor of this scheduler. */
    @Override
    public ComponentMainThreadExecutor getMainThreadExecutor() {
        return componentMainThreadExecutor;
    }

    /** Returns the job-level metric group held by this scheduler. */
    @Override
    public JobManagerJobMetricGroup getMetricGroup() {
        return jobManagerJobMetricGroup;
    }

    /**
     * Checks whether the scheduler is currently in exactly the given state instance (reference
     * comparison).
     *
     * @param expectedState state instance to compare against
     * @return {@code true} if the current state is the same instance
     */
    @Override
    public boolean isState(State expectedState) {
        return state == expectedState;
    }

    /**
     * Runs the action only if the scheduler is still in the expected state; otherwise the action
     * is skipped and a debug message is logged.
     *
     * <p>Anything thrown by the action is forwarded to the fatal error handler.
     *
     * @param expectedState state the scheduler has to be in
     * @param action action to run
     */
    @Override
    public void runIfState(State expectedState, Runnable action) {
        if (!isState(expectedState)) {
            LOG.debug("Ignoring scheduled action because expected state {} is not the actual state {}.", expectedState, state);
            return;
        }
        try {
            action.run();
        } catch (Throwable failure) {
            fatalErrorHandler.onFatalError(failure);
        }
    }

    /**
     * Schedules the state-guarded execution of the action on the main thread executor after the
     * given delay.
     *
     * @param expectedState state the scheduler has to be in when the action fires
     * @param action action to run
     * @param delay delay before the action is attempted, converted to milliseconds
     * @return the future of the scheduled task
     */
    @Override
    public ScheduledFuture<?> runIfState(State expectedState, Runnable action, Duration delay) {
        final Runnable guardedAction = () -> runIfState(expectedState, action);
        return componentMainThreadExecutor.schedule(guardedAction, delay.toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Leaves the current state and enters the state produced by {@code targetState}.
     *
     * <p>Must run in the main thread, must not be re-entered while another transition is in
     * progress, and must not target the class of the current state. Job status listeners are
     * notified when the job status differs between the old and the new state. The
     * "transition in progress" flag is always cleared again, even if the transition throws.
     *
     * <p>NOTE(review): the decompiler reported "Removed try catching itself - possible behaviour
     * change" for this method, so exception handling may differ from the original source.
     *
     * @param targetState factory of the state to enter
     * @param <T> type of the target state
     * @return the newly entered state instance
     */
    @VisibleForTesting
    <T extends State> T transitionToState(StateFactory<T> targetState) {
        Preconditions.checkState(!isTransitioningState, "State transitions must not be triggered while another state transition is in progress.");
        Preconditions.checkState(state.getClass() != targetState.getStateClass(), "Attempted to transition into the very state the scheduler is already in.");
        componentMainThreadExecutor.assertRunningInMainThread();
        isTransitioningState = true;
        try {
            LOG.debug("Transition from state {} to {}.", state.getClass().getSimpleName(), targetState.getStateClass().getSimpleName());
            final JobStatus statusBefore = state.getJobStatus();
            // The old state is left before the new state instance is created.
            state.onLeave(targetState.getStateClass());
            final T enteredState = targetState.getState();
            state = enteredState;
            final JobStatus statusAfter = state.getJobStatus();
            if (statusBefore != statusAfter) {
                final long changedAt = System.currentTimeMillis();
                jobStatusListeners.forEach(listener -> listener.jobStatusChanges(jobInformation.getJobID(), statusAfter, changedAt));
            }
            return enteredState;
        } finally {
            isTransitioningState = false;
        }
    }

    /** Returns the state the scheduler is currently in. */
    @VisibleForTesting
    State getState() {
        return state;
    }

    /**
     * Periodic idle-slot check.
     *
     * <p>If the job is in a globally terminal state, every slot known to the slot pool is released
     * and the check stops. If the job is in a (non-global) terminal state, nothing is done and the
     * check stops. Otherwise idle slots are released and the check re-schedules itself after the
     * configured slot idle timeout.
     */
    private void checkIdleSlotTimeout() {
        final JobStatus currentStatus = getState().getJobStatus();
        if (currentStatus.isGloballyTerminalState()) {
            for (SlotInfo slot : declarativeSlotPool.getAllSlotsInformation()) {
                declarativeSlotPool.releaseSlot(slot.getAllocationId(), new FlinkException("Returning slots to their owners, because the job has reached a globally terminal state."));
            }
        } else if (!currentStatus.isTerminalState()) {
            declarativeSlotPool.releaseIdleSlots(System.currentTimeMillis());
            getMainThreadExecutor().schedule(this::checkIdleSlotTimeout, settings.getSlotIdleTimeout().toMillis(), TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Immutable bundle of the configuration values used by the adaptive scheduler: execution
     * mode, resource wait/stabilization timeouts, slot idle timeout and the scaling interval
     * bounds.
     */
    public static class Settings {
        private final SchedulerExecutionMode executionMode;
        private final Duration initialResourceAllocationTimeout;
        private final Duration resourceStabilizationTimeout;
        private final Duration slotIdleTimeout;
        private final Duration scalingIntervalMin;
        // May be null: the max interval is only validated when it is present in the configuration.
        private final Duration scalingIntervalMax;

        /**
         * Reads the scheduler settings from the given configuration.
         *
         * <p>In {@link SchedulerExecutionMode#REACTIVE} mode the fallback for the resource wait
         * timeout is {@code -1 ms} and the fallback for the stabilization timeout is zero;
         * explicitly configured values always take precedence over these fallbacks.
         *
         * @param configuration configuration to read from
         * @return the settings derived from the configuration
         * @throws IllegalStateException if the minimum scaling interval is negative, or if a
         *     maximum scaling interval is configured that is not greater than the minimum
         */
        public static Settings of(Configuration configuration) {
            SchedulerExecutionMode executionMode = configuration.get(JobManagerOptions.SCHEDULER_MODE);
            Duration allocationTimeoutDefault = JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue();
            Duration stabilizationTimeoutDefault = JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT.defaultValue();
            if (executionMode == SchedulerExecutionMode.REACTIVE) {
                allocationTimeoutDefault = Duration.ofMillis(-1L);
                stabilizationTimeoutDefault = Duration.ZERO;
            }
            Duration scalingIntervalMin = configuration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN);
            Duration scalingIntervalMax = configuration.get(JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX);
            Preconditions.checkState(!scalingIntervalMin.isNegative(), "%s must be positive integer or 0", JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key());
            if (scalingIntervalMax != null) {
                // Preconditions only substitutes "%s" placeholders; "%d" would be left verbatim and
                // shift the remaining arguments, producing a garbled error message.
                Preconditions.checkState(scalingIntervalMax.compareTo(scalingIntervalMin) > 0, "%s(%s) must be greater than %s(%s)", JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MAX.key(), scalingIntervalMax, JobManagerOptions.SCHEDULER_SCALING_INTERVAL_MIN.key(), scalingIntervalMin);
            }
            return new Settings(executionMode, configuration.getOptional(JobManagerOptions.RESOURCE_WAIT_TIMEOUT).orElse(allocationTimeoutDefault), configuration.getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT).orElse(stabilizationTimeoutDefault), Duration.ofMillis(configuration.get(JobManagerOptions.SLOT_IDLE_TIMEOUT)), scalingIntervalMin, scalingIntervalMax);
        }

        /** Stores the given values as-is; validation happens in {@link #of(Configuration)}. */
        private Settings(SchedulerExecutionMode executionMode, Duration initialResourceAllocationTimeout, Duration resourceStabilizationTimeout, Duration slotIdleTimeout, Duration scalingIntervalMin, Duration scalingIntervalMax) {
            this.executionMode = executionMode;
            this.initialResourceAllocationTimeout = initialResourceAllocationTimeout;
            this.resourceStabilizationTimeout = resourceStabilizationTimeout;
            this.slotIdleTimeout = slotIdleTimeout;
            this.scalingIntervalMin = scalingIntervalMin;
            this.scalingIntervalMax = scalingIntervalMax;
        }

        /** Returns the scheduler execution mode. */
        public SchedulerExecutionMode getExecutionMode() {
            return this.executionMode;
        }

        /** Returns the timeout for the initial resource allocation. */
        public Duration getInitialResourceAllocationTimeout() {
            return this.initialResourceAllocationTimeout;
        }

        /** Returns the resource stabilization timeout. */
        public Duration getResourceStabilizationTimeout() {
            return this.resourceStabilizationTimeout;
        }

        /** Returns the slot idle timeout. */
        public Duration getSlotIdleTimeout() {
            return this.slotIdleTimeout;
        }

        /** Returns the minimum scaling interval (validated to be non-negative). */
        public Duration getScalingIntervalMin() {
            return this.scalingIntervalMin;
        }

        /** Returns the maximum scaling interval; may be {@code null} if none is configured. */
        public Duration getScalingIntervalMax() {
            return this.scalingIntervalMax;
        }
    }
}

