/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.tsched.streaming.datalocalityaware;

import edu.iu.dsc.tws.api.compute.exceptions.TaskSchedulerException;
import edu.iu.dsc.tws.api.compute.graph.ComputeGraph;
import edu.iu.dsc.tws.api.compute.graph.Vertex;
import edu.iu.dsc.tws.api.compute.schedule.ITaskScheduler;
import edu.iu.dsc.tws.api.compute.schedule.elements.Resource;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstanceId;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskInstancePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.TaskSchedulePlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.Worker;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerPlan;
import edu.iu.dsc.tws.api.compute.schedule.elements.WorkerSchedulePlan;
import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.data.FileStatus;
import edu.iu.dsc.tws.api.data.FileSystem;
import edu.iu.dsc.tws.api.data.Path;
import edu.iu.dsc.tws.data.utils.DataNodeLocatorUtils;
import edu.iu.dsc.tws.data.utils.FileSystemUtils;
import edu.iu.dsc.tws.tsched.spi.common.TaskSchedulerContext;
import edu.iu.dsc.tws.tsched.spi.taskschedule.TaskInstanceMapCalculation;
import edu.iu.dsc.tws.tsched.utils.DataTransferTimeCalculator;
import edu.iu.dsc.tws.tsched.utils.TaskAttributes;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.logging.Logger;

/**
 * Streaming task scheduler that assigns the instances of every task vertex to the worker
 * with the smallest estimated data transfer cost to the datanodes holding the input data.
 *
 * <p>The input location is read from the {@code "dinput"} configuration key and the file
 * system flavour ({@code "hdfs"} or {@code "local"}) from the {@code "filesys"} key.
 */
