/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.cdfw;

import com.google.protobuf.Any;
import com.google.protobuf.Message;
import edu.iu.dsc.tws.api.driver.IDriverMessenger;
import edu.iu.dsc.tws.api.exceptions.Twister2RuntimeException;
import edu.iu.dsc.tws.proto.system.job.CDFWJobAPI;
import edu.iu.dsc.tws.task.cdfw.CDFWEnv;
import edu.iu.dsc.tws.task.cdfw.CDFWScheduler;
import edu.iu.dsc.tws.task.cdfw.DataFlowGraph;
import edu.iu.dsc.tws.task.cdfw.DriveEventType;
import edu.iu.dsc.tws.task.cdfw.DriverEvent;
import edu.iu.dsc.tws.task.cdfw.DriverState;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Driver-side executor for connected dataflow (CDFW) graphs.
 *
 * <p>For each graph it asks a {@link CDFWScheduler} for the set of worker ids, attaches that
 * set to the graph as a schedule plan, broadcasts the built sub graph to all workers through the
 * {@link IDriverMessenger}, and then blocks until an event is delivered through
 * {@link #workerMessageReceived(Any, int)}.
 *
 * <p>{@link #driverState} is read and written both by caller threads and by the pool threads
 * started in {@link #executeCDFW(DataFlowGraph...)}; it is {@code volatile} so that updates are
 * visible across those threads. The check-then-submit sequence itself is not atomic.
 */
public final class CDFWExecutor {
    private static final Logger LOG = Logger.getLogger(CDFWExecutor.class.getName());

    /** Events produced by {@link #workerMessageReceived} and consumed by {@link #waitForEvent}. */
    private final BlockingQueue<DriverEvent> driverEvents = new LinkedBlockingDeque<DriverEvent>();

    /** Current life-cycle state; volatile because pool threads update it in submitGraph. */
    private volatile DriverState driverState = DriverState.INITIALIZE;

    /** Channel used to broadcast execute/close messages to the workers. */
    private final IDriverMessenger driverMessenger;

    /** Environment that supplies the worker info list handed to the scheduler. */
    private final CDFWEnv executionEnv;

    /**
     * Creates an executor in the {@code INITIALIZE} state.
     *
     * @param executionEnv environment providing the worker info list
     * @param messenger messenger used to broadcast messages to the workers
     */
    public CDFWExecutor(CDFWEnv executionEnv, IDriverMessenger messenger) {
        this.driverMessenger = messenger;
        this.executionEnv = executionEnv;
    }

    /**
     * Schedules a single graph and submits it, blocking until a worker event is received.
     *
     * @param graph the dataflow graph to run
     * @throws RuntimeException if the executor is neither in {@code INITIALIZE} nor in
     *     {@code JOB_FINISHED} state
     */
    public void execute(DataFlowGraph graph) {
        LOG.fine("Starting task graph Requirements:" + graph.getGraphName());
        // Read the volatile field once so the check and the message agree.
        DriverState state = this.driverState;
        if (state != DriverState.JOB_FINISHED && state != DriverState.INITIALIZE) {
            throw new RuntimeException("Invalid state to execute a job: " + state);
        }
        CDFWScheduler cdfwScheduler = new CDFWScheduler(this.executionEnv.getWorkerInfoList());
        Set<Integer> workerIDs = cdfwScheduler.schedule(graph);
        this.submitGraph(graph, workerIDs);
    }

    /**
     * Schedules several graphs and submits each of them on its own pool thread.
     *
     * <p>The method waits at most one second for the submitted graphs to complete; graphs that
     * need longer keep running on the pool threads after this method has returned.
     *
     * @param graph the dataflow graphs to run
     * @throws RuntimeException if the executor is neither in {@code INITIALIZE} nor in
     *     {@code JOB_FINISHED} state
     * @throws Twister2RuntimeException if the calling thread is interrupted while waiting
     */
    public void executeCDFW(DataFlowGraph ... graph) {
        DriverState state = this.driverState;
        if (state != DriverState.JOB_FINISHED && state != DriverState.INITIALIZE) {
            throw new RuntimeException("Invalid state to execute a job: " + state);
        }
        CDFWScheduler cdfwScheduler = new CDFWScheduler(this.executionEnv.getWorkerInfoList());
        Map<DataFlowGraph, Set<Integer>> scheduleGraphMap = cdfwScheduler.schedule(graph);
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(scheduleGraphMap.size());
        try {
            for (Map.Entry<DataFlowGraph, Set<Integer>> entry : scheduleGraphMap.entrySet()) {
                executor.submit(new CDFWExecutorTask(entry.getKey(), entry.getValue()));
            }
        }
        finally {
            // shutdown() must precede awaitTermination(): an executor that was never shut down
            // cannot terminate, so the wait would always run into its timeout. Tasks that were
            // already submitted still run after shutdown().
            executor.shutdown();
        }
        try {
            if (!executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                LOG.fine("Submitted graphs are still running after the wait timeout");
            }
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new Twister2RuntimeException(e);
        }
    }

    /** Broadcasts the job-completed message to all workers. */
    void close() {
        this.sendCloseMessage();
    }

    /**
     * Attaches the schedule plan to the graph, broadcasts it and waits for the finish event,
     * moving {@link #driverState} through {@code JOB_SUBMITTED} to {@code JOB_FINISHED}.
     *
     * @param dataFlowgraph the graph to submit
     * @param workerIDs ids of the workers selected by the scheduler
     * @throws Twister2RuntimeException if the state does not allow a submission or if any step
     *     of the submission fails
     */
    private void submitGraph(DataFlowGraph dataFlowgraph, Set<Integer> workerIDs) {
        DriverState state = this.driverState;
        if (state != DriverState.INITIALIZE && state != DriverState.JOB_FINISHED) {
            throw new Twister2RuntimeException("Failed to submit job in this state: " + state);
        }
        try {
            DataFlowGraph plannedGraph = this.buildCDFWSchedulePlan(dataFlowgraph, workerIDs);
            CDFWJobAPI.SubGraph job = this.buildCDFWJob(plannedGraph);
            this.submitJob(job);
            this.driverState = DriverState.JOB_SUBMITTED;
            this.waitForEvent(DriveEventType.FINISHED_JOB);
            this.driverState = DriverState.JOB_FINISHED;
        }
        catch (Exception e) {
            // NOTE(review): this message is kept unchanged for compatibility, although the
            // failure may come from any step above; the real reason is in the cause.
            throw new Twister2RuntimeException("Driver is not initialized", e);
        }
    }

    /**
     * Stores a schedule plan containing {@code workerIDs} on the graph.
     *
     * @return the same graph instance that was passed in
     */
    private DataFlowGraph buildCDFWSchedulePlan(DataFlowGraph dataFlowGraph, Set<Integer> workerIDs) {
        dataFlowGraph.setCdfwSchedulePlans(CDFWJobAPI.CDFWSchedulePlan.newBuilder().addAllWorkers(workerIDs).build());
        return dataFlowGraph;
    }

    /** Broadcasts a {@code CDFWJobCompletedMessage} with an empty job name to all workers. */
    private void sendCloseMessage() {
        CDFWJobAPI.CDFWJobCompletedMessage.Builder builder = CDFWJobAPI.CDFWJobCompletedMessage.newBuilder().setHtgJobname("");
        this.driverMessenger.broadcastToAllWorkers((Message)builder.build());
    }

    /** Wraps the sub graph in an {@code ExecuteMessage} and broadcasts it to all workers. */
    private void submitJob(CDFWJobAPI.SubGraph job) {
        LOG.log(Level.INFO, "Sending graph to workers for execution: " + job.getName());
        CDFWJobAPI.ExecuteMessage.Builder builder = CDFWJobAPI.ExecuteMessage.newBuilder();
        builder.setSubgraphName(job.getName());
        builder.setGraph(job);
        this.driverMessenger.broadcastToAllWorkers((Message)builder.build());
    }

    /** Builds the protobuf sub graph for the given dataflow graph. */
    private CDFWJobAPI.SubGraph buildCDFWJob(DataFlowGraph job) {
        return job.build();
    }

    /**
     * Queues a worker message as a driver event. Every message is recorded with the
     * {@code FINISHED_JOB} type; the message content is not inspected here.
     *
     * @param anyMessage the message sent by the worker
     * @param senderWorkerID id of the worker that sent the message
     */
    void workerMessageReceived(Any anyMessage, int senderWorkerID) {
        LOG.log(Level.FINE, String.format("Received worker message %d: %s", senderWorkerID, anyMessage.getClass().getName()));
        this.driverEvents.offer(new DriverEvent(DriveEventType.FINISHED_JOB, anyMessage, senderWorkerID));
    }

    /**
     * Blocks until the next driver event is available and checks its type.
     *
     * @param type the event type the caller expects
     * @return the event taken from the queue
     * @throws Exception if the received event has a different type
     * @throws Twister2RuntimeException if the thread is interrupted while waiting
     */
    private DriverEvent waitForEvent(DriveEventType type) throws Exception {
        DriverEvent event;
        try {
            event = this.driverEvents.take();
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag before translating the exception.
            Thread.currentThread().interrupt();
            throw new Twister2RuntimeException("Failed to take event", e);
        }
        if (event.getType() != type) {
            // Report what actually arrived, not only what was expected.
            throw new Exception("Un-expected event: " + event.getType() + ", expected: " + type);
        }
        return event;
    }

    /** Runnable that submits one scheduled graph through the enclosing executor. */
    private class CDFWExecutorTask
    implements Runnable {
        private final DataFlowGraph dataFlowGraph;
        private final Set<Integer> workerIDs;

        CDFWExecutorTask(DataFlowGraph graph, Set<Integer> workerList) {
            this.dataFlowGraph = graph;
            this.workerIDs = workerList;
        }

        @Override
        public void run() {
            try {
                CDFWExecutor.this.submitGraph(this.dataFlowGraph, this.workerIDs);
            }
            catch (RuntimeException e) {
                // The Future returned by submit() is discarded by the caller, so without this
                // log a failed submission would disappear silently.
                LOG.log(Level.SEVERE, "Failed to execute scheduled graph", e);
                throw e;
            }
        }
    }
}

