/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager.slotmanager;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.blocklist.BlockedTaskManagerChecker;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.resourcemanager.slotmanager.PendingTaskManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceAllocationStrategy;
import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceReconcileResult;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerUtils;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerInfo;
import org.apache.flink.runtime.resourcemanager.slotmanager.TaskManagerResourceInfoProvider;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.util.Preconditions;

public class DefaultResourceAllocationStrategy
implements ResourceAllocationStrategy {
    /**
     * Slot profile computed in the constructor by
     * {@code SlotManagerUtils.generateDefaultSlotResourceProfile} from the worker profile and the
     * slot count; passed to {@code SlotManagerUtils.getEffectiveResourceProfile} when sizing
     * requirements against newly created pending task managers.
     */
    private final ResourceProfile defaultSlotResourceProfile;
    /** Resource profile given to every {@code PendingTaskManager} this strategy creates. */
    private final ResourceProfile totalResourceProfile;
    /** Slot count given to every {@code PendingTaskManager} this strategy creates. */
    private final int numSlotsPerWorker;
    /** Lower bound compared against the CPU cores of the total resources. */
    private final CPUResource minTotalCPU;
    /** Lower bound compared against the total memory of the total resources. */
    private final MemorySize minTotalMemory;
    /** Matching strategy applied to registered resources; chosen in the constructor. */
    private final ResourceMatchingStrategy availableResourceMatchingStrategy;
    /** Matching strategy applied to pending resources; always the any-matching strategy. */
    private final ResourceMatchingStrategy pendingResourceMatchingStrategy = AnyMatchingResourceMatchingStrategy.INSTANCE;
    /** Idle duration after which a registered task manager becomes a release candidate. */
    private final Time taskManagerTimeout;
    /** Multiplier applied to the worker profile to obtain the spare resources to keep available. */
    private final int redundantTaskManagerNum;

    /**
     * Creates a strategy in which every newly requested worker has the given total profile and
     * slot count.
     *
     * @param totalResourceProfile resource profile used for each new pending task manager
     * @param numSlotsPerWorker slot count used for each new pending task manager
     * @param taskManagerLoadBalanceMode {@code SLOTS} selects least-utilization matching for
     *     registered resources; every other mode selects any-matching
     * @param taskManagerTimeout idle duration after which a task manager may be released
     * @param redundantTaskManagerNum number of worker profiles' worth of resources to keep available
     * @param minTotalCPU lower bound for the CPU cores of the total resources
     * @param minTotalMemory lower bound for the total memory of the total resources
     */
    public DefaultResourceAllocationStrategy(
            ResourceProfile totalResourceProfile,
            int numSlotsPerWorker,
            TaskManagerOptions.TaskManagerLoadBalanceMode taskManagerLoadBalanceMode,
            Time taskManagerTimeout,
            int redundantTaskManagerNum,
            CPUResource minTotalCPU,
            MemorySize minTotalMemory) {
        // Worker shape.
        this.totalResourceProfile = totalResourceProfile;
        this.numSlotsPerWorker = numSlotsPerWorker;
        this.defaultSlotResourceProfile =
                SlotManagerUtils.generateDefaultSlotResourceProfile(totalResourceProfile, numSlotsPerWorker);

        // How requirements are matched against already registered task managers.
        if (taskManagerLoadBalanceMode == TaskManagerOptions.TaskManagerLoadBalanceMode.SLOTS) {
            this.availableResourceMatchingStrategy = LeastUtilizationResourceMatchingStrategy.INSTANCE;
        } else {
            this.availableResourceMatchingStrategy = AnyMatchingResourceMatchingStrategy.INSTANCE;
        }

        // Release / keep-alive targets.
        this.taskManagerTimeout = taskManagerTimeout;
        this.redundantTaskManagerNum = redundantTaskManagerNum;
        this.minTotalCPU = minTotalCPU;
        this.minTotalMemory = minTotalMemory;
    }

    /**
     * Tries to satisfy the missing requirements of every job, first on registered task managers,
     * then on pending ones, requesting new pending task managers where needed. Afterwards requests
     * further workers until the redundancy and minimum-total targets hold.
     *
     * @param missingResources outstanding requirements keyed by job
     * @param taskManagerResourceInfoProvider source of registered and pending task managers
     * @param blockedTaskManagerChecker used to exclude blocked registered task managers
     * @return the allocation plan accumulated in the result builder
     */
    @Override
    public ResourceAllocationResult tryFulfillRequirements(
            Map<JobID, Collection<ResourceRequirement>> missingResources,
            TaskManagerResourceInfoProvider taskManagerResourceInfoProvider,
            BlockedTaskManagerChecker blockedTaskManagerChecker) {
        final ResourceAllocationResult.Builder allocation = ResourceAllocationResult.builder();
        final List<InternalResourceInfo> registered =
                DefaultResourceAllocationStrategy.getAvailableResources(
                        taskManagerResourceInfoProvider, allocation, blockedTaskManagerChecker);
        final List<InternalResourceInfo> pending =
                DefaultResourceAllocationStrategy.getPendingResources(taskManagerResourceInfoProvider, allocation);

        // Sum of the total profiles of everything known before any new worker is requested.
        ResourceProfile clusterTotal = ResourceProfile.ZERO;
        for (InternalResourceInfo info : registered) {
            clusterTotal = clusterTotal.merge(info.totalProfile);
        }
        for (InternalResourceInfo info : pending) {
            clusterTotal = clusterTotal.merge(info.totalProfile);
        }

        for (Map.Entry<JobID, Collection<ResourceRequirement>> jobEntry : missingResources.entrySet()) {
            final JobID jobId = jobEntry.getKey();
            final Collection<ResourceRequirement> stillMissing =
                    this.tryFulfillRequirementsForJobWithResources(jobId, jobEntry.getValue(), registered);
            if (!stillMissing.isEmpty()) {
                // Whatever new workers were requested for this job count towards the total.
                final ResourceProfile newlyRequested =
                        this.tryFulfillRequirementsForJobWithPendingResources(jobId, stillMissing, pending, allocation);
                clusterTotal = clusterTotal.merge(newlyRequested);
            }
        }

        this.tryFulFillRequiredResources(registered, pending, clusterTotal, allocation);
        return allocation.build();
    }

    /**
     * Decides which timed-out idle task managers and unused pending task managers can be released,
     * and whether additional workers must be requested, so that the redundancy and minimum-total
     * targets hold.
     *
     * <p>Registered task managers that are not idle-timed-out are always kept. If they do not
     * satisfy the targets on their own, pending task managers with allocation records are added to
     * the kept set. Idle-timed-out task managers and unused pending task managers are then visited
     * in order: each one is kept while the targets are still unmet and released once they are met.
     * If the targets are still unmet at the end, new pending task managers are requested.
     *
     * @param taskManagerResourceInfoProvider source of registered and pending task managers
     * @return the reconcile plan (releases and/or new allocations)
     */
    @Override
    public ResourceReconcileResult tryReconcileClusterResources(TaskManagerResourceInfoProvider taskManagerResourceInfoProvider) {
        final ResourceReconcileResult.Builder builder = ResourceReconcileResult.builder();

        // Split registered task managers into "idle for at least the timeout" and the rest.
        final List<TaskManagerInfo> taskManagersIdleTimeout = new ArrayList<>();
        final List<TaskManagerInfo> taskManagersNonTimeout = new ArrayList<>();
        final long currentTime = System.currentTimeMillis();
        for (TaskManagerInfo taskManagerInfo : taskManagerResourceInfoProvider.getRegisteredTaskManagers()) {
            if (taskManagerInfo.isIdle() && currentTime - taskManagerInfo.getIdleSince() >= this.taskManagerTimeout.toMilliseconds()) {
                taskManagersIdleTimeout.add(taskManagerInfo);
            } else {
                taskManagersNonTimeout.add(taskManagerInfo);
            }
        }

        // Split pending task managers by whether any slot allocation is recorded on them.
        final List<PendingTaskManager> pendingTaskManagersNonUse = new ArrayList<>();
        final List<PendingTaskManager> pendingTaskManagersInuse = new ArrayList<>();
        for (PendingTaskManager pendingTaskManager : taskManagerResourceInfoProvider.getPendingTaskManagers()) {
            if (pendingTaskManager.getPendingSlotAllocationRecords().isEmpty()) {
                pendingTaskManagersNonUse.add(pendingTaskManager);
            } else {
                pendingTaskManagersInuse.add(pendingTaskManager);
            }
        }

        ResourceProfile resourcesToKeep = ResourceProfile.ZERO;
        ResourceProfile resourcesInTotal = ResourceProfile.ZERO;
        boolean resourceFulfilled = false;

        // Non-timed-out task managers are kept unconditionally.
        resourcesToKeep = resourcesToKeep.merge(this.getAvailableResourceOfTaskManagers(taskManagersNonTimeout));
        resourcesInTotal = resourcesInTotal.merge(this.getTotalResourceOfTaskManagers(taskManagersNonTimeout));

        if (this.isRequiredResourcesFulfilled(resourcesToKeep, resourcesInTotal)) {
            resourceFulfilled = true;
        } else {
            // Not enough yet: also count the pending task managers that are in use.
            resourcesToKeep = resourcesToKeep.merge(this.getAvailableResourceOfPendingTaskManagers(pendingTaskManagersInuse));
            resourcesInTotal = resourcesInTotal.merge(this.getTotalResourceOfPendingTaskManagers(pendingTaskManagersInuse));
        }

        // Keep idle task managers until the targets are met, release the remainder.
        for (TaskManagerInfo idleTaskManager : taskManagersIdleTimeout) {
            if (resourceFulfilled || this.isRequiredResourcesFulfilled(resourcesToKeep, resourcesInTotal)) {
                resourceFulfilled = true;
                builder.addTaskManagerToRelease(idleTaskManager);
            } else {
                resourcesToKeep = resourcesToKeep.merge(idleTaskManager.getAvailableResource());
                resourcesInTotal = resourcesInTotal.merge(idleTaskManager.getTotalResource());
            }
        }

        // Same treatment for pending task managers without any recorded allocation.
        for (PendingTaskManager unusedPendingTaskManager : pendingTaskManagersNonUse) {
            if (resourceFulfilled || this.isRequiredResourcesFulfilled(resourcesToKeep, resourcesInTotal)) {
                resourceFulfilled = true;
                builder.addPendingTaskManagerToRelease(unusedPendingTaskManager);
            } else {
                resourcesToKeep = resourcesToKeep.merge(unusedPendingTaskManager.getUnusedResource());
                resourcesInTotal = resourcesInTotal.merge(unusedPendingTaskManager.getTotalResourceProfile());
            }
        }

        if (!resourceFulfilled) {
            // Everything was kept and it is still not enough: request additional workers.
            this.tryFulFillRequiredResourcesWithAction(resourcesToKeep, resourcesInTotal, builder::addPendingTaskManagerToAllocate);
        }
        return builder.build();
    }

    /**
     * Builds the working view of all registered task managers that are not blocked.
     *
     * <p>The returned list is mutated by the matching strategies (exhausted entries are removed
     * through the iterator), so it is collected into an explicit {@link ArrayList} rather than
     * relying on the unspecified list type of {@code Collectors.toList()}.
     *
     * @param taskManagerResourceInfoProvider source of registered task managers
     * @param resultBuilder receives every allocation made on a registered task manager
     * @param blockedTaskManagerChecker decides which task managers are excluded
     * @return a mutable list with one entry per non-blocked registered task manager
     */
    private static List<InternalResourceInfo> getAvailableResources(TaskManagerResourceInfoProvider taskManagerResourceInfoProvider, ResourceAllocationResult.Builder resultBuilder, BlockedTaskManagerChecker blockedTaskManagerChecker) {
        return taskManagerResourceInfoProvider.getRegisteredTaskManagers().stream()
                .filter(taskManager -> !blockedTaskManagerChecker.isBlockedTaskManager(taskManager.getTaskExecutorConnection().getResourceID()))
                .map(taskManager ->
                        new InternalResourceInfo(
                                taskManager.getDefaultSlotResourceProfile(),
                                taskManager.getTotalResource(),
                                taskManager.getAvailableResource(),
                                (jobId, slotProfile) -> resultBuilder.addAllocationOnRegisteredResource(jobId, taskManager.getInstanceId(), slotProfile)))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Builds the working view of all pending task managers. Each entry starts with its full total
     * profile as the available profile.
     *
     * <p>The returned list is mutated later (new pending task managers are appended and exhausted
     * entries are removed through the iterator), so it is collected into an explicit
     * {@link ArrayList} rather than relying on the unspecified list type of
     * {@code Collectors.toList()}.
     *
     * @param taskManagerResourceInfoProvider source of pending task managers
     * @param resultBuilder receives every allocation made on a pending task manager
     * @return a mutable list with one entry per pending task manager
     */
    private static List<InternalResourceInfo> getPendingResources(TaskManagerResourceInfoProvider taskManagerResourceInfoProvider, ResourceAllocationResult.Builder resultBuilder) {
        return taskManagerResourceInfoProvider.getPendingTaskManagers().stream()
                .map(pendingTaskManager ->
                        new InternalResourceInfo(
                                pendingTaskManager.getDefaultSlotResourceProfile(),
                                pendingTaskManager.getTotalResourceProfile(),
                                pendingTaskManager.getTotalResourceProfile(),
                                (jobId, slotProfile) -> resultBuilder.addAllocationOnPendingResource(jobId, pendingTaskManager.getPendingTaskManagerId(), slotProfile)))
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Matches one job's requirements against the registered resources using the configured
     * matching strategy.
     *
     * @param jobId job the requirements belong to
     * @param missingResources requirements to satisfy
     * @param registeredResources working view of the registered task managers
     * @return the requirements that could not be (fully) satisfied, with their remaining slot counts
     */
    private Collection<ResourceRequirement> tryFulfillRequirementsForJobWithResources(JobID jobId, Collection<ResourceRequirement> missingResources, List<InternalResourceInfo> registeredResources) {
        final List<ResourceRequirement> leftovers = new ArrayList<>();
        for (ResourceRequirement requirement : missingResources) {
            final ResourceProfile profile = requirement.getResourceProfile();
            final int stillMissing =
                    this.availableResourceMatchingStrategy.tryFulfilledRequirementWithResource(
                            registeredResources, requirement.getNumberOfRequiredSlots(), profile, jobId);
            if (stillMissing > 0) {
                leftovers.add(ResourceRequirement.create(profile, stillMissing));
            }
        }
        return leftovers;
    }

    /**
     * Tells whether {@code resourceProfile} is at least as large as {@code requirement} in every
     * field.
     *
     * @param requirement the profile that needs to fit
     * @param resourceProfile the profile that has to accommodate it
     * @return {@code true} if the requirement fits
     */
    private static boolean canFulfillRequirement(ResourceProfile requirement, ResourceProfile resourceProfile) {
        final boolean fits = resourceProfile.allFieldsNoLessThan(requirement);
        return fits;
    }

    /**
     * Places a job's remaining requirements on pending resources, requesting new pending task
     * managers when the existing ones are not enough.
     *
     * <p>A requirement whose effective profile does not fit into a single worker profile marks the
     * job as unfulfillable and is skipped. Left-over capacity of a newly requested worker is
     * appended to {@code availableResources} so later requirements can use it.
     *
     * @param jobId job the requirements belong to
     * @param unfulfilledRequirements requirements not satisfied by registered resources
     * @param availableResources working view of the pending resources; may be extended
     * @param resultBuilder receives new pending task managers, allocations and unfulfillable jobs
     * @return the summed total profile of all pending task managers requested by this call
     */
    private ResourceProfile tryFulfillRequirementsForJobWithPendingResources(JobID jobId, Collection<ResourceRequirement> unfulfilledRequirements, List<InternalResourceInfo> availableResources, ResourceAllocationResult.Builder resultBuilder) {
        ResourceProfile requestedInTotal = ResourceProfile.ZERO;
        for (ResourceRequirement requirement : unfulfilledRequirements) {
            final ResourceProfile slotProfile =
                    SlotManagerUtils.getEffectiveResourceProfile(requirement.getResourceProfile(), this.defaultSlotResourceProfile);
            // First use whatever is still free on the pending resources already known.
            int remaining =
                    this.pendingResourceMatchingStrategy.tryFulfilledRequirementWithResource(
                            availableResources, requirement.getNumberOfRequiredSlots(), requirement.getResourceProfile(), jobId);

            if (this.totalResourceProfile.allFieldsNoLessThan(slotProfile)) {
                // Request one worker at a time and pack it with as many slots as fit.
                while (remaining > 0) {
                    final PendingTaskManager worker = new PendingTaskManager(this.totalResourceProfile, this.numSlotsPerWorker);
                    resultBuilder.addPendingTaskManagerAllocate(worker);
                    requestedInTotal = requestedInTotal.merge(this.totalResourceProfile);

                    ResourceProfile free = this.totalResourceProfile;
                    while (remaining > 0 && DefaultResourceAllocationStrategy.canFulfillRequirement(slotProfile, free)) {
                        remaining--;
                        resultBuilder.addAllocationOnPendingResource(jobId, worker.getPendingTaskManagerId(), slotProfile);
                        free = free.subtract(slotProfile);
                    }

                    if (!free.equals(ResourceProfile.ZERO)) {
                        // Expose the unused part of the new worker to subsequent requirements.
                        final BiConsumer<JobID, ResourceProfile> recordAllocation =
                                (allocatingJob, allocatedProfile) ->
                                        resultBuilder.addAllocationOnPendingResource(allocatingJob, worker.getPendingTaskManagerId(), allocatedProfile);
                        availableResources.add(
                                new InternalResourceInfo(this.defaultSlotResourceProfile, this.totalResourceProfile, free, recordAllocation));
                    }
                }
            } else {
                // A single slot of this requirement is larger than a whole worker.
                resultBuilder.addUnfulfillableJob(jobId);
            }
        }
        return requestedInTotal;
    }

    /**
     * Checks both cluster targets: the redundancy target on the available resources and the
     * minimum-total target on the total resources.
     *
     * @param resourcesAvailable currently available (unused) resources
     * @param resourcesInTotal total resources
     * @return {@code true} only if both targets are met
     */
    private boolean isRequiredResourcesFulfilled(ResourceProfile resourcesAvailable, ResourceProfile resourcesInTotal) {
        if (!this.isRedundantResourcesFulfilled(resourcesAvailable)) {
            return false;
        }
        return this.isMinRequiredResourcesFulfilled(resourcesInTotal);
    }

    /**
     * Checks whether the available resources cover {@code redundantTaskManagerNum} times the
     * worker profile in every field.
     *
     * @param resourcesAvailable currently available (unused) resources
     * @return {@code true} if the redundancy target is met
     */
    private boolean isRedundantResourcesFulfilled(ResourceProfile resourcesAvailable) {
        final ResourceProfile redundancyTarget = this.totalResourceProfile.multiply(this.redundantTaskManagerNum);
        return resourcesAvailable.allFieldsNoLessThan(redundancyTarget);
    }

    /**
     * Checks whether the total resources reach both the minimum CPU and the minimum memory.
     *
     * @param resourcesInTotal total resources
     * @return {@code true} if neither CPU cores nor total memory is below its minimum
     */
    private boolean isMinRequiredResourcesFulfilled(ResourceProfile resourcesInTotal) {
        if (resourcesInTotal.getCpuCores().compareTo(this.minTotalCPU) < 0) {
            return false;
        }
        return resourcesInTotal.getTotalMemory().compareTo(this.minTotalMemory) >= 0;
    }

    /**
     * Sums what is still available on the registered and pending resources and requests additional
     * pending task managers until the cluster targets hold.
     *
     * @param availableRegisteredResources working view of the registered task managers
     * @param availablePendingResources working view of the pending task managers
     * @param resourcesInTotal total resources, including workers already requested
     * @param resultBuilder receives any additional pending task manager
     */
    private void tryFulFillRequiredResources(List<InternalResourceInfo> availableRegisteredResources, List<InternalResourceInfo> availablePendingResources, ResourceProfile resourcesInTotal, ResourceAllocationResult.Builder resultBuilder) {
        ResourceProfile stillFree = ResourceProfile.ZERO;
        for (InternalResourceInfo info : availableRegisteredResources) {
            stillFree = stillFree.merge(info.availableProfile);
        }
        for (InternalResourceInfo info : availablePendingResources) {
            stillFree = stillFree.merge(info.availableProfile);
        }
        this.tryFulFillRequiredResourcesWithAction(stillFree, resourcesInTotal, resultBuilder::addPendingTaskManagerAllocate);
    }

    /**
     * Creates pending task managers one at a time, handing each to {@code fulfillAction}, until
     * the cluster targets hold for the running available/total sums.
     *
     * <p>NOTE(review): the loop only ends once both targets are met; presumably configuration
     * guarantees that adding worker profiles eventually reaches the minimums - confirm.
     *
     * @param resourcesAvailable available resources before adding any worker
     * @param resourcesInTotal total resources before adding any worker
     * @param fulfillAction invoked with every newly created pending task manager
     */
    private void tryFulFillRequiredResourcesWithAction(ResourceProfile resourcesAvailable, ResourceProfile resourcesInTotal, Consumer<? super PendingTaskManager> fulfillAction) {
        ResourceProfile available = resourcesAvailable;
        ResourceProfile total = resourcesInTotal;
        while (true) {
            if (this.isRequiredResourcesFulfilled(available, total)) {
                return;
            }
            fulfillAction.accept(new PendingTaskManager(this.totalResourceProfile, this.numSlotsPerWorker));
            // A fresh worker contributes its whole profile to both sums.
            available = available.merge(this.totalResourceProfile);
            total = total.merge(this.totalResourceProfile);
        }
    }

    /**
     * Returns the worker profile multiplied by the number of given task managers.
     *
     * <p>NOTE(review): this uses the strategy's worker profile rather than each task manager's own
     * total - presumably all task managers share that profile; confirm.
     *
     * @param taskManagers registered task managers to account for
     * @return the worker profile times {@code taskManagers.size()}
     */
    private ResourceProfile getTotalResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
        final int count = taskManagers.size();
        return this.totalResourceProfile.multiply(count);
    }

    /**
     * Returns the worker profile multiplied by the number of given pending task managers.
     *
     * @param pendingTaskManagers pending task managers to account for
     * @return the worker profile times {@code pendingTaskManagers.size()}
     */
    private ResourceProfile getTotalResourceOfPendingTaskManagers(List<PendingTaskManager> pendingTaskManagers) {
        final int count = pendingTaskManagers.size();
        return this.totalResourceProfile.multiply(count);
    }

    /**
     * Sums the available resources of the given task managers.
     *
     * @param taskManagers registered task managers to account for
     * @return the merged available profiles, or {@code ResourceProfile.ZERO} for an empty list
     */
    private ResourceProfile getAvailableResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
        ResourceProfile sum = ResourceProfile.ZERO;
        for (TaskManagerInfo taskManager : taskManagers) {
            sum = sum.merge(taskManager.getAvailableResource());
        }
        return sum;
    }

    /**
     * Sums the unused resources of the given pending task managers.
     *
     * @param pendingTaskManagers pending task managers to account for
     * @return the merged unused profiles, or {@code ResourceProfile.ZERO} for an empty list
     */
    private ResourceProfile getAvailableResourceOfPendingTaskManagers(List<PendingTaskManager> pendingTaskManagers) {
        ResourceProfile sum = ResourceProfile.ZERO;
        for (PendingTaskManager pendingTaskManager : pendingTaskManagers) {
            sum = sum.merge(pendingTaskManager.getUnusedResource());
        }
        return sum;
    }

    /**
     * Matching strategy that always places the next slot on the resource with the lowest
     * utilization, one slot at a time.
     */
    private static enum LeastUtilizationResourceMatchingStrategy implements ResourceMatchingStrategy
    {
        INSTANCE;

        @Override
        public int tryFulfilledRequirementWithResource(List<InternalResourceInfo> internalResources, int numUnfulfilled, ResourceProfile requiredResource, JobID jobId) {
            // PriorityQueue rejects an initial capacity below 1, so bail out on an empty list.
            if (internalResources.isEmpty()) {
                return numUnfulfilled;
            }

            final PriorityQueue<InternalResourceInfo> byUtilization =
                    new PriorityQueue<>(
                            internalResources.size(),
                            Comparator.comparingDouble((InternalResourceInfo info) -> info.utilization));
            byUtilization.addAll(internalResources);

            int remaining = numUnfulfilled;
            while (remaining > 0) {
                final InternalResourceInfo leastUtilized = byUtilization.poll();
                if (leastUtilized == null) {
                    // Every candidate has been tried and dropped.
                    break;
                }
                if (leastUtilized.tryAllocateSlotForJob(jobId, requiredResource)) {
                    remaining--;
                    // Re-queue only if something is left; its utilization has changed.
                    if (!leastUtilized.availableProfile.equals(ResourceProfile.ZERO)) {
                        byUtilization.add(leastUtilized);
                    }
                }
            }
            return remaining;
        }
    }

    /**
     * Matching strategy that walks the resources in list order and packs as many slots as possible
     * onto each one before moving on. Resources whose available profile drops to zero are removed
     * from the given list.
     */
    private static enum AnyMatchingResourceMatchingStrategy implements ResourceMatchingStrategy
    {
        INSTANCE;

        @Override
        public int tryFulfilledRequirementWithResource(List<InternalResourceInfo> internalResources, int numUnfulfilled, ResourceProfile requiredResource, JobID jobId) {
            int remaining = numUnfulfilled;
            final Iterator<InternalResourceInfo> cursor = internalResources.iterator();
            while (remaining > 0 && cursor.hasNext()) {
                final InternalResourceInfo candidate = cursor.next();
                // Keep allocating on this resource until it refuses or nothing is missing.
                while (remaining > 0 && candidate.tryAllocateSlotForJob(jobId, requiredResource)) {
                    remaining--;
                }
                if (candidate.availableProfile.equals(ResourceProfile.ZERO)) {
                    // Fully used: drop it so later requirements do not revisit it.
                    cursor.remove();
                }
            }
            return remaining;
        }
    }

    /** Strategy for placing a number of identical slot requests onto a list of resources. */
    private static interface ResourceMatchingStrategy {
        /**
         * Tries to allocate up to {@code numUnfulfilled} slots of {@code requiredResource} for the
         * given job on the given resources.
         *
         * @param internalResources candidate resources; implementations may update and remove entries
         * @param numUnfulfilled number of slots that still need to be placed
         * @param requiredResource resource profile requested for each slot
         * @param jobId job the slots are allocated for
         * @return the number of slots that could not be placed
         */
        public int tryFulfilledRequirementWithResource(List<InternalResourceInfo> internalResources, int numUnfulfilled, ResourceProfile requiredResource, JobID jobId);
    }

    /**
     * Mutable working copy of one (registered or pending) task manager's resources, used while an
     * allocation plan is being computed. Tracks the remaining available profile and the resulting
     * utilization, and reports every successful allocation to a callback.
     */
    private static class InternalResourceInfo {
        private final ResourceProfile defaultSlotProfile;
        private final BiConsumer<JobID, ResourceProfile> allocationConsumer;
        private final ResourceProfile totalProfile;
        private ResourceProfile availableProfile;
        private double utilization;

        /**
         * @param defaultSlotProfile profile used to resolve a requirement's effective profile
         * @param totalProfile total resources of the task manager
         * @param availableProfile resources currently free on the task manager
         * @param allocationConsumer invoked with job and effective profile for each allocation
         * @throws IllegalStateException if any of the profiles equals {@code ResourceProfile.UNKNOWN}
         */
        InternalResourceInfo(ResourceProfile defaultSlotProfile, ResourceProfile totalProfile, ResourceProfile availableProfile, BiConsumer<JobID, ResourceProfile> allocationConsumer) {
            Preconditions.checkState(!defaultSlotProfile.equals(ResourceProfile.UNKNOWN));
            Preconditions.checkState(!totalProfile.equals(ResourceProfile.UNKNOWN));
            Preconditions.checkState(!availableProfile.equals(ResourceProfile.UNKNOWN));
            this.defaultSlotProfile = defaultSlotProfile;
            this.totalProfile = totalProfile;
            this.availableProfile = availableProfile;
            this.allocationConsumer = allocationConsumer;
            this.utilization = this.computeUtilization();
        }

        /**
         * Reserves one slot for the job if the effective profile of the requirement still fits.
         *
         * @param jobId job the slot is for
         * @param requirement requested slot profile
         * @return {@code true} if the slot was reserved, {@code false} if it does not fit
         */
        boolean tryAllocateSlotForJob(JobID jobId, ResourceProfile requirement) {
            final ResourceProfile slotProfile = SlotManagerUtils.getEffectiveResourceProfile(requirement, this.defaultSlotProfile);
            if (!this.availableProfile.allFieldsNoLessThan(slotProfile)) {
                return false;
            }
            this.availableProfile = this.availableProfile.subtract(slotProfile);
            this.allocationConsumer.accept(jobId, slotProfile);
            this.utilization = this.computeUtilization();
            return true;
        }

        /** Returns the larger of the used CPU fraction and the used memory fraction. */
        private double computeUtilization() {
            final double totalCpu = this.totalProfile.getCpuCores().getValue().doubleValue();
            final double usedCpu = this.totalProfile.getCpuCores().subtract(this.availableProfile.getCpuCores()).getValue().doubleValue();
            final double totalMemoryBytes = (double)this.totalProfile.getTotalMemory().getBytes();
            final double usedMemoryBytes = (double)this.totalProfile.getTotalMemory().subtract(this.availableProfile.getTotalMemory()).getBytes();
            return Math.max(usedCpu / totalCpu, usedMemoryBytes / totalMemoryBytes);
        }
    }
}