public class DataLocalityStreamingTaskScheduler
implements ITaskScheduler {
    private static final Logger LOG = Logger.getLogger(DataLocalityStreamingTaskScheduler.class.getName());

    // Task id handed to every TaskInstanceId of a vertex; incremented once per scheduled vertex.
    // NOTE(review): static and never reset, so the numbering continues across schedule() calls
    // and across scheduler instances in the same JVM - confirm this is intended.
    private static int globalTaskIndex = 0;

    // Ids of workers that already hold the maximum number of instances and are skipped by the
    // distance calculation for every vertex after the first one.
    // NOTE(review): static and never cleared in this class, so workers stay excluded for later
    // schedule() calls in the same JVM - confirm this is intended.
    private static final List<Integer> allocatedWorkers = new ArrayList<Integer>();

    private Double instanceRAM;
    private Double instanceDisk;
    private Double instanceCPU;
    private Config config;

    // Stored by initialize(Config, int); not read anywhere else in this class.
    private int workerId = 0;

    /**
     * Stores the configuration and reads the default per-instance RAM, disk and CPU values from it.
     *
     * @param cfg the scheduler configuration
     */
    public void initialize(Config cfg) {
        this.config = cfg;
        this.instanceRAM = TaskSchedulerContext.taskInstanceRam(this.config);
        this.instanceDisk = TaskSchedulerContext.taskInstanceDisk(this.config);
        this.instanceCPU = TaskSchedulerContext.taskInstanceCpu(this.config);
    }

    /**
     * Same as {@link #initialize(Config)} and additionally remembers the given worker id.
     *
     * @param cfg the scheduler configuration
     * @param workerid the id of the worker this scheduler is initialized for
     */
    public void initialize(Config cfg, int workerid) {
        this.initialize(cfg);
        this.workerId = workerid;
    }

    /**
     * Builds the task schedule plan for the given graph.
     *
     * <p>The instances are first distributed over the containers by the data locality
     * algorithm; afterwards the RAM, disk and CPU requirement of every instance is looked up
     * and summed per container on top of the configured container padding. When the worker
     * itself reports positive RAM, disk and CPU values those are used as the container
     * resource instead of the computed sums.
     *
     * @param graph the compute graph to schedule
     * @param workerPlan the available workers
     * @return the schedule plan with one worker schedule plan per container
     * @throws TaskSchedulerException if the instances cannot be placed
     */
    public TaskSchedulePlan schedule(ComputeGraph graph, WorkerPlan workerPlan) {
        int taskSchedulePlanId = 0;
        HashSet<WorkerSchedulePlan> workerSchedulePlans = new HashSet<WorkerSchedulePlan>();
        Set<Vertex> taskVertexSet = graph.getTaskVertexSet();
        Map<Integer, List<TaskInstanceId>> containerInstanceMap = this.dataLocalityStreamingSchedulingAlgorithm(graph, workerPlan.getNumberOfWorkers(), workerPlan);
        TaskInstanceMapCalculation instanceMapCalculation = new TaskInstanceMapCalculation(this.instanceRAM, this.instanceCPU, this.instanceDisk);
        Map<Integer, Map<TaskInstanceId, Double>> instancesRamMap = instanceMapCalculation.getInstancesRamMapInContainer(containerInstanceMap, taskVertexSet);
        Map<Integer, Map<TaskInstanceId, Double>> instancesDiskMap = instanceMapCalculation.getInstancesDiskMapInContainer(containerInstanceMap, taskVertexSet);
        Map<Integer, Map<TaskInstanceId, Double>> instancesCPUMap = instanceMapCalculation.getInstancesCPUMapInContainer(containerInstanceMap, taskVertexSet);
        for (Map.Entry<Integer, List<TaskInstanceId>> containerEntry : containerInstanceMap.entrySet()) {
            int containerId = containerEntry.getKey();
            Map<TaskInstanceId, Double> ramByInstance = instancesRamMap.get(containerId);
            Map<TaskInstanceId, Double> diskByInstance = instancesDiskMap.get(containerId);
            Map<TaskInstanceId, Double> cpuByInstance = instancesCPUMap.get(containerId);

            // The container totals start from the configured padding.
            double containerRAMValue = TaskSchedulerContext.containerRamPadding(this.config);
            double containerDiskValue = TaskSchedulerContext.containerDiskPadding(this.config);
            double containerCpuValue = TaskSchedulerContext.containerCpuPadding(this.config);

            HashMap<TaskInstanceId, TaskInstancePlan> taskInstancePlanMap = new HashMap<TaskInstanceId, TaskInstancePlan>();
            for (TaskInstanceId id : containerEntry.getValue()) {
                double instanceRAMValue = ramByInstance.get(id);
                double instanceDiskValue = diskByInstance.get(id);
                double instanceCPUValue = cpuByInstance.get(id);
                Resource instanceResource = new Resource(Double.valueOf(instanceRAMValue), Double.valueOf(instanceDiskValue), Double.valueOf(instanceCPUValue));
                taskInstancePlanMap.put(id, new TaskInstancePlan(id.getTaskName(), id.getTaskId(), id.getTaskIndex(), instanceResource));
                containerRAMValue += instanceRAMValue;
                containerDiskValue += instanceDiskValue;
                // Accumulate the CPU requirement (previously the disk value was added here by mistake).
                containerCpuValue += instanceCPUValue;
            }

            Worker worker = workerPlan.getWorker(containerId);
            Resource containerResource;
            if (worker != null && worker.getCpu() > 0 && worker.getDisk() > 0 && worker.getRam() > 0) {
                containerResource = new Resource(Double.valueOf(worker.getRam()), Double.valueOf(worker.getDisk()), Double.valueOf(worker.getCpu()));
            } else {
                containerResource = new Resource(Double.valueOf(containerRAMValue), Double.valueOf(containerDiskValue), Double.valueOf(containerCpuValue));
            }
            workerSchedulePlans.add(new WorkerSchedulePlan(containerId, new HashSet<TaskInstancePlan>(taskInstancePlanMap.values()), containerResource));
        }
        return new TaskSchedulePlan(taskSchedulePlanId, workerSchedulePlans);
    }

    /**
     * Distributes the task instances of the graph over the containers.
     *
     * <p>For every vertex the candidate workers are ranked by their data transfer cost and
     * all instances of that vertex are placed on the container whose id is the node name of
     * the cheapest candidate.
     *
     * @param graph the compute graph to schedule
     * @param numberOfContainers the number of containers; ids {@code 0..numberOfContainers-1} are used
     * @param workerPlan the available workers
     * @return container id to the task instances placed on it (every container has an entry)
     * @throws TaskSchedulerException if the total capacity is too small, if no candidate worker
     *     can be determined, or if a vertex has more instances than fit into one container
     */
    private Map<Integer, List<TaskInstanceId>> dataLocalityStreamingSchedulingAlgorithm(ComputeGraph graph, int numberOfContainers, WorkerPlan workerPlan) {
        TaskAttributes taskAttributes = new TaskAttributes();
        Set<Vertex> taskVertexSet = graph.getTaskVertexSet();
        int instancesPerContainer = !graph.getGraphConstraints().isEmpty() ? taskAttributes.getInstancesPerWorker(graph.getGraphConstraints()) : TaskSchedulerContext.defaultTaskInstancesPerContainer(this.config);
        int containerCapacity = instancesPerContainer * numberOfContainers;
        int totalInstances = !graph.getNodeConstraints().isEmpty() ? taskAttributes.getTotalNumberOfInstances(taskVertexSet, (Map<String, Map<String, String>>)graph.getNodeConstraints()) : taskAttributes.getTotalNumberOfInstances(taskVertexSet);
        if (containerCapacity < totalInstances) {
            throw new TaskSchedulerException("Task scheduling couldn't be performed for the container capacity of " + containerCapacity + " and " + totalInstances + " task instances");
        }
        LOG.info("Task scheduling could be performed for the container capacity of " + containerCapacity + " and " + totalInstances + " task instances");

        Map<Integer, List<TaskInstanceId>> dataAwareAllocationMap = new HashMap<Integer, List<TaskInstanceId>>();
        for (int i = 0; i < numberOfContainers; ++i) {
            dataAwareAllocationMap.put(i, new ArrayList<TaskInstanceId>());
        }

        Map<String, Integer> parallelTaskMap = !graph.getNodeConstraints().isEmpty() ? taskAttributes.getParallelTaskMap(taskVertexSet, (Map<String, Map<String, String>>)graph.getNodeConstraints()) : taskAttributes.getParallelTaskMap(taskVertexSet);

        // localIndex counts the vertices scheduled so far in this call; containerIndex is the
        // container chosen for the previously scheduled vertex (0 before the first one).
        int localIndex = 0;
        int containerIndex = 0;
        for (Map.Entry<String, Integer> taskEntry : parallelTaskMap.entrySet()) {
            for (Vertex vertex : taskVertexSet) {
                if (!taskEntry.getKey().equals(vertex.getName())) {
                    continue;
                }
                // NOTE(review): the parallelism is taken from the vertex, not from the
                // parallelTaskMap value - confirm they agree when node constraints are set.
                int totalTaskInstances = vertex.getParallelism();
                List<DataTransferTimeCalculator> calList = this.dTTimecalculatorList(localIndex, workerPlan, dataAwareAllocationMap, containerIndex, instancesPerContainer);
                if (totalTaskInstances > 0) {
                    if (calList.isEmpty()) {
                        throw new TaskSchedulerException("Task scheduling couldn't be performed for the task " + vertex.getName() + ": no candidate worker could be determined, please check the input data location and the available workers");
                    }
                    // The candidate list does not change while the instances of one vertex are
                    // placed, so the best container is resolved once per vertex.
                    containerIndex = Integer.parseInt(Collections.min(calList).getNodeName().trim());
                    List<TaskInstanceId> containerInstances = dataAwareAllocationMap.get(containerIndex);
                    if (containerInstances == null) {
                        throw new TaskSchedulerException("Task scheduling selected the worker " + containerIndex + " which is not part of the " + numberOfContainers + " available containers");
                    }
                    // NOTE(review): this limit is applied per vertex; instances that other
                    // vertices already placed on the same container are not counted here.
                    if (totalTaskInstances > instancesPerContainer) {
                        throw new TaskSchedulerException("Task Scheduling couldn't be possible for the present configuration, please check the number of workers, maximum instances per worker");
                    }
                    for (int i = 0; i < totalTaskInstances; ++i) {
                        containerInstances.add(new TaskInstanceId(vertex.getName(), globalTaskIndex, i));
                    }
                }
                ++globalTaskIndex;
                ++localIndex;
            }
        }
        return dataAwareAllocationMap;
    }

    /**
     * Determines, per datanode holding input data, the worker with the lowest transfer cost.
     *
     * <p>For every vertex after the first one ({@code index != 0}) the worker at
     * {@code containerIndex} is recorded as allocated once its container holds at least
     * {@code maxTaskPerContainer} instances, which removes it from the candidates.
     *
     * @param index the number of vertices scheduled before this call
     * @param workerPlan the available workers
     * @param map the current container to instances allocation
     * @param containerIndex the container chosen for the previous vertex
     * @param maxTaskPerContainer the maximum number of instances per container
     * @return one entry per datanode with the best worker for it; empty (never {@code null})
     *     when there are no input files or no candidate worker is left
     */
    private List<DataTransferTimeCalculator> dTTimecalculatorList(int index, WorkerPlan workerPlan, Map<Integer, List<TaskInstanceId>> map, int containerIndex, int maxTaskPerContainer) {
        List<String> inputDataList = this.getInputFilesList();
        DataNodeLocatorUtils dataNodeLocatorUtils = new DataNodeLocatorUtils(this.config);
        if (inputDataList.isEmpty()) {
            return new ArrayList<DataTransferTimeCalculator>();
        }
        List<String> datanodesList = dataNodeLocatorUtils.findDataNodesLocation(inputDataList);
        if (index != 0) {
            List<TaskInstanceId> placedInstances = map.get(containerIndex);
            if (placedInstances != null && placedInstances.size() >= maxTaskPerContainer) {
                Worker worker = workerPlan.getWorker(containerIndex);
                // Record each full worker only once.
                if (worker != null && !allocatedWorkers.contains(worker.getId())) {
                    allocatedWorkers.add(worker.getId());
                }
            }
        }
        Map<String, List<DataTransferTimeCalculator>> workerDatanodeDistanceMap = this.distanceCalculator(datanodesList, workerPlan, index, allocatedWorkers);
        return DataLocalityStreamingTaskScheduler.findBestWorkerNode(workerDatanodeDistanceMap);
    }

    /**
     * Collects the input files from the directory configured under {@code "dinput"}.
     *
     * <p>For {@code "filesys"} equal to {@code "hdfs"} the status of the configured path itself
     * is used; for {@code "local"} every file listed under the path is added. Any other (or a
     * missing) {@code "filesys"} value yields an empty list.
     *
     * @return the input file paths as strings, possibly empty
     * @throws TaskSchedulerException if {@code "dinput"} is not configured or the file system
     *     cannot be read
     */
    private List<String> getInputFilesList() {
        Object configuredDirectory = this.config.get("dinput");
        if (configuredDirectory == null) {
            throw new TaskSchedulerException("Not able to get the input files, the input data directory \"dinput\" is not configured");
        }
        List<String> inputDataList = new ArrayList<String>();
        Path path = new Path(String.valueOf(configuredDirectory));
        // Constant-first comparison below keeps an unset "filesys" from raising an NPE.
        Object fileSystemType = this.config.get("filesys");
        try {
            FileSystem fileSystem = FileSystemUtils.get(path);
            if ("hdfs".equals(fileSystemType)) {
                FileStatus pathFile = fileSystem.getFileStatus(path);
                inputDataList.add(String.valueOf(pathFile.getPath()));
            } else if ("local".equals(fileSystemType)) {
                for (FileStatus file : fileSystem.listFiles(path)) {
                    inputDataList.add(String.valueOf(file.getPath()));
                }
            }
        }
        catch (IOException e) {
            throw new TaskSchedulerException("Not able to get the input files", (Throwable)e);
        }
        return inputDataList;
    }

    /**
     * Computes the transfer cost between every datanode and every candidate worker.
     *
     * @param datanodesList the datanodes holding the input data
     * @param workers the available workers
     * @param index the number of vertices scheduled before this call
     * @param assignedWorkers ids of the workers that must be skipped when {@code index != 0}
     * @return datanode name to the cost entries of its candidate workers
     */
    private Map<String, List<DataTransferTimeCalculator>> distanceCalculator(List<String> datanodesList, WorkerPlan workers, int index, List<Integer> assignedWorkers) {
        Map<String, List<DataTransferTimeCalculator>> workerDistanceMap = new HashMap<String, List<DataTransferTimeCalculator>>();
        // The last computed distance is carried over as the starting value for the next datanode.
        double calculateDistance = 0.0;
        for (String datanode : datanodesList) {
            GetDistanceCalculation calculation = new GetDistanceCalculation(workers, index, calculateDistance, datanode, assignedWorkers).invoke();
            calculateDistance = calculation.getCalculateDistance();
            workerDistanceMap.put(datanode, calculation.getCalculatedVal());
        }
        return workerDistanceMap;
    }

    /**
     * Picks the minimum cost entry for every datanode.
     *
     * @param workerPlanMap datanode name to the cost entries of its candidate workers
     * @return one entry per datanode that has at least one candidate, carrying the node name
     *     and transfer time of the minimum entry together with the datanode name
     */
    private static List<DataTransferTimeCalculator> findBestWorkerNode(Map<String, List<DataTransferTimeCalculator>> workerPlanMap) {
        List<DataTransferTimeCalculator> cal = new ArrayList<DataTransferTimeCalculator>();
        for (Map.Entry<String, List<DataTransferTimeCalculator>> entry : workerPlanMap.entrySet()) {
            List<DataTransferTimeCalculator> candidates = entry.getValue();
            // Collections.min throws on an empty collection; a datanode without candidates
            // simply contributes nothing.
            if (candidates == null || candidates.isEmpty()) {
                continue;
            }
            DataTransferTimeCalculator best = Collections.min(candidates);
            cal.add(new DataTransferTimeCalculator(best.getNodeName(), best.getRequiredDataTransferTime(), entry.getKey()));
        }
        return cal;
    }

    /**
     * Computes the transfer cost entries of all candidate workers for a single datanode.
     */
    private class GetDistanceCalculation {
        private final WorkerPlan workers;
        private final int index;
        private final String nodesList;
        private final List<Integer> assignedWorkers;
        private double calculateDistance;
        private ArrayList<DataTransferTimeCalculator> calculatedVal;

        /**
         * @param workers the available workers
         * @param index the number of vertices scheduled before this calculation
         * @param calculateDistance the starting distance value
         * @param nodesList the datanode the costs are computed for
         * @param allocatedWorkers ids of the workers skipped when {@code index != 0}
         */
        GetDistanceCalculation(WorkerPlan workers, int index, double calculateDistance, String nodesList, List<Integer> allocatedWorkers) {
            this.workers = workers;
            this.index = index;
            this.calculateDistance = calculateDistance;
            this.nodesList = nodesList;
            this.assignedWorkers = allocatedWorkers;
        }

        /** @return the distance computed for the last processed worker */
        private double getCalculateDistance() {
            return this.calculateDistance;
        }

        /** @return the cost entries produced by {@link #invoke()}; {@code null} before it ran */
        private ArrayList<DataTransferTimeCalculator> getCalculatedVal() {
            return this.calculatedVal;
        }

        /**
         * Computes one cost entry per candidate worker. Worker ids {@code 0..n-1} are probed;
         * ids without a worker are skipped, and for {@code index != 0} the workers listed as
         * assigned are skipped as well.
         *
         * @return this calculation, for chaining
         */
        private GetDistanceCalculation invoke() {
            this.calculatedVal = new ArrayList<DataTransferTimeCalculator>();
            double datanodeBandwidth = TaskSchedulerContext.datanodeInstanceBandwidth(DataLocalityStreamingTaskScheduler.this.config);
            double datanodeLatency = TaskSchedulerContext.datanodeInstanceLatency(DataLocalityStreamingTaskScheduler.this.config);
            for (int i = 0; i < this.workers.getNumberOfWorkers(); ++i) {
                Worker worker = this.workers.getWorker(i);
                if (worker == null) {
                    continue;
                }
                if (this.index != 0 && this.assignedWorkers.contains(worker.getId())) {
                    continue;
                }
                double workerBandwidth;
                double workerLatency;
                Object bandwidthProperty = worker.getProperty("bandwidth");
                Object latencyProperty = worker.getProperty("latency");
                if (bandwidthProperty != null && latencyProperty != null) {
                    // Worker specific values win over the configured defaults.
                    workerBandwidth = ((Number)bandwidthProperty).doubleValue();
                    workerLatency = ((Number)latencyProperty).doubleValue();
                } else {
                    workerBandwidth = TaskSchedulerContext.containerInstanceBandwidth(DataLocalityStreamingTaskScheduler.this.config);
                    workerLatency = TaskSchedulerContext.containerInstanceLatency(DataLocalityStreamingTaskScheduler.this.config);
                }
                this.calculatedVal.add(this.getDistance(worker, workerBandwidth, workerLatency, datanodeBandwidth, datanodeLatency));
            }
            return this;
        }

        /**
         * Builds the cost entry of one worker. The cost is the absolute difference between
         * {@code 2 * bandwidth * latency} of the worker and of the datanode.
         *
         * @return the entry carrying the cost, the worker id as node name and the task index
         */
        private DataTransferTimeCalculator getDistance(Worker worker, double workerBandwidth, double workerLatency, double datanodeBandwidth, double datanodeLatency) {
            DataTransferTimeCalculator calculateDataTransferTime = new DataTransferTimeCalculator(this.nodesList, this.calculateDistance);
            this.calculateDistance = Math.abs(2.0 * workerBandwidth * workerLatency - 2.0 * datanodeBandwidth * datanodeLatency);
            calculateDataTransferTime.setRequiredDataTransferTime(this.calculateDistance);
            calculateDataTransferTime.setNodeName(worker.getId() + "");
            calculateDataTransferTime.setTaskIndex(this.index);
            return calculateDataTransferTime;
        }
    }

    /**
     * Orders vertices by their name.
     * NOTE(review): currently not referenced by the scheduling code of this class.
     */
    private static class VertexComparator
    implements Comparator<Vertex> {
        private VertexComparator() {
        }

        @Override
        public int compare(Vertex o1, Vertex o2) {
            return o1.getName().compareTo(o2.getName());
        }
    }
}

