/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;

public class ProportionalCapacityPreemptionPolicy
implements SchedulingEditPolicy {
    /** Logger shared by every instance of this policy. */
    private static final Log LOG = LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
    /** Configuration key for {@code observeOnly}; {@code init} reads it as a boolean with default {@code false}. */
    public static final String OBSERVE_ONLY = "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
    /** Configuration key for {@code monitoringInterval}; {@code init} reads it as a long with default {@code 3000}. */
    public static final String MONITORING_INTERVAL = "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
    /** Configuration key for {@code maxWaitTime}; {@code init} reads it as a long with default {@code 15000}. */
    public static final String WAIT_TIME_BEFORE_KILL = "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
    /** Configuration key for {@code percentageClusterPreemptionAllowed}; {@code init} reads it as a float with default {@code 0.1}. */
    public static final String TOTAL_PREEMPTION_PER_ROUND = "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
    /** Configuration key for {@code maxIgnoredOverCapacity}; {@code init} reads it as a double with default {@code 0.1}. */
    public static final String MAX_IGNORED_OVER_CAPACITY = "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
    /** Configuration key for {@code naturalTerminationFactor}; {@code init} reads it as a double with default {@code 0.2}. */
    public static final String NATURAL_TERMINATION_FACTOR = "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
    /** Sink for the PREEMPT_CONTAINER, KILL_CONTAINER and DROP_RESERVATION events produced by this policy; assigned in {@code init}. */
    public EventHandler<ContainerPreemptEvent> dispatcher;
    /** Time source used for the preemption timestamps and for the log output. */
    private final Clock clock;
    /** A queue is only considered for preemption when its current usage exceeds {@code guaranteed * (1 + maxIgnoredOverCapacity)}. */
    private double maxIgnoredOverCapacity;
    /**
     * Delay between the first PREEMPT_CONTAINER event for a container and the KILL_CONTAINER event;
     * compared against {@code clock.getTime()} (presumably milliseconds - TODO confirm the clock's unit).
     */
    private long maxWaitTime;
    /** Scheduler whose queues are inspected; assigned in {@code init}. */
    private CapacityScheduler scheduler;
    /** Value returned by {@code getMonitoringInterval()}. */
    private long monitoringInterval;
    /** Containers already asked to preempt, mapped to the {@code clock.getTime()} value recorded when they were first selected. */
    private final Map<RMContainer, Long> preempted = new HashMap<RMContainer, Long>();
    /** Resource calculator obtained from the scheduler in {@code init}; used for all resource comparisons. */
    private ResourceCalculator rc;
    /** Fraction of the cluster resources that bounds the total preemption computed in a single round. */
    private float percentageClusterPreemptionAllowed;
    /** Multiplier applied to each queue's {@code toBePreempted} amount before containers are selected. */
    private double naturalTerminationFactor;
    /** When true, the plan is computed and logged but no events are dispatched. */
    private boolean observeOnly;

    /**
     * Creates a policy that uses a {@link SystemClock}. No configuration is read here:
     * the scheduler, dispatcher and tunables remain unset until {@code init} is invoked.
     */
    public ProportionalCapacityPreemptionPolicy() {
        clock = new SystemClock();
    }

    /**
     * Creates and initializes a policy that uses a {@link SystemClock}.
     *
     * @param config     configuration the tunables are read from
     * @param dispatcher handler receiving the preemption events
     * @param scheduler  scheduler whose queues are monitored
     */
    public ProportionalCapacityPreemptionPolicy(Configuration config, EventHandler<ContainerPreemptEvent> dispatcher, CapacityScheduler scheduler) {
        this(config, dispatcher, scheduler, new SystemClock());
    }

    /**
     * Creates and initializes a policy with an explicit time source.
     *
     * @param config     configuration the tunables are read from
     * @param dispatcher handler receiving the preemption events
     * @param scheduler  scheduler whose queues are monitored
     * @param clock      time source used for preemption timestamps
     */
    public ProportionalCapacityPreemptionPolicy(Configuration config, EventHandler<ContainerPreemptEvent> dispatcher, CapacityScheduler scheduler, Clock clock) {
        // init runs first, exactly as before; the clock is assigned afterwards.
        init(config, dispatcher, scheduler);
        this.clock = clock;
    }

    /**
     * Binds this policy to a dispatcher and scheduler and loads its tunables from
     * {@code config}, falling back to these defaults: max ignored over-capacity 0.1,
     * natural termination factor 0.2, wait before kill 15000, monitoring interval 3000,
     * total preemption per round 0.1, observe-only false.
     *
     * @param config configuration to read the tunables from
     * @param disp   handler that will receive the preemption events
     * @param sched  scheduler to monitor; must be a {@link CapacityScheduler}
     * @throws YarnRuntimeException if {@code sched} is {@code null} or is not a
     *                              {@link CapacityScheduler}
     */
    @Override
    public void init(Configuration config, EventHandler<ContainerPreemptEvent> disp, PreemptableResourceScheduler sched) {
        LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
        assert (null == this.scheduler) : "Unexpected duplicate call to init";
        if (!(sched instanceof CapacityScheduler)) {
            // instanceof is also false for null, so build the message without dereferencing sched;
            // otherwise a null scheduler surfaces as a bare NullPointerException.
            String actualType = (sched == null) ? "null" : sched.getClass().getCanonicalName();
            throw new YarnRuntimeException("Class " + actualType + " not instance of " + CapacityScheduler.class.getCanonicalName());
        }
        this.dispatcher = disp;
        this.scheduler = (CapacityScheduler) sched;
        this.maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
        this.naturalTerminationFactor = config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
        this.maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000L);
        this.monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000L);
        this.percentageClusterPreemptionAllowed = config.getFloat(TOTAL_PREEMPTION_PER_ROUND, 0.1f);
        this.observeOnly = config.getBoolean(OBSERVE_ONLY, false);
        // The calculator comes from the scheduler so comparisons here use the scheduler's own instance.
        this.rc = this.scheduler.getResourceCalculator();
    }

    /**
     * Runs one preemption round against the scheduler's root queue, using a cloned
     * copy of the scheduler's cluster resources.
     */
    @Override
    public void editSchedule() {
        // Arguments are evaluated left to right: root queue first, then the resource clone.
        containerBasedPreemptOrKill(
                scheduler.getRootQueue(),
                Resources.clone(scheduler.getClusterResources()));
    }

    /**
     * Executes one preemption round.
     * <ol>
     *   <li>Clones the queue hierarchy while holding the scheduler's monitor.</li>
     *   <li>Computes the ideal assignment for every leaf, bounded by
     *       {@code clusterResources * percentageClusterPreemptionAllowed}.</li>
     *   <li>Selects the containers to preempt and logs the per-queue state.</li>
     *   <li>Unless in observe-only mode, dispatches KILL_CONTAINER for containers first
     *       selected more than {@code maxWaitTime} ago, and PREEMPT_CONTAINER for the rest.</li>
     *   <li>Forgets tracked containers whose first selection is older than
     *       {@code 2 * maxWaitTime}.</li>
     * </ol>
     *
     * @param root             root of the scheduler's queue hierarchy
     * @param clusterResources total cluster resources used for this round
     */
    private void containerBasedPreemptOrKill(CSQueue root, Resource clusterResources) {
        final TempQueue snapshotRoot;
        synchronized (scheduler) {
            snapshotRoot = cloneQueues(root, clusterResources);
        }

        // The root is ideally assigned exactly what it is guaranteed.
        snapshotRoot.idealAssigned = snapshotRoot.guaranteed;
        Resource preemptionBudget = Resources.multiply(clusterResources, percentageClusterPreemptionAllowed);
        List<TempQueue> leaves = recursivelyComputeIdealAssignment(snapshotRoot, preemptionBudget);
        Map<ApplicationAttemptId, Set<RMContainer>> victims = getContainersToPreempt(leaves, clusterResources);
        logToCSV(leaves);

        if (observeOnly) {
            return;
        }

        for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> perApp : victims.entrySet()) {
            ApplicationAttemptId attemptId = perApp.getKey();
            for (RMContainer container : perApp.getValue()) {
                Long firstSelectedAt = preempted.get(container);
                // The clock is only consulted for containers that are already tracked.
                boolean graceExpired = firstSelectedAt != null
                        && firstSelectedAt + maxWaitTime < clock.getTime();
                if (graceExpired) {
                    dispatcher.handle(new ContainerPreemptEvent(attemptId, container, ContainerPreemptEventType.KILL_CONTAINER));
                    preempted.remove(container);
                } else {
                    dispatcher.handle(new ContainerPreemptEvent(attemptId, container, ContainerPreemptEventType.PREEMPT_CONTAINER));
                    if (firstSelectedAt == null) {
                        // Remember when this container was first asked to preempt.
                        preempted.put(container, clock.getTime());
                    }
                }
            }
        }

        // Drop tracking entries that have been around for more than twice the wait time.
        for (Iterator<Map.Entry<RMContainer, Long>> it = preempted.entrySet().iterator(); it.hasNext();) {
            Map.Entry<RMContainer, Long> tracked = it.next();
            if (tracked.getValue() + 2L * maxWaitTime < clock.getTime()) {
                it.remove();
            }
        }
    }

    /**
     * Walks the queue tree depth-first, distributing each parent's ideal assignment
     * among its children, and collects the queues that have no children.
     *
     * @param root                   queue to start from
     * @param totalPreemptionAllowed preemption bound passed unchanged to every level
     * @return an immutable single-element list holding {@code root} when it has no
     *         children, otherwise a list of all childless queues below it
     */
    private List<TempQueue> recursivelyComputeIdealAssignment(TempQueue root, Resource totalPreemptionAllowed) {
        List<TempQueue> children = root.getChildren();
        if (children == null || children.isEmpty()) {
            return Collections.singletonList(root);
        }

        computeIdealResourceDistribution(rc, children, totalPreemptionAllowed, root.idealAssigned);
        List<TempQueue> leaves = new ArrayList<TempQueue>();
        for (TempQueue child : children) {
            leaves.addAll(recursivelyComputeIdealAssignment(child, totalPreemptionAllowed));
        }
        return leaves;
    }

    /**
     * Computes {@code idealAssigned} and {@code toBePreempted} for a group of sibling queues.
     * Queues with a non-zero guarantee are served first; whatever is left is then offered
     * to the zero-guarantee queues in equal shares. If the total excess of current usage
     * over the ideal assignment is larger than {@code totalPreemptionAllowed}, every
     * queue's preemption is scaled down by the same factor.
     *
     * @param rc                     calculator used for the resource comparisons
     * @param queues                 sibling queues to distribute resources among
     * @param totalPreemptionAllowed upper bound on the summed preemption
     * @param tot_guarant            resources available to this group of queues
     */
    private void computeIdealResourceDistribution(ResourceCalculator rc, List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
        Resource remaining = Resources.clone(tot_guarant);

        // Partition the siblings by whether they have any guaranteed capacity.
        Set<TempQueue> guaranteedQueues = new HashSet<TempQueue>();
        Set<TempQueue> unguaranteedQueues = new HashSet<TempQueue>();
        for (TempQueue queue : queues) {
            if (Resources.greaterThan(rc, tot_guarant, queue.guaranteed, Resources.none())) {
                guaranteedQueues.add(queue);
            } else {
                unguaranteedQueues.add(queue);
            }
        }

        computeFixpointAllocation(rc, tot_guarant, guaranteedQueues, remaining, false);
        if (!unguaranteedQueues.isEmpty()) {
            if (Resources.greaterThan(rc, tot_guarant, remaining, Resources.none())) {
                computeFixpointAllocation(rc, tot_guarant, unguaranteedQueues, remaining, true);
            }
        }

        // Sum how far each over-served queue sits above its ideal assignment.
        Resource preemptionNeeded = Resource.newInstance(0, 0);
        for (TempQueue queue : queues) {
            if (Resources.greaterThan(rc, tot_guarant, queue.current, queue.idealAssigned)) {
                Resources.addTo(preemptionNeeded, Resources.subtract(queue.current, queue.idealAssigned));
            }
        }

        // Scale down only when the need exceeds the allowance.
        float scalingFactor = Resources.greaterThan(rc, tot_guarant, preemptionNeeded, totalPreemptionAllowed)
                ? Resources.divide(rc, tot_guarant, totalPreemptionAllowed, preemptionNeeded)
                : 1.0f;

        for (TempQueue queue : queues) {
            queue.assignPreemption(scalingFactor, rc, tot_guarant);
        }

        if (LOG.isDebugEnabled()) {
            long now = this.clock.getTime();
            for (TempQueue queue : queues) {
                LOG.debug(now + ": " + queue);
            }
        }
    }

    /**
     * Repeatedly offers {@code unassigned} to the queues in {@code qAlloc}, each in
     * proportion to its normalized guarantee, until either no queue remains or nothing
     * is left to hand out. A queue that accepts nothing in a round is removed from
     * {@code qAlloc}; {@code unassigned} is reduced by what was accepted in that round.
     *
     * @param rc              calculator used for the resource comparisons
     * @param tot_guarant     reference resource passed to the comparisons and to {@code offer}
     * @param qAlloc          active queues; mutated as queues drop out
     * @param unassigned      resources still to distribute; mutated in place
     * @param ignoreGuarantee when true the queues share equally instead of by guarantee
     */
    private void computeFixpointAllocation(ResourceCalculator rc, Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned, boolean ignoreGuarantee) {
        while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
            Resource acceptedThisRound = Resource.newInstance(0, 0);

            // Shares are recomputed every round over the queues that are still active.
            resetCapacity(rc, unassigned, qAlloc, ignoreGuarantee);

            for (Iterator<TempQueue> it = qAlloc.iterator(); it.hasNext();) {
                TempQueue queue = it.next();
                Resource offered = Resources.multiply(unassigned, queue.normalizedGuarantee);
                Resource declined = queue.offer(offered, rc, tot_guarant);
                Resource accepted = Resources.subtract(offered, declined);
                boolean tookSomething = Resources.greaterThan(rc, tot_guarant, accepted, Resources.none());
                if (!tookSomething) {
                    // Nothing was accepted, so the queue leaves the active set.
                    it.remove();
                }
                Resources.addTo(acceptedThisRound, accepted);
            }

            Resources.subtractFrom(unassigned, acceptedThisRound);
        }
    }

    /**
     * Recomputes {@code normalizedGuarantee} for every queue in {@code queues}.
     * With {@code ignoreGuar} each queue gets {@code 1 / queues.size()}; otherwise
     * each queue gets its guarantee divided by the summed guarantees of all the queues.
     *
     * @param rc              calculator used for the division
     * @param clusterResource reference resource passed to the division
     * @param queues          queues whose normalized guarantee is rewritten
     * @param ignoreGuar      true to share equally regardless of guarantees
     */
    private void resetCapacity(ResourceCalculator rc, Resource clusterResource, Collection<TempQueue> queues, boolean ignoreGuar) {
        if (ignoreGuar) {
            float equalShare = 1.0f / (float) queues.size();
            for (TempQueue queue : queues) {
                queue.normalizedGuarantee = equalShare;
            }
            return;
        }

        Resource summedGuarantee = Resource.newInstance(0, 0);
        for (TempQueue queue : queues) {
            Resources.addTo(summedGuarantee, queue.guaranteed);
        }
        for (TempQueue queue : queues) {
            queue.normalizedGuarantee = Resources.divide(rc, clusterResource, queue.guaranteed, summedGuarantee);
        }
    }

    /**
     * Selects, per application attempt, the containers to preempt in this round.
     * A queue is skipped unless its current usage exceeds
     * {@code guaranteed * (1 + maxIgnoredOverCapacity)}. For the remaining queues the
     * target is {@code toBePreempted * naturalTerminationFactor}; applications are
     * visited in descending order of the queue's application set, while holding the
     * leaf queue's monitor, until the target is met.
     *
     * @param queues          queues with their ideal assignment already computed
     * @param clusterResource reference resource for the comparisons
     * @return map from application attempt to the containers selected from it
     */
    private Map<ApplicationAttemptId, Set<RMContainer>> getContainersToPreempt(List<TempQueue> queues, Resource clusterResource) {
        Map<ApplicationAttemptId, Set<RMContainer>> list = new HashMap<ApplicationAttemptId, Set<RMContainer>>();
        for (TempQueue qT : queues) {
            Resource tolerated = Resources.multiply(qT.guaranteed, 1.0 + this.maxIgnoredOverCapacity);
            if (!Resources.greaterThan(this.rc, clusterResource, qT.current, tolerated)) {
                continue;
            }
            Resource resToObtain = Resources.multiply(qT.toBePreempted, this.naturalTerminationFactor);
            synchronized (qT.leafQueue) {
                // Parameterized types replace the raw NavigableSet/Iterator, so the element
                // type is checked at compile time instead of by a cast on every element.
                // Assumes getApplications() is declared with FiCaSchedulerApp elements - TODO confirm.
                NavigableSet<FiCaSchedulerApp> ns = (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
                Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
                // Records the target for logging; preemptFrom decrements resToObtain itself.
                qT.actuallyPreempted = Resources.clone(resToObtain);
                while (desc.hasNext()) {
                    FiCaSchedulerApp fc = desc.next();
                    if (Resources.lessThanOrEqual(this.rc, clusterResource, resToObtain, Resources.none())) {
                        break;
                    }
                    list.put(fc.getApplicationAttemptId(), this.preemptFrom(fc, clusterResource, resToObtain));
                }
            }
        }
        return list;
    }

    /**
     * Reclaims up to {@code rsrcPreempt} from one application. Reserved containers are
     * handled first: each one triggers a DROP_RESERVATION event (suppressed in
     * observe-only mode) and is deducted from {@code rsrcPreempt}, but is not part of
     * the returned set. Live containers are then taken in {@code sortContainers} order
     * until the remaining amount is no longer positive.
     *
     * @param app             application to take containers from
     * @param clusterResource reference resource for the comparisons
     * @param rsrcPreempt     amount still to reclaim; decremented in place
     * @return the live containers selected for preemption
     */
    private Set<RMContainer> preemptFrom(FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
        Set<RMContainer> selected = new HashSet<RMContainer>();
        ApplicationAttemptId attemptId = app.getApplicationAttemptId();

        // Iterate over a copy of the reservations rather than the application's own collection.
        List<RMContainer> reservedCopy = new ArrayList<RMContainer>(app.getReservedContainers());
        for (RMContainer reservation : reservedCopy) {
            if (Resources.lessThanOrEqual(rc, clusterResource, rsrcPreempt, Resources.none())) {
                return selected;
            }
            if (!observeOnly) {
                dispatcher.handle(new ContainerPreemptEvent(attemptId, reservation, ContainerPreemptEventType.DROP_RESERVATION));
            }
            Resources.subtractFrom(rsrcPreempt, reservation.getContainer().getResource());
        }

        List<RMContainer> liveCopy = new ArrayList<RMContainer>(app.getLiveContainers());
        sortContainers(liveCopy);
        for (RMContainer candidate : liveCopy) {
            if (Resources.lessThanOrEqual(rc, clusterResource, rsrcPreempt, Resources.none())) {
                break;
            }
            selected.add(candidate);
            Resources.subtractFrom(rsrcPreempt, candidate.getContainer().getResource());
        }
        return selected;
    }

    /**
     * Sorts {@code containers} in place into preemption order: first by container
     * priority using {@code Priority.Comparator} with its arguments reversed, then,
     * for equal priorities, by descending container id.
     *
     * @param containers list to sort in place
     */
    @VisibleForTesting
    static void sortContainers(List<RMContainer> containers) {
        Collections.sort(containers, new Comparator<RMContainer>() {

            @Override
            public int compare(RMContainer a, RMContainer b) {
                Priority.Comparator c = new Priority.Comparator();
                int priorityComp = c.compare(b.getContainer().getPriority(), a.getContainer().getPriority());
                if (priorityComp != 0) {
                    return priorityComp;
                }
                // Compare the ids explicitly: "idB - idA" overflows when the ids are far
                // apart or have opposite signs, which flips the sign of the result and
                // breaks the Comparator contract.
                int idA = a.getContainerId().getId();
                int idB = b.getContainerId().getId();
                if (idB < idA) {
                    return -1;
                }
                return (idB == idA) ? 0 : 1;
            }
        });
    }

    /**
     * Returns the monitoring interval loaded by {@code init} from
     * {@code MONITORING_INTERVAL} (default 3000).
     *
     * @return the configured monitoring interval
     */
    @Override
    public long getMonitoringInterval() {
        return this.monitoringInterval;
    }

    /**
     * Returns the fixed display name of this policy.
     *
     * @return the literal {@code "ProportionalCapacityPreemptionPolicy"}
     */
    @Override
    public String getPolicyName() {
        return "ProportionalCapacityPreemptionPolicy";
    }

    /**
     * Builds a {@link TempQueue} snapshot of {@code root} and, recursively, of all its
     * child queues, while holding {@code root}'s monitor. Current, guaranteed and maximum
     * resources are derived by multiplying {@code clusterResources} with the queue's
     * absolute used, absolute and absolute-maximum capacities. A leaf's pending resources
     * come from the leaf queue itself; a parent starts at zero and accumulates its
     * children's pending through {@code addChild}.
     *
     * @param root             queue to snapshot
     * @param clusterResources total cluster resources used to scale the capacities
     * @return the snapshot rooted at {@code root}
     */
    private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
        synchronized (root) {
            String name = root.getQueueName();
            float usedFraction = root.getAbsoluteUsedCapacity();
            float guaranteedFraction = root.getAbsoluteCapacity();
            float maxFraction = root.getAbsoluteMaximumCapacity();

            Resource current = Resources.multiply(clusterResources, usedFraction);
            Resource guaranteed = Resources.multiply(clusterResources, guaranteedFraction);
            Resource maxCapacity = Resources.multiply(clusterResources, maxFraction);

            if (!(root instanceof LeafQueue)) {
                TempQueue parent = new TempQueue(name, current, Resource.newInstance(0, 0), guaranteed, maxCapacity);
                for (CSQueue child : root.getChildQueues()) {
                    parent.addChild(cloneQueues(child, clusterResources));
                }
                return parent;
            }

            LeafQueue leaf = (LeafQueue) root;
            TempQueue leafCopy = new TempQueue(name, current, leaf.getTotalResourcePending(), guaranteed, maxCapacity);
            leafCopy.setLeafQueue(leaf);
            return leafCopy;
        }
    }

    /**
     * Logs, at INFO level, one comma-separated line containing the current clock time
     * followed by the state of every queue, ordered by queue name. The caller's list is
     * left untouched; a copy is sorted.
     *
     * @param unorderedqueues queues to report, in any order
     */
    private void logToCSV(List<TempQueue> unorderedqueues) {
        List<TempQueue> byName = new ArrayList<TempQueue>(unorderedqueues);
        Collections.sort(byName, new Comparator<TempQueue>() {

            @Override
            public int compare(TempQueue left, TempQueue right) {
                return left.queueName.compareTo(right.queueName);
            }
        });

        StringBuilder line = new StringBuilder();
        line.append(" QUEUESTATE: ").append(clock.getTime());
        for (TempQueue queue : byName) {
            line.append(", ");
            queue.appendLogString(line);
        }
        LOG.info(line.toString());
    }

    /**
     * Mutable snapshot of one scheduler queue used while computing a preemption round.
     * It carries the queue's resource figures, the results of the computation
     * ({@code idealAssigned}, {@code toBePreempted}, {@code actuallyPreempted}) and
     * either a list of child snapshots or a reference to the underlying leaf queue.
     */
    static class TempQueue {
        final String queueName;
        /** Resources currently used by the queue. */
        final Resource current;
        /** Pending demand; for a parent this accumulates the pending of added children. */
        final Resource pending;
        /** Resources guaranteed to the queue. */
        final Resource guaranteed;
        /** Upper bound on what the queue may be ideally assigned. */
        final Resource maxCapacity;
        /** Running total of resources accepted through {@link #offer}. */
        Resource idealAssigned;
        /** Amount computed by {@link #assignPreemption}. */
        Resource toBePreempted;
        /** Amount targeted for this queue when containers were selected. */
        Resource actuallyPreempted;
        /** Share of the unassigned resources offered to this queue in a round; NaN until computed. */
        double normalizedGuarantee;
        final ArrayList<TempQueue> children;
        /** Underlying leaf queue, or {@code null} when none has been set. */
        LeafQueue leafQueue;

        /**
         * Creates a snapshot with zeroed computation results and no children.
         *
         * @param queueName   name of the queue
         * @param current     resources currently used
         * @param pending     pending demand; mutated in place when children are added
         * @param guaranteed  guaranteed resources
         * @param maxCapacity maximum resources
         */
        TempQueue(String queueName, Resource current, Resource pending, Resource guaranteed, Resource maxCapacity) {
            this.queueName = queueName;
            this.current = current;
            this.pending = pending;
            this.guaranteed = guaranteed;
            this.maxCapacity = maxCapacity;
            this.idealAssigned = Resource.newInstance(0, 0);
            this.actuallyPreempted = Resource.newInstance(0, 0);
            this.toBePreempted = Resource.newInstance(0, 0);
            this.normalizedGuarantee = Double.NaN;
            // Parameterized instead of the raw ArrayList.
            this.children = new ArrayList<TempQueue>();
        }

        /** Marks this snapshot as a leaf backed by {@code l}; it must not have children. */
        public void setLeafQueue(LeafQueue l) {
            assert (this.children.size() == 0);
            this.leafQueue = l;
        }

        /**
         * Adds one child and rolls its pending demand into this queue's pending.
         * Only valid on a snapshot that is not a leaf.
         */
        public void addChild(TempQueue q) {
            assert (this.leafQueue == null);
            this.children.add(q);
            Resources.addTo(this.pending, q.pending);
        }

        /**
         * Adds several children. Each child's pending demand is rolled into this queue's
         * pending, exactly as {@link #addChild} does for a single child, so that
         * {@link #offer} sees the same demand whichever method built the hierarchy.
         */
        public void addChildren(ArrayList<TempQueue> queues) {
            assert (this.leafQueue == null);
            // Accumulate before appending so the loop never iterates a list being modified.
            for (TempQueue q : queues) {
                Resources.addTo(this.pending, q.pending);
            }
            this.children.addAll(queues);
        }

        /** Returns the live list of children (empty for a leaf). */
        public ArrayList<TempQueue> getChildren() {
            return this.children;
        }

        /**
         * Offers {@code avail} to this queue. The queue accepts the smallest of
         * {@code avail}, its headroom below {@code maxCapacity}, and its unmet demand
         * ({@code current + pending - idealAssigned}); the accepted amount is added to
         * {@code idealAssigned}.
         *
         * @param avail           resources offered
         * @param rc              calculator used for the comparisons
         * @param clusterResource reference resource for the comparisons
         * @return the part of {@code avail} that was not accepted
         */
        Resource offer(Resource avail, ResourceCalculator rc, Resource clusterResource) {
            Resource headroom = Resources.subtract(this.maxCapacity, this.idealAssigned);
            Resource unmetDemand = Resources.subtract(Resources.add(this.current, this.pending), this.idealAssigned);
            Resource accepted = Resources.min(rc, clusterResource, headroom,
                    Resources.min(rc, clusterResource, avail, unmetDemand));
            Resource remain = Resources.subtract(avail, accepted);
            Resources.addTo(this.idealAssigned, accepted);
            return remain;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(" NAME: ").append(this.queueName)
                .append(" CUR: ").append(this.current)
                .append(" PEN: ").append(this.pending)
                .append(" GAR: ").append(this.guaranteed)
                .append(" NORM: ").append(this.normalizedGuarantee)
                .append(" IDEAL_ASSIGNED: ").append(this.idealAssigned)
                .append(" IDEAL_PREEMPT: ").append(this.toBePreempted)
                .append(" ACTUAL_PREEMPT: ").append(this.actuallyPreempted)
                .append("\n");
            return sb.toString();
        }

        /**
         * Sets {@code toBePreempted} to {@code (current - idealAssigned) * scalingFactor}
         * when current usage exceeds the ideal assignment, and to zero otherwise.
         */
        public void assignPreemption(float scalingFactor, ResourceCalculator rc, Resource clusterResource) {
            if (Resources.greaterThan(rc, clusterResource, this.current, this.idealAssigned)) {
                this.toBePreempted = Resources.multiply(Resources.subtract(this.current, this.idealAssigned), scalingFactor);
            } else {
                this.toBePreempted = Resource.newInstance(0, 0);
            }
        }

        /**
         * Appends this queue's name followed by the memory and virtual-core figures of
         * current, pending, guaranteed, idealAssigned, toBePreempted and
         * actuallyPreempted, separated by {@code ", "}.
         */
        void appendLogString(StringBuilder sb) {
            sb.append(this.queueName).append(", ")
                .append(this.current.getMemory()).append(", ")
                .append(this.current.getVirtualCores()).append(", ")
                .append(this.pending.getMemory()).append(", ")
                .append(this.pending.getVirtualCores()).append(", ")
                .append(this.guaranteed.getMemory()).append(", ")
                .append(this.guaranteed.getVirtualCores()).append(", ")
                .append(this.idealAssigned.getMemory()).append(", ")
                .append(this.idealAssigned.getVirtualCores()).append(", ")
                .append(this.toBePreempted.getMemory()).append(", ")
                .append(this.toBePreempted.getVirtualCores()).append(", ")
                .append(this.actuallyPreempted.getMemory()).append(", ")
                .append(this.actuallyPreempted.getVirtualCores());
        }
    }
}

