/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.rsched.schedulers.mesos;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.resource.IPersistentVolume;
import edu.iu.dsc.tws.api.resource.IVolatileVolume;
import edu.iu.dsc.tws.api.resource.IWorker;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.common.config.ConfigLoader;
import edu.iu.dsc.tws.common.util.ReflectionUtils;
import edu.iu.dsc.tws.common.zk.ZKJobMasterFinder;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.JobAPI;
import edu.iu.dsc.tws.rsched.schedulers.mesos.MesosWorkerController;
import edu.iu.dsc.tws.rsched.schedulers.mesos.MesosWorkerLogger;
import edu.iu.dsc.tws.rsched.schedulers.mesos.MesosWorkerUtils;
import edu.iu.dsc.tws.rsched.utils.JobUtils;
import java.net.Inet4Address;
import java.nio.file.Paths;
import java.util.Map;
import java.util.logging.Logger;

public class MesosDockerWorker {
    public static final Logger LOG = Logger.getLogger(MesosDockerWorker.class.getName());
    public static JMWorkerAgent jobMasterAgent;
    private static Config config;
    private static String jobID;
    private static int startingPort;
    private static int resourceIndex;
    private static int workerId;

    public static void main(String[] args) throws Exception {
        workerId = Integer.parseInt(System.getenv("WORKER_ID"));
        jobID = System.getenv("JOB_NAME");
        MesosDockerWorker worker = new MesosDockerWorker();
        String twister2Home = Paths.get("", new String[0]).toAbsolutePath().toString();
        String configDir = "twister2-job";
        config = ConfigLoader.loadConfig((String)twister2Home, (String)configDir, (String)"mesos");
        resourceIndex = Integer.parseInt(System.getenv("COMPUTE_RESOURCE_INDEX"));
        MesosWorkerLogger logger = new MesosWorkerLogger(config, "/persistent-volume/logs", "worker" + workerId);
        logger.initLogging();
        LOG.info("WORKER ID ..:" + workerId);
        Map<String, Integer> additionalPorts = MesosWorkerUtils.generateAdditionalPorts(config, startingPort);
        MesosWorkerController workerController = null;
        JobAPI.Job job = JobUtils.readJobFile(null, "twister2-job/" + jobID + ".job");
        try {
            JobAPI.ComputeResource computeResource = JobUtils.getComputeResource(job, resourceIndex);
            workerController = new MesosWorkerController(config, job, Inet4Address.getLocalHost().getHostAddress(), 2023, workerId, computeResource, additionalPorts);
        }
        catch (Exception e) {
            LOG.severe("Error " + e.getMessage());
        }
        ZKJobMasterFinder finder = new ZKJobMasterFinder(config, job.getJobId());
        finder.initialize();
        String jobMasterIPandPort = finder.getJobMasterIPandPort();
        if (jobMasterIPandPort == null) {
            LOG.info("Job Master has not joined yet. Will wait and try to get the address ...");
            jobMasterIPandPort = finder.waitAndGetJobMasterIPandPort(20000L);
            LOG.info("Job Master address: " + jobMasterIPandPort);
        } else {
            LOG.info("Job Master address: " + jobMasterIPandPort);
        }
        finder.close();
        String jobMasterPortStr = jobMasterIPandPort.substring(jobMasterIPandPort.lastIndexOf(":") + 1);
        int jobMasterPort = Integer.parseInt(jobMasterPortStr);
        String jobMasterIP = jobMasterIPandPort.substring(0, jobMasterIPandPort.lastIndexOf(":"));
        int workerCount = job.getNumberOfWorkers();
        LOG.info("Worker Count..: " + workerCount);
        LOG.info(workerController.getWorkerInfo().toString());
        worker.startJobMasterAgent(workerController.getWorkerInfo(), jobMasterIP, jobMasterPort, workerCount);
        config = JobUtils.overrideConfigs(job, config);
        config = JobUtils.updateConfigs(job, config);
        MesosDockerWorker.startWorker(workerController, null);
        try {
            Thread.sleep(3000L);
        }
        catch (InterruptedException e) {
            LOG.info("sleep exception" + e.getMessage());
        }
        MesosDockerWorker.closeWorker();
    }

    public static void startWorker(IWorkerController workerController, IPersistentVolume pv) {
        IWorker worker;
        JobAPI.Job job = JobUtils.readJobFile(null, "twister2-job/" + jobID + ".job");
        String workerClass = job.getWorkerClassName();
        LOG.info("Worker class---->>>" + workerClass);
        try {
            Object object = ReflectionUtils.newInstance((String)workerClass);
            worker = (IWorker)object;
            LOG.info("Loaded worker class..: " + workerClass);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            LOG.severe(String.format("Failed to load the worker class %s", workerClass));
            throw new RuntimeException(e);
        }
        IVolatileVolume volatileVolume = null;
        worker.execute(config, workerId, (IWorkerController)jobMasterAgent.getJMWorkerController(), pv, volatileVolume);
    }

    public static void closeWorker() {
        jobMasterAgent.sendWorkerCompletedMessage();
        jobMasterAgent.close();
    }

    public void startJobMasterAgent(JobMasterAPI.WorkerInfo workerInfo, String jobMasterIP, int jobMasterPort, int numberOfWorkers) {
        LOG.info("JobMaster IP..: " + jobMasterIP);
        LOG.info("NETWORK INFO..: " + workerInfo.getWorkerIP());
        JobMasterAPI.WorkerState initialState = JobMasterAPI.WorkerState.STARTED;
        jobMasterAgent = JMWorkerAgent.createJMWorkerAgent((Config)config, (JobMasterAPI.WorkerInfo)workerInfo, (String)jobMasterIP, (int)jobMasterPort, (int)numberOfWorkers, (JobMasterAPI.WorkerState)initialState);
        jobMasterAgent.startThreaded();
    }

    static {
        startingPort = 30000;
        resourceIndex = 0;
        workerId = 0;
    }
}

