/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.sls;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.JobTraceReader;
import org.apache.hadoop.tools.rumen.LoggedJob;
import org.apache.hadoop.tools.rumen.LoggedTask;
import org.apache.hadoop.tools.rumen.LoggedTaskAttempt;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.sls.appmaster.AMSimulator;
import org.apache.hadoop.yarn.sls.nodemanager.NMSimulator;
import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
import org.apache.hadoop.yarn.sls.scheduler.ResourceSchedulerWrapper;
import org.apache.hadoop.yarn.sls.scheduler.TaskRunner;
import org.apache.hadoop.yarn.sls.utils.SLSUtils;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.map.MappingIterator;
import org.codehaus.jackson.map.ObjectMapper;

/**
 * Entry point of the scheduler load simulator.
 *
 * <p>An instance starts a {@link ResourceManager} whose scheduler class is replaced by
 * {@link ResourceSchedulerWrapper}, creates one {@link NMSimulator} per host name and one
 * {@link AMSimulator} per job found in the input traces, schedules all of them on a shared
 * {@link TaskRunner}, and finally starts that runner.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SLSRunner {
    /** Prefix of the configuration keys that map an AM type name to its simulator class. */
    private static final String AM_TYPE_PREFIX = "yarn.sls.am.type.";
    /** Container priority used for map tasks read from Rumen traces. */
    private static final int RUMEN_MAP_PRIORITY = 10;
    /** Container priority used for reduce tasks read from Rumen traces. */
    private static final int RUMEN_REDUCE_PRIORITY = 20;

    private ResourceManager rm;
    /** Shared runner on which every NM and AM simulator is scheduled. */
    private static TaskRunner runner = new TaskRunner();
    private String[] inputTraces;
    /** Simulator configuration, loaded from {@code sls-runner.xml} only (no defaults). */
    private Configuration conf;
    /** Number of jobs seen per queue name while loading the traces. */
    private Map<String, Integer> queueAppNumMap;
    private HashMap<NodeId, NMSimulator> nmMap;
    private int nmMemoryMB;
    private int nmVCores;
    /** Path of the topology file; an empty string means "derive the nodes from the traces". */
    private String nodeFile;
    /** Next id handed to {@code AMSimulator.init}. */
    private int AM_ID;
    /** Scheduled AM simulators keyed by the job id found in the trace. */
    private Map<String, AMSimulator> amMap;
    private Set<String> trackedApps;
    /** AM type name (suffix of a {@code yarn.sls.am.type.*} key) to simulator class. */
    private Map<String, Class<?>> amClassMap;
    private static int remainingApps = 0;
    private String metricsOutputDir;
    private boolean printSimulation;
    private int numNMs;
    private int numRacks;
    private int numAMs;
    private int numTasks;
    /** Largest job finish time (ms) over all scheduled jobs. */
    private long maxRuntime;
    public static final Map<String, Object> simulateInfoMap = new HashMap<String, Object>();
    public static final Logger LOG = Logger.getLogger(SLSRunner.class);
    /** {@code true} when the input traces are in SLS JSON format, {@code false} for Rumen. */
    private boolean isSLS;

    /**
     * Creates a runner and loads the simulator configuration from {@code sls-runner.xml}.
     *
     * @param isSLS           whether {@code inputTraces} are SLS JSON traces (otherwise Rumen)
     * @param inputTraces     paths of the trace files; the array is copied
     * @param nodeFile        path of the topology file; {@code null} or empty means the node set
     *                        is parsed from the traces instead
     * @param outputDir       value stored under {@code yarn.sls.metrics.output} in the RM
     *                        configuration
     * @param trackedApps     job ids handed to the scheduler wrapper as tracked applications
     * @param printsimulation whether {@link #start()} logs the simulation summary
     * @throws IOException            declared for callers; see the configuration loading calls
     * @throws ClassNotFoundException if a class named by a {@code yarn.sls.am.type.*} key cannot
     *                                be loaded
     */
    public SLSRunner(boolean isSLS, String[] inputTraces, String nodeFile, String outputDir, Set<String> trackedApps, boolean printsimulation) throws IOException, ClassNotFoundException {
        this.isSLS = isSLS;
        this.inputTraces = inputTraces.clone();
        // Normalise null to "" so startNM() can rely on isEmpty() without a NullPointerException.
        this.nodeFile = nodeFile == null ? "" : nodeFile;
        this.trackedApps = trackedApps;
        this.printSimulation = printsimulation;
        this.metricsOutputDir = outputDir;
        this.nmMap = new HashMap<NodeId, NMSimulator>();
        this.queueAppNumMap = new HashMap<String, Integer>();
        this.amMap = new HashMap<String, AMSimulator>();
        this.amClassMap = new HashMap<String, Class<?>>();

        this.conf = new Configuration(false);
        this.conf.addResource("sls-runner.xml");
        runner.setQueueSize(this.conf.getInt("yarn.sls.runner.pool.size", 10));

        // Every "yarn.sls.am.type.<name>" key registers the simulator class for AM type <name>.
        for (Map.Entry<String, String> entry : this.conf) {
            String key = entry.getKey();
            if (key.startsWith(AM_TYPE_PREFIX)) {
                String amType = key.substring(AM_TYPE_PREFIX.length());
                this.amClassMap.put(amType, Class.forName(this.conf.get(key)));
            }
        }
    }

    /**
     * Starts the RM, creates the NM and AM simulators, publishes the queue and tracked-app sets
     * to the scheduler wrapper, waits until every simulated node is RUNNING and then starts the
     * task runner.
     *
     * @throws Exception if any of the start-up steps fails
     */
    public void start() throws Exception {
        this.startRM();
        this.startNM();
        this.startAM();
        ResourceSchedulerWrapper scheduler = (ResourceSchedulerWrapper) this.rm.getResourceScheduler();
        scheduler.setQueueSet(this.queueAppNumMap.keySet());
        scheduler.setTrackedAppSet(this.trackedApps);
        this.printSimulationInfo();
        this.waitForNodesRunning();
        runner.start();
    }

    /**
     * Creates and starts the ResourceManager. The scheduler class originally configured is
     * remembered under {@code yarn.sls.scheduler.class} and replaced by
     * {@link ResourceSchedulerWrapper}.
     */
    private void startRM() throws IOException, ClassNotFoundException {
        YarnConfiguration rmConf = new YarnConfiguration();
        String schedulerClass = rmConf.get("yarn.resourcemanager.scheduler.class");
        rmConf.set("yarn.sls.scheduler.class", schedulerClass);
        rmConf.set("yarn.resourcemanager.scheduler.class", ResourceSchedulerWrapper.class.getName());
        rmConf.set("yarn.sls.metrics.output", this.metricsOutputDir);
        this.rm = new ResourceManager();
        this.rm.init(rmConf);
        this.rm.start();
    }

    /**
     * Creates and schedules one NM simulator per host name. Host names come from the topology
     * file when one was given, otherwise from the input traces. Also records the number of
     * nodes and of distinct racks.
     */
    private void startNM() throws YarnException, IOException {
        this.nmMemoryMB = this.conf.getInt("yarn.sls.nm.memory.mb", 10240);
        this.nmVCores = this.conf.getInt("yarn.sls.nm.vcores", 10);
        int heartbeatInterval = this.conf.getInt("yarn.sls.nm.heartbeat.interval.ms", 1000);

        Set<String> nodeSet = new HashSet<String>();
        if (!this.nodeFile.isEmpty()) {
            nodeSet.addAll(SLSUtils.parseNodesFromNodeFile(this.nodeFile));
        } else {
            for (String inputTrace : this.inputTraces) {
                if (this.isSLS) {
                    nodeSet.addAll(SLSUtils.parseNodesFromSLSTrace(inputTrace));
                } else {
                    nodeSet.addAll(SLSUtils.parseNodesFromRumenTrace(inputTrace));
                }
            }
        }

        Random random = new Random();
        Set<String> rackSet = new HashSet<String>();
        for (String hostName : nodeSet) {
            NMSimulator nm = new NMSimulator();
            // The fourth argument is a random value in [0, heartbeatInterval), drawn per node.
            nm.init(hostName, this.nmMemoryMB, this.nmVCores, random.nextInt(heartbeatInterval), heartbeatInterval, this.rm);
            this.nmMap.put(nm.getNode().getNodeID(), nm);
            runner.schedule(nm);
            rackSet.add(nm.getNode().getRackName());
        }
        this.numRacks = rackSet.size();
        this.numNMs = this.nmMap.size();
    }

    /**
     * Blocks until the number of RM nodes in state RUNNING equals the number of simulated NMs,
     * polling once per second. There is no timeout.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private void waitForNodesRunning() throws InterruptedException {
        long startTimeMS = System.currentTimeMillis();
        while (true) {
            int numRunningNodes = 0;
            for (RMNode node : this.rm.getRMContext().getRMNodes().values()) {
                if (node.getState() == NodeState.RUNNING) {
                    ++numRunningNodes;
                }
            }
            if (numRunningNodes == this.numNMs) {
                break;
            }
            LOG.info(MessageFormat.format("SLSRunner is waiting for all nodes RUNNING. {0} of {1} NMs initialized.", numRunningNodes, this.numNMs));
            Thread.sleep(1000L);
        }
        LOG.info(MessageFormat.format("SLSRunner takes {0} ms to launch all nodes.", System.currentTimeMillis() - startTimeMS));
    }

    /**
     * Creates and schedules the AM simulators from the input traces and initialises the
     * remaining-application counter with the number of AMs created.
     */
    private void startAM() throws YarnException, IOException {
        int heartbeatInterval = this.conf.getInt("yarn.sls.am.heartbeat.interval.ms", 1000);
        int containerMemoryMB = this.conf.getInt("yarn.sls.container.memory.mb", 1024);
        int containerVCores = this.conf.getInt("yarn.sls.container.vcores", 1);
        Resource containerResource = BuilderUtils.newResource(containerMemoryMB, containerVCores);
        if (this.isSLS) {
            this.startAMFromSLSTraces(containerResource, heartbeatInterval);
        } else {
            this.startAMFromRumenTraces(containerResource, heartbeatInterval);
        }
        this.numAMs = this.amMap.size();
        remainingApps = this.numAMs;
    }

    /**
     * Reads every SLS trace as a stream of JSON objects (one per job) and schedules an AM
     * simulator for each job that has at least one task.
     *
     * <p>The trace is decoded as UTF-8 rather than the platform default charset. A job is
     * counted towards its queue before the task check, so a job without tasks still increases
     * the queue's application count even though no AM is created for it.
     *
     * @param containerResource resource assigned to every simulated container
     * @param heartbeatInterval heartbeat interval passed to each AM simulator
     * @throws IOException if a trace cannot be opened or read
     */
    private void startAMFromSLSTraces(Resource containerResource, int heartbeatInterval) throws IOException {
        JsonFactory jsonF = new JsonFactory();
        ObjectMapper mapper = new ObjectMapper();
        for (String inputTrace : this.inputTraces) {
            try (Reader input = new InputStreamReader(new FileInputStream(inputTrace), StandardCharsets.UTF_8)) {
                MappingIterator<Map> jobs = mapper.readValues(jsonF.createJsonParser(input), Map.class);
                while (jobs.hasNext()) {
                    Map jsonJob = jobs.next();
                    long jobStartTime = Long.parseLong(jsonJob.get("job.start.ms").toString());
                    long jobFinishTime = Long.parseLong(jsonJob.get("job.end.ms").toString());
                    String user = (String) jsonJob.get("job.user");
                    if (user == null) {
                        user = "default";
                    }
                    String queue = jsonJob.get("job.queue.name").toString();
                    String oldAppId = jsonJob.get("job.id").toString();
                    boolean isTracked = this.trackedApps.contains(oldAppId);
                    this.incrementQueueAppCount(queue);

                    List tasks = (List) jsonJob.get("job.tasks");
                    if (tasks == null || tasks.isEmpty()) {
                        continue;
                    }
                    List<ContainerSimulator> containerList = new ArrayList<ContainerSimulator>();
                    for (Object o : tasks) {
                        Map jsonTask = (Map) o;
                        String hostname = jsonTask.get("container.host").toString();
                        long taskStart = Long.parseLong(jsonTask.get("container.start.ms").toString());
                        long taskFinish = Long.parseLong(jsonTask.get("container.end.ms").toString());
                        long lifeTime = taskFinish - taskStart;
                        int priority = Integer.parseInt(jsonTask.get("container.priority").toString());
                        String type = jsonTask.get("container.type").toString();
                        containerList.add(new ContainerSimulator(containerResource, lifeTime, hostname, priority, type));
                    }

                    String amType = jsonJob.get("am.type").toString();
                    this.scheduleAM(amType, new Configuration(), heartbeatInterval, containerList, jobStartTime, jobFinishTime, user, queue, isTracked, oldAppId);
                }
            }
        }
    }

    /**
     * Reads every Rumen trace and schedules a {@code mapreduce} AM simulator per job.
     *
     * <p>Job times are made relative to the submit time of the first job read. A job whose
     * relative start would be negative is shifted to start at 0, and its finish time is moved
     * by the same amount so the duration is preserved. Each task contributes one container
     * based on its last attempt; tasks without any attempt are skipped.
     *
     * @param containerResource resource assigned to every simulated container
     * @param heartbeatInterval heartbeat interval passed to each AM simulator
     * @throws IOException if a trace cannot be opened or read
     */
    private void startAMFromRumenTraces(Resource containerResource, int heartbeatInterval) throws IOException {
        Configuration rumenConf = new Configuration();
        rumenConf.set("fs.defaultFS", "file:///");
        long baselineTimeMS = 0L;
        for (String inputTrace : this.inputTraces) {
            File fin = new File(inputTrace);
            try (JobTraceReader reader = new JobTraceReader(new Path(fin.getAbsolutePath()), rumenConf)) {
                LoggedJob job;
                while ((job = (LoggedJob) reader.getNext()) != null) {
                    String jobType = "mapreduce";
                    String user = job.getUser() == null ? "default" : job.getUser().getValue();
                    String jobQueue = job.getQueue().getValue();
                    String oldJobId = job.getJobID().toString();
                    long jobStartTimeMS = job.getSubmitTime();
                    long jobFinishTimeMS = job.getFinishTime();
                    if (baselineTimeMS == 0L) {
                        baselineTimeMS = jobStartTimeMS;
                    }
                    jobStartTimeMS -= baselineTimeMS;
                    jobFinishTimeMS -= baselineTimeMS;
                    if (jobStartTimeMS < 0L) {
                        LOG.warn("Warning: reset job " + oldJobId + " start time to 0.");
                        // jobStartTimeMS is negative here, so this moves the finish time later.
                        jobFinishTimeMS -= jobStartTimeMS;
                        jobStartTimeMS = 0L;
                    }
                    boolean isTracked = this.trackedApps.contains(oldJobId);
                    this.incrementQueueAppCount(jobQueue);

                    List<ContainerSimulator> containerList = new ArrayList<ContainerSimulator>();
                    addRumenContainers(job.getMapTasks(), containerResource, RUMEN_MAP_PRIORITY, "map", containerList);
                    addRumenContainers(job.getReduceTasks(), containerResource, RUMEN_REDUCE_PRIORITY, "reduce", containerList);

                    this.scheduleAM(jobType, rumenConf, heartbeatInterval, containerList, jobStartTimeMS, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId);
                }
            }
        }
    }

    /** Appends one container per task, built from the task's last attempt; tasks with no attempts are skipped. */
    private static void addRumenContainers(List<LoggedTask> tasks, Resource containerResource, int priority, String type, List<ContainerSimulator> containerList) {
        for (LoggedTask task : tasks) {
            List<LoggedTaskAttempt> attempts = task.getAttempts();
            if (attempts == null || attempts.isEmpty()) {
                // Nothing to replay for this task; indexing size() - 1 would throw.
                continue;
            }
            LoggedTaskAttempt lastAttempt = attempts.get(attempts.size() - 1);
            String hostname = lastAttempt.getHostName().getValue();
            long containerLifeTime = lastAttempt.getFinishTime() - lastAttempt.getStartTime();
            containerList.add(new ContainerSimulator(containerResource, containerLifeTime, hostname, priority, type));
        }
    }

    /** Adds one to the number of jobs recorded for {@code queue}. */
    private void incrementQueueAppCount(String queue) {
        Integer current = this.queueAppNumMap.get(queue);
        this.queueAppNumMap.put(queue, current == null ? 1 : current + 1);
    }

    /**
     * Instantiates, initialises and schedules the AM simulator for one job, then updates the
     * run-wide statistics. A job whose AM type has no configured simulator class is skipped
     * with a warning.
     */
    private void scheduleAM(String amType, Configuration amConf, int heartbeatInterval, List<ContainerSimulator> containerList, long jobStartTimeMS, long jobFinishTimeMS, String user, String queue, boolean isTracked, String oldAppId) throws IOException {
        Class<?> amClass = this.amClassMap.get(amType);
        if (amClass == null) {
            LOG.warn("No AM simulator class configured for AM type " + amType + "; skipping job " + oldAppId + ".");
            return;
        }
        AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(amClass, amConf);
        if (amSim == null) {
            return;
        }
        amSim.init(this.AM_ID++, heartbeatInterval, containerList, this.rm, this, jobStartTimeMS, jobFinishTimeMS, user, queue, isTracked, oldAppId);
        runner.schedule(amSim);
        this.maxRuntime = Math.max(this.maxRuntime, jobFinishTimeMS);
        this.numTasks += containerList.size();
        this.amMap.put(oldAppId, amSim);
    }

    /** Returns {@code total / count} rounded up, computed in floating point and narrowed to int. */
    private static int ceilAverage(int total, int count) {
        return (int) Math.ceil((double) total / (double) count);
    }

    /**
     * Publishes the simulation summary into {@link #simulateInfoMap} and, when printing was
     * requested, also writes it to the log.
     */
    private void printSimulationInfo() {
        int numQueues = this.queueAppNumMap.size();
        int avgTasksPerApp = ceilAverage(this.numTasks, this.numAMs);
        int avgAppsPerQueue = ceilAverage(this.numAMs, numQueues);
        long estimatedSeconds = (long) Math.ceil((double) this.maxRuntime / 1000.0);

        if (this.printSimulation) {
            String separator = "------------------------------------";
            LOG.info(separator);
            LOG.info(MessageFormat.format("# nodes = {0}, # racks = {1}, capacity of each node {2} MB memory and {3} vcores.", this.numNMs, this.numRacks, this.nmMemoryMB, this.nmVCores));
            LOG.info(separator);
            LOG.info(MessageFormat.format("# applications = {0}, # total tasks = {1}, average # tasks per application = {2}", this.numAMs, this.numTasks, avgTasksPerApp));
            LOG.info("JobId\tQueue\tAMType\tDuration\t#Tasks");
            for (Map.Entry<String, AMSimulator> entry : this.amMap.entrySet()) {
                AMSimulator am = entry.getValue();
                LOG.info(entry.getKey() + "\t" + am.getQueue() + "\t" + am.getAMType() + "\t" + am.getDuration() + "\t" + am.getNumTasks());
            }
            LOG.info(separator);
            LOG.info(MessageFormat.format("number of queues = {0}  average number of apps = {1}", numQueues, avgAppsPerQueue));
            LOG.info(separator);
            LOG.info(MessageFormat.format("estimated simulation time is {0} seconds", estimatedSeconds));
            LOG.info(separator);
        }

        simulateInfoMap.put("Number of racks", this.numRacks);
        simulateInfoMap.put("Number of nodes", this.numNMs);
        simulateInfoMap.put("Node memory (MB)", this.nmMemoryMB);
        simulateInfoMap.put("Node VCores", this.nmVCores);
        simulateInfoMap.put("Number of applications", this.numAMs);
        simulateInfoMap.put("Number of tasks", this.numTasks);
        // The misspelt key is kept as is: it is runtime data that other code may read.
        simulateInfoMap.put("Average tasks per applicaion", avgTasksPerApp);
        simulateInfoMap.put("Number of queues", numQueues);
        simulateInfoMap.put("Average applications per queue", avgAppsPerQueue);
        simulateInfoMap.put("Estimated simulate time (s)", estimatedSeconds);
    }

    /**
     * Returns the live map of NM simulators keyed by node id (not a copy).
     *
     * @return the NM simulators created by {@link #start()}
     */
    public HashMap<NodeId, NMSimulator> getNmMap() {
        return this.nmMap;
    }

    /**
     * Returns the task runner shared by all simulators.
     *
     * @return the shared task runner
     */
    public static TaskRunner getRunner() {
        return runner;
    }

    /**
     * Decrements the number of applications still running and terminates the JVM with status 0
     * once it reaches zero.
     *
     * <p>NOTE(review): the counter is a plain static int with no synchronisation. If this is
     * invoked from several task-runner threads (callers are not visible here), decrements may be
     * lost and the exit never triggered. Confirm the calling context.
     */
    public static void decreaseRemainingApps() {
        if (--remainingApps == 0) {
            LOG.info("SLSRunner tears down.");
            System.exit(0);
        }
    }

    /**
     * Command-line entry point. Requires {@code -output} and one of {@code -inputrumen} or
     * {@code -inputsls}; when both inputs are given the SLS traces are used.
     *
     * @param args command-line arguments
     * @throws Exception if option parsing or the simulation start-up fails
     */
    public static void main(String[] args) throws Exception {
        Options options = new Options();
        options.addOption("inputrumen", true, "input rumen files");
        options.addOption("inputsls", true, "input sls files");
        options.addOption("nodes", true, "input topology");
        options.addOption("output", true, "output directory");
        options.addOption("trackjobs", true, "jobs to be tracked during simulating");
        options.addOption("printsimulation", false, "print out simulation information");

        CommandLine cmd = new GnuParser().parse(options, args);
        String inputRumen = cmd.getOptionValue("inputrumen");
        String inputSLS = cmd.getOptionValue("inputsls");
        String output = cmd.getOptionValue("output");
        boolean missingInput = inputRumen == null && inputSLS == null;
        if (missingInput || output == null) {
            System.err.println();
            System.err.println("ERROR: Missing input or output file");
            System.err.println();
            System.err.println("Options: -inputrumen|-inputsls FILE,FILE... -output FILE [-nodes FILE] [-trackjobs JobId,JobId...] [-printsimulation]");
            System.err.println();
            System.exit(1);
        }

        File outputFile = new File(output);
        if (!outputFile.exists() && !outputFile.mkdirs()) {
            System.err.println("ERROR: Cannot create output directory " + outputFile.getAbsolutePath());
            System.exit(1);
        }

        Set<String> trackedJobSet = new HashSet<String>();
        if (cmd.hasOption("trackjobs")) {
            String[] jobIds = cmd.getOptionValue("trackjobs").split(",");
            trackedJobSet.addAll(Arrays.asList(jobIds));
        }

        String nodeFile = cmd.hasOption("nodes") ? cmd.getOptionValue("nodes") : "";
        boolean isSLS = inputSLS != null;
        String[] inputFiles = isSLS ? inputSLS.split(",") : inputRumen.split(",");
        SLSRunner sls = new SLSRunner(isSLS, inputFiles, nodeFile, output, trackedJobSet, cmd.hasOption("printsimulation"));
        sls.start();
    }
}

