/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.impl.cdfw;

import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.comms.Communicator;
import edu.iu.dsc.tws.api.comms.channel.TWSChannel;
import edu.iu.dsc.tws.api.compute.executor.ExecutionPlan;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.exceptions.TimeoutException;
import edu.iu.dsc.tws.api.resource.IAllJoinedListener;
import edu.iu.dsc.tws.api.resource.IReceiverFromDriver;
import edu.iu.dsc.tws.api.resource.IScalerListener;
import edu.iu.dsc.tws.api.resource.IWorkerController;
import edu.iu.dsc.tws.api.resource.Network;
import edu.iu.dsc.tws.api.util.KryoSerializer;
import edu.iu.dsc.tws.master.worker.JMSenderToDriver;
import edu.iu.dsc.tws.master.worker.JMWorkerAgent;
import edu.iu.dsc.tws.proto.jobmaster.JobMasterAPI;
import edu.iu.dsc.tws.proto.system.job.CDFWJobAPI;
import edu.iu.dsc.tws.task.impl.TaskExecutor;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;

public class CDFWRuntime
implements IReceiverFromDriver,
IScalerListener,
IAllJoinedListener {
    private static final Logger LOG = Logger.getLogger(CDFWRuntime.class.getName());
    private BlockingQueue<Any> executeMessageQueue;
    private int workerId;
    private KryoSerializer serializer;
    private TaskExecutor taskExecutor;
    private Config config;
    private IWorkerController controller;
    private Communicator communicator;
    private TWSChannel channel;
    private AtomicBoolean scaleUpRequest = new AtomicBoolean(false);
    private AtomicBoolean scaleDownRequest = new AtomicBoolean(false);

    public CDFWRuntime(Config cfg, int wId, IWorkerController controller) {
        this.executeMessageQueue = new LinkedBlockingQueue<Any>();
        this.workerId = wId;
        this.serializer = new KryoSerializer();
        this.controller = controller;
        this.config = cfg;
        List<JobMasterAPI.WorkerInfo> workerInfoList = this.initSynch(controller);
        if (workerInfoList == null) {
            return;
        }
        this.channel = Network.initializeChannel((Config)this.config, (IWorkerController)controller);
        String persistent = null;
        this.communicator = new Communicator(this.config, this.channel, persistent);
        this.taskExecutor = new TaskExecutor(cfg, wId, workerInfoList, this.communicator, null);
    }

    private List<JobMasterAPI.WorkerInfo> initSynch(IWorkerController workerController) {
        List workerList = null;
        try {
            workerList = workerController.getAllWorkers();
        }
        catch (TimeoutException timeoutException) {
            LOG.log(Level.SEVERE, timeoutException.getMessage(), timeoutException);
            return null;
        }
        if (workerList == null) {
            LOG.severe("Can not get all workers to join. Something wrong. Exiting ....................");
            return null;
        }
        LOG.info(workerList.size() + " workers joined. ");
        LOG.fine("Waiting on a barrier ........................ ");
        try {
            workerController.waitOnBarrier();
        }
        catch (TimeoutException e) {
            LOG.log(Level.SEVERE, e.getMessage(), e);
            return null;
        }
        LOG.fine("Proceeded through the barrier ........................ ");
        return workerList;
    }

    public boolean execute() {
        while (true) {
            Any msg;
            if ((msg = (Any)this.executeMessageQueue.peek()) == null) {
                if (this.scaleUpRequest.get()) {
                    this.communicator.close();
                    List<JobMasterAPI.WorkerInfo> workerInfoList = this.initSynch(this.controller);
                    LOG.info("Existing workers calling barrier");
                    this.channel = Network.initializeChannel((Config)this.config, (IWorkerController)this.controller);
                    String persistent = null;
                    this.communicator = new Communicator(this.config, this.channel, persistent);
                    this.taskExecutor = new TaskExecutor(this.config, this.workerId, workerInfoList, this.communicator, null);
                }
                this.scaleUpRequest.set(false);
                continue;
            }
            msg = (Any)this.executeMessageQueue.poll();
            if (msg.is(CDFWJobAPI.ExecuteMessage.class)) {
                if (!this.handleExecuteMessage(msg)) continue;
                return false;
            }
            if (msg.is(CDFWJobAPI.CDFWJobCompletedMessage.class)) break;
        }
        LOG.log(Level.INFO, this.workerId + "Received CDFW job completed message. Leaving execution loop");
        LOG.log(Level.INFO, this.workerId + " Execution Completed");
        return true;
    }

    private boolean handleExecuteMessage(Any msg) {
        JMSenderToDriver senderToDriver = JMWorkerAgent.getJMWorkerAgent().getSenderToDriver();
        CDFWJobAPI.ExecuteCompletedMessage completedMessage = null;
        try {
            CDFWJobAPI.ExecuteMessage executeMessage = (CDFWJobAPI.ExecuteMessage)msg.unpack(CDFWJobAPI.ExecuteMessage.class);
            CDFWJobAPI.SubGraph subGraph = executeMessage.getGraph();
            ComputeGraph taskGraph = (ComputeGraph)this.serializer.deserialize(subGraph.getGraphSerialized().toByteArray());
            if (taskGraph == null) {
                LOG.severe(this.workerId + " Unable to find the subgraph " + subGraph.getName());
                return true;
            }
            ExecutionPlan executionPlan = this.taskExecutor.plan(taskGraph);
            this.taskExecutor.execute(taskGraph, executionPlan);
            completedMessage = CDFWJobAPI.ExecuteCompletedMessage.newBuilder().setSubgraphName(subGraph.getName()).build();
            if (!senderToDriver.sendToDriver((Message)completedMessage)) {
                LOG.severe("Unable to send the subgraph completed message :" + completedMessage);
            }
        }
        catch (InvalidProtocolBufferException e) {
            LOG.log(Level.SEVERE, "Unable to unpack received message ", e);
        }
        return false;
    }

    public void driverMessageReceived(Any anyMessage) {
        try {
            this.executeMessageQueue.put(anyMessage);
        }
        catch (InterruptedException e) {
            LOG.log(Level.SEVERE, "Unable to insert message to the queue", e);
        }
    }

    public void workersScaledUp(int instancesAdded) {
        LOG.log(Level.INFO, this.workerId + "Workers scaled up msg received. Instances added: " + instancesAdded);
        this.scaleUpRequest.set(true);
    }

    public void workersScaledDown(int instancesRemoved) {
        LOG.log(Level.FINE, this.workerId + "Workers scaled down msg received. Instances removed: " + instancesRemoved);
        this.scaleDownRequest.set(true);
    }

    public void allWorkersJoined(List<JobMasterAPI.WorkerInfo> workerList) {
        LOG.log(Level.FINE, this.workerId + "All workers joined msg received");
    }

    private boolean reinitialize() {
        this.communicator.close();
        List workerInfoList = null;
        try {
            workerInfoList = this.controller.getAllWorkers();
        }
        catch (TimeoutException timeoutException) {
            LOG.log(Level.SEVERE, timeoutException.getMessage(), timeoutException);
        }
        this.channel = Network.initializeChannel((Config)this.config, (IWorkerController)this.controller);
        String persistent = null;
        this.communicator = new Communicator(this.config, this.channel, persistent);
        this.taskExecutor = new TaskExecutor(this.config, this.workerId, workerInfoList, this.communicator, null);
        return true;
    }
}

