/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.federation.policies.amrmproxy;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyInitializationContext;
import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils;
import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.AbstractAMRMProxyPolicy;
import org.apache.hadoop.yarn.server.federation.policies.dao.WeightedPolicyInfo;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.FederationPolicyInitializationException;
import org.apache.hadoop.yarn.server.federation.policies.exceptions.NoActiveSubclustersException;
import org.apache.hadoop.yarn.server.federation.resolver.SubClusterResolver;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterIdInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LocalityMulticastAMRMProxyPolicy
extends AbstractAMRMProxyPolicy {
    public static final Logger LOG = LoggerFactory.getLogger(LocalityMulticastAMRMProxyPolicy.class);
    private static Random rand = new Random();
    private Map<SubClusterId, Float> weights;
    private SubClusterResolver resolver;
    private Map<SubClusterId, Resource> headroom;
    private Map<SubClusterId, Long> lastHeartbeatTimeStamp;
    private long subClusterTimeOut;
    private float hrAlpha;
    private FederationStateStoreFacade federationFacade;
    private AllocationBookkeeper bookkeeper;
    private SubClusterId homeSubcluster;

    @Override
    public void reinitialize(FederationPolicyInitializationContext policyContext) throws FederationPolicyInitializationException {
        WeightedPolicyInfo tempPolicy = this.getPolicyInfo();
        super.reinitialize(policyContext);
        if (!this.getIsDirty()) {
            return;
        }
        HashMap<SubClusterId, Float> newWeightsConverted = new HashMap<SubClusterId, Float>();
        boolean allInactive = true;
        WeightedPolicyInfo policy = this.getPolicyInfo();
        if (policy.getAMRMPolicyWeights() != null && policy.getAMRMPolicyWeights().size() > 0) {
            for (Map.Entry<SubClusterIdInfo, Float> e : policy.getAMRMPolicyWeights().entrySet()) {
                if (e.getValue().floatValue() > 0.0f) {
                    allInactive = false;
                }
                newWeightsConverted.put(e.getKey().toId(), e.getValue());
            }
        }
        if (allInactive) {
            this.setPolicyInfo(tempPolicy);
            throw new FederationPolicyInitializationException("The weights used to configure this policy are all set to zero! (no ResourceRequest could be forwarded with this setting.)");
        }
        if (policyContext.getHomeSubcluster() == null) {
            this.setPolicyInfo(tempPolicy);
            throw new FederationPolicyInitializationException("The homeSubcluster filed in the context must be initialized to use this policy");
        }
        this.weights = newWeightsConverted;
        this.resolver = policyContext.getFederationSubclusterResolver();
        if (this.headroom == null) {
            this.headroom = new ConcurrentHashMap<SubClusterId, Resource>();
            this.lastHeartbeatTimeStamp = new ConcurrentHashMap<SubClusterId, Long>();
        }
        this.hrAlpha = policy.getHeadroomAlpha();
        this.federationFacade = policyContext.getFederationStateStoreFacade();
        this.homeSubcluster = policyContext.getHomeSubcluster();
        this.subClusterTimeOut = this.federationFacade.getConf().getLong("yarn.federation.amrmproxy.subcluster.timeout.ms", 60000L);
        if (this.subClusterTimeOut <= 0L) {
            LOG.info("{} configured to be {}, should be positive. Using default of {}.", new Object[]{"yarn.federation.amrmproxy.subcluster.timeout.ms", this.subClusterTimeOut, 60000L});
            this.subClusterTimeOut = 60000L;
        }
    }

    @Override
    public void notifyOfResponse(SubClusterId subClusterId, AllocateResponse response) throws YarnException {
        if (response.getAvailableResources() != null) {
            this.headroom.put(subClusterId, response.getAvailableResources());
            LOG.info("Subcluster {} updated with {} memory headroom", (Object)subClusterId, (Object)response.getAvailableResources().getMemorySize());
        }
        this.lastHeartbeatTimeStamp.put(subClusterId, System.currentTimeMillis());
    }

    @Override
    public Map<SubClusterId, List<ResourceRequest>> splitResourceRequests(List<ResourceRequest> resourceRequests) throws YarnException {
        this.bookkeeper = new AllocationBookkeeper();
        this.bookkeeper.reinitialize(this.federationFacade.getSubClusters(true));
        ArrayList<ResourceRequest> nonLocalizedRequests = new ArrayList<ResourceRequest>();
        SubClusterId targetId = null;
        Set<SubClusterId> targetIds = null;
        for (ResourceRequest resourceRequest : resourceRequests) {
            targetId = null;
            targetIds = null;
            if (ResourceRequest.isAnyLocation((String)resourceRequest.getResourceName())) {
                nonLocalizedRequests.add(resourceRequest);
                continue;
            }
            try {
                targetId = this.resolver.getSubClusterForNode(resourceRequest.getResourceName());
            }
            catch (YarnException yarnException) {
                // empty catch block
            }
            if (this.bookkeeper.isActiveAndEnabled(targetId)) {
                this.bookkeeper.addLocalizedNodeRR(targetId, resourceRequest);
                continue;
            }
            try {
                targetIds = this.resolver.getSubClustersForRack(resourceRequest.getResourceName());
            }
            catch (YarnException yarnException) {
                // empty catch block
            }
            if (targetIds != null && targetIds.size() > 0) {
                boolean hasActive = false;
                for (SubClusterId tid : targetIds) {
                    if (!this.bookkeeper.isActiveAndEnabled(tid)) continue;
                    this.bookkeeper.addRackRR(tid, resourceRequest);
                    hasActive = true;
                }
                if (hasActive) continue;
            }
            targetId = this.getSubClusterForUnResolvedRequest(this.bookkeeper, resourceRequest.getAllocationRequestId());
            if (LOG.isDebugEnabled()) {
                LOG.debug("ERROR resolving sub-cluster for resourceName: " + resourceRequest.getResourceName() + ", picked a random subcluster to forward:" + targetId);
            }
            if (targetIds != null && targetIds.size() > 0) {
                this.bookkeeper.addRackRR(targetId, resourceRequest);
                continue;
            }
            this.bookkeeper.addLocalizedNodeRR(targetId, resourceRequest);
        }
        this.splitAnyRequests(nonLocalizedRequests, this.bookkeeper);
        for (Map.Entry entry : this.bookkeeper.getAnswer().entrySet()) {
            if (this.lastHeartbeatTimeStamp.containsKey(entry.getKey())) continue;
            this.lastHeartbeatTimeStamp.put((SubClusterId)entry.getKey(), System.currentTimeMillis());
        }
        return this.bookkeeper.getAnswer();
    }

    protected SubClusterId getSubClusterForUnResolvedRequest(AllocationBookkeeper bookKeeper, long allocationId) {
        return bookKeeper.getSubClusterForUnResolvedRequest(allocationId);
    }

    private void splitAnyRequests(List<ResourceRequest> originalResourceRequests, AllocationBookkeeper allocationBookkeeper) throws YarnException {
        for (ResourceRequest resourceRequest : originalResourceRequests) {
            Long allocationId = resourceRequest.getAllocationRequestId();
            Set targetSubclusters = allocationBookkeeper.getSubClustersForId(allocationId) != null ? allocationBookkeeper.getSubClustersForId(allocationId) : allocationBookkeeper.getActiveAndEnabledSC();
            this.splitIndividualAny(resourceRequest, targetSubclusters, allocationBookkeeper);
        }
    }

    private void splitIndividualAny(ResourceRequest originalResourceRequest, Set<SubClusterId> targetSubclusters, AllocationBookkeeper allocationBookkeeper) throws YarnException {
        long allocationId = originalResourceRequest.getAllocationRequestId();
        int numContainer = originalResourceRequest.getNumContainers();
        if (numContainer == 0) {
            for (SubClusterId targetId : this.headroom.keySet()) {
                allocationBookkeeper.addAnyRR(targetId, originalResourceRequest);
            }
            return;
        }
        ArrayList<SubClusterId> targetSCs = new ArrayList<SubClusterId>(targetSubclusters);
        ArrayList<Float> weightsList = new ArrayList<Float>();
        for (SubClusterId targetId : targetSCs) {
            if (allocationBookkeeper.getSubClustersForId(allocationId) != null) {
                weightsList.add(Float.valueOf(this.getLocalityBasedWeighting(allocationId, targetId, allocationBookkeeper)));
                continue;
            }
            float headroomWeighting = this.getHeadroomWeighting(targetId, allocationBookkeeper);
            float policyWeighting = this.getPolicyConfigWeighting(targetId, allocationBookkeeper);
            weightsList.add(Float.valueOf(this.hrAlpha * headroomWeighting + (1.0f - this.hrAlpha) * policyWeighting));
        }
        ArrayList<Integer> containerNums = this.computeIntegerAssignment(numContainer, weightsList);
        int i = 0;
        for (SubClusterId targetId : targetSCs) {
            if (containerNums.get(i) > 0) {
                ResourceRequest out = ResourceRequest.clone((ResourceRequest)originalResourceRequest);
                out.setNumContainers(containerNums.get(i).intValue());
                if (ResourceRequest.isAnyLocation((String)out.getResourceName())) {
                    allocationBookkeeper.addAnyRR(targetId, out);
                } else {
                    allocationBookkeeper.addRackRR(targetId, out);
                }
            }
            ++i;
        }
    }

    @VisibleForTesting
    protected ArrayList<Integer> computeIntegerAssignment(int totalNum, ArrayList<Float> weightsList) throws YarnException {
        int i;
        ArrayList<Integer> ret = new ArrayList<Integer>();
        float totalWeight = 0.0f;
        float totalNumFloat = totalNum;
        if (weightsList.size() == 0) {
            return ret;
        }
        for (i = 0; i < weightsList.size(); ++i) {
            ret.add(0);
            if (!(weightsList.get(i).floatValue() > 0.0f)) continue;
            totalWeight += weightsList.get(i).floatValue();
        }
        if (totalWeight == 0.0f) {
            StringBuilder sb = new StringBuilder();
            for (Float weight : weightsList) {
                sb.append(weight + ", ");
            }
            throw new FederationPolicyException("No positive value found in weight array " + sb.toString());
        }
        int residue = totalNum;
        for (i = 0; i < weightsList.size(); ++i) {
            if (!(weightsList.get(i).floatValue() > 0.0f)) continue;
            int base = (int)(totalNumFloat * weightsList.get(i).floatValue() / totalWeight);
            ret.set(i, ret.get(i) + base);
            residue -= base;
        }
        for (i = 0; i < residue; ++i) {
            int index = FederationPolicyUtils.getWeightedRandom(weightsList);
            ret.set(index, ret.get(index) + 1);
        }
        return ret;
    }

    private float getLocalityBasedWeighting(long reqId, SubClusterId targetId, AllocationBookkeeper allocationBookkeeper) {
        float totWeight = allocationBookkeeper.getTotNumLocalizedContainers(reqId);
        float localWeight = allocationBookkeeper.getNumLocalizedContainers(reqId, targetId);
        return totWeight > 0.0f ? localWeight / totWeight : 0.0f;
    }

    private float getPolicyConfigWeighting(SubClusterId targetId, AllocationBookkeeper allocationBookkeeper) {
        float totWeight = allocationBookkeeper.totPolicyWeight;
        Float localWeight = (Float)allocationBookkeeper.policyWeights.get(targetId);
        return localWeight != null && totWeight > 0.0f ? localWeight.floatValue() / totWeight : 0.0f;
    }

    private float getHeadroomWeighting(SubClusterId targetId, AllocationBookkeeper allocationBookkeeper) {
        float headroomWeighting = 1.0f / (float)allocationBookkeeper.getActiveAndEnabledSC().size();
        if (this.headroom.containsKey(targetId) && allocationBookkeeper.totHeadroomMemory > 0.0f) {
            float ratioHeadroomKnown = (float)allocationBookkeeper.totHeadRoomEnabledRMs / (float)allocationBookkeeper.getActiveAndEnabledSC().size();
            headroomWeighting = (float)this.headroom.get(targetId).getMemorySize() / allocationBookkeeper.totHeadroomMemory * ratioHeadroomKnown;
        }
        return headroomWeighting;
    }

    protected final class AllocationBookkeeper {
        private Map<SubClusterId, List<ResourceRequest>> answer = new TreeMap<SubClusterId, List<ResourceRequest>>();
        private Map<SubClusterId, Set<Long>> maskForRackDeletion = new HashMap<SubClusterId, Set<Long>>();
        private Map<Long, Map<SubClusterId, AtomicLong>> countContainersPerRM = new HashMap<Long, Map<SubClusterId, AtomicLong>>();
        private Map<Long, AtomicLong> totNumLocalizedContainers = new HashMap<Long, AtomicLong>();
        private Map<Long, SubClusterId> unResolvedRequestLocation = new HashMap<Long, SubClusterId>();
        private Set<SubClusterId> activeAndEnabledSC = new HashSet<SubClusterId>();
        private float totHeadroomMemory = 0.0f;
        private int totHeadRoomEnabledRMs = 0;
        private Map<SubClusterId, Float> policyWeights;
        private float totPolicyWeight = 0.0f;

        protected AllocationBookkeeper() {
        }

        private void reinitialize(Map<SubClusterId, SubClusterInfo> activeSubclusters) throws YarnException {
            if (activeSubclusters == null) {
                throw new YarnRuntimeException("null activeSubclusters received");
            }
            this.answer.clear();
            this.maskForRackDeletion.clear();
            this.countContainersPerRM.clear();
            this.totNumLocalizedContainers.clear();
            this.activeAndEnabledSC.clear();
            this.totHeadroomMemory = 0.0f;
            this.totHeadRoomEnabledRMs = 0;
            this.policyWeights = LocalityMulticastAMRMProxyPolicy.this.weights;
            this.totPolicyWeight = 0.0f;
            for (Map.Entry<SubClusterId, Float> entry : this.policyWeights.entrySet()) {
                if (!(entry.getValue().floatValue() > 0.0f) || !activeSubclusters.containsKey(entry.getKey())) continue;
                this.activeAndEnabledSC.add(entry.getKey());
            }
            if (this.activeAndEnabledSC.size() < 1) {
                throw new NoActiveSubclustersException("None of the subclusters enabled in this policy (weight>0) are currently active we cannot forward the ResourceRequest(s)");
            }
            HashSet<SubClusterId> tmpSCSet = new HashSet<SubClusterId>(this.activeAndEnabledSC);
            for (Map.Entry entry : LocalityMulticastAMRMProxyPolicy.this.lastHeartbeatTimeStamp.entrySet()) {
                long duration = System.currentTimeMillis() - (Long)entry.getValue();
                if (duration <= LocalityMulticastAMRMProxyPolicy.this.subClusterTimeOut) continue;
                LOG.warn("Subcluster {} does not have a success heartbeat for {}s, skip routing asks there for this request", entry.getKey(), (Object)((double)duration / 1000.0));
                tmpSCSet.remove(entry.getKey());
            }
            if (tmpSCSet.size() < 1) {
                LOG.warn("All active and enabled subclusters have expired last heartbeat time. Ignore the expiry check for this request");
            } else {
                this.activeAndEnabledSC = tmpSCSet;
            }
            LOG.info("{} subcluster active, {} subclusters active and enabled", (Object)activeSubclusters.size(), (Object)this.activeAndEnabledSC.size());
            for (SubClusterId subClusterId : this.activeAndEnabledSC) {
                this.totPolicyWeight += this.policyWeights.get(subClusterId).floatValue();
            }
            for (Map.Entry entry : LocalityMulticastAMRMProxyPolicy.this.headroom.entrySet()) {
                if (!this.activeAndEnabledSC.contains(entry.getKey())) continue;
                this.totHeadroomMemory += (float)((Resource)entry.getValue()).getMemorySize();
                ++this.totHeadRoomEnabledRMs;
            }
        }

        private void addLocalizedNodeRR(SubClusterId targetId, ResourceRequest rr) {
            Preconditions.checkArgument((!ResourceRequest.isAnyLocation((String)rr.getResourceName()) ? 1 : 0) != 0);
            if (rr.getNumContainers() > 0) {
                if (!this.countContainersPerRM.containsKey(rr.getAllocationRequestId())) {
                    this.countContainersPerRM.put(rr.getAllocationRequestId(), new HashMap());
                }
                if (!this.countContainersPerRM.get(rr.getAllocationRequestId()).containsKey(targetId)) {
                    this.countContainersPerRM.get(rr.getAllocationRequestId()).put(targetId, new AtomicLong(0L));
                }
                this.countContainersPerRM.get(rr.getAllocationRequestId()).get(targetId).addAndGet(rr.getNumContainers());
                if (!this.totNumLocalizedContainers.containsKey(rr.getAllocationRequestId())) {
                    this.totNumLocalizedContainers.put(rr.getAllocationRequestId(), new AtomicLong(0L));
                }
                this.totNumLocalizedContainers.get(rr.getAllocationRequestId()).addAndGet(rr.getNumContainers());
            }
            this.internalAddToAnswer(targetId, rr, false);
        }

        private void addRackRR(SubClusterId targetId, ResourceRequest rr) {
            Preconditions.checkArgument((!ResourceRequest.isAnyLocation((String)rr.getResourceName()) ? 1 : 0) != 0);
            this.internalAddToAnswer(targetId, rr, true);
        }

        private void addAnyRR(SubClusterId targetId, ResourceRequest rr) {
            Preconditions.checkArgument((boolean)ResourceRequest.isAnyLocation((String)rr.getResourceName()));
            this.internalAddToAnswer(targetId, rr, false);
        }

        private void internalAddToAnswer(SubClusterId targetId, ResourceRequest partialRR, boolean isRack) {
            if (!isRack) {
                if (!this.maskForRackDeletion.containsKey(targetId)) {
                    this.maskForRackDeletion.put(targetId, new HashSet());
                }
                this.maskForRackDeletion.get(targetId).add(partialRR.getAllocationRequestId());
            }
            if (!this.answer.containsKey(targetId)) {
                this.answer.put(targetId, new ArrayList());
            }
            this.answer.get(targetId).add(partialRR);
        }

        private SubClusterId getSubClusterForUnResolvedRequest(long allocationId) {
            if (this.unResolvedRequestLocation.containsKey(allocationId)) {
                return this.unResolvedRequestLocation.get(allocationId);
            }
            int id = rand.nextInt(this.activeAndEnabledSC.size());
            for (SubClusterId subclusterId : this.activeAndEnabledSC) {
                if (id == 0) {
                    this.unResolvedRequestLocation.put(allocationId, subclusterId);
                    return subclusterId;
                }
                --id;
            }
            throw new RuntimeException("Should not be here. activeAndEnabledSC size = " + this.activeAndEnabledSC.size() + " id = " + id);
        }

        private Set<SubClusterId> getSubClustersForId(long allocationId) {
            if (this.countContainersPerRM.get(allocationId) == null) {
                return null;
            }
            return this.countContainersPerRM.get(allocationId).keySet();
        }

        private Map<SubClusterId, List<ResourceRequest>> getAnswer() {
            Iterator<Map.Entry<SubClusterId, List<ResourceRequest>>> answerIter = this.answer.entrySet().iterator();
            while (answerIter.hasNext()) {
                Map.Entry<SubClusterId, List<ResourceRequest>> entry = answerIter.next();
                SubClusterId scId = entry.getKey();
                Set<Long> mask = this.maskForRackDeletion.get(scId);
                if (mask != null) {
                    Iterator<ResourceRequest> rrIter = entry.getValue().iterator();
                    while (rrIter.hasNext()) {
                        ResourceRequest rr = rrIter.next();
                        if (mask.contains(rr.getAllocationRequestId())) continue;
                        rrIter.remove();
                    }
                }
                if (mask != null && entry.getValue().size() != 0) continue;
                answerIter.remove();
                LOG.info("removing {} from output because it has only rack RR", (Object)scId);
            }
            return this.answer;
        }

        private Set<SubClusterId> getActiveAndEnabledSC() {
            return this.activeAndEnabledSC;
        }

        private long getTotNumLocalizedContainers(long allocationId) {
            AtomicLong c = this.totNumLocalizedContainers.get(allocationId);
            return c == null ? 0L : c.get();
        }

        private long getNumLocalizedContainers(long allocationId, SubClusterId targetId) {
            AtomicLong c = this.countContainersPerRM.get(allocationId).get(targetId);
            return c == null ? 0L : c.get();
        }

        private boolean isActiveAndEnabled(SubClusterId targetId) {
            if (targetId == null) {
                return false;
            }
            return this.getActiveAndEnabledSC().contains(targetId);
        }
    }
}

