/*
 * Decompiled with CFR 0.152.
 */
package alluxio.master.job.plan;

import alluxio.client.file.FileSystem;
import alluxio.collections.Pair;
import alluxio.exception.JobDoesNotExistException;
import alluxio.job.ErrorUtils;
import alluxio.job.JobConfig;
import alluxio.job.JobServerContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.BatchedJobConfig;
import alluxio.job.plan.PlanDefinition;
import alluxio.job.plan.PlanDefinitionRegistry;
import alluxio.job.plan.meta.PlanInfo;
import alluxio.job.wire.Status;
import alluxio.job.wire.TaskInfo;
import alluxio.master.job.command.CommandManager;
import alluxio.master.job.metrics.DistributedCmdMetrics;
import alluxio.retry.CountingRetry;
import alluxio.retry.RetryPolicy;
import alluxio.wire.WorkerInfo;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
public final class PlanCoordinator {
    private static final Logger LOG = LoggerFactory.getLogger(PlanCoordinator.class);
    private final PlanInfo mPlanInfo;
    private final CommandManager mCommandManager;
    private final JobServerContext mJobServerContext;
    private List<WorkerInfo> mWorkersInfoList;
    private final Map<Long, WorkerInfo> mTaskIdToWorkerInfo = Maps.newHashMap();
    private final Map<Long, List<Long>> mWorkerIdToTaskIds = Maps.newHashMap();

    private PlanCoordinator(CommandManager commandManager, JobServerContext jobServerContext, List<WorkerInfo> workerInfoList, Long jobId, JobConfig jobConfig, Consumer<PlanInfo> statusChangeCallback) {
        Preconditions.checkNotNull((Object)jobConfig);
        this.mJobServerContext = jobServerContext;
        this.mPlanInfo = new PlanInfo(jobId.longValue(), jobConfig, statusChangeCallback);
        this.mCommandManager = commandManager;
        this.mWorkersInfoList = workerInfoList;
    }

    public static PlanCoordinator create(CommandManager commandManager, JobServerContext jobServerContext, List<WorkerInfo> workerInfoList, Long jobId, JobConfig jobConfig, Consumer<PlanInfo> statusChangeCallback) throws JobDoesNotExistException {
        Preconditions.checkNotNull((Object)commandManager, (Object)"commandManager");
        PlanCoordinator planCoordinator = new PlanCoordinator(commandManager, jobServerContext, workerInfoList, jobId, jobConfig, statusChangeCallback);
        planCoordinator.start();
        return planCoordinator;
    }

    private synchronized void start() throws JobDoesNotExistException {
        Set<Pair<WorkerInfo, Serializable>> taskAddressToArgs;
        PlanDefinition<JobConfig, Serializable, Serializable> definition;
        LOG.info("Starting job Id={} Config={}", (Object)this.mPlanInfo.getId(), (Object)this.mPlanInfo.getJobConfig());
        try {
            definition = PlanDefinitionRegistry.INSTANCE.getJobDefinition(this.mPlanInfo.getJobConfig());
        }
        catch (JobDoesNotExistException e) {
            LOG.info("Exception when getting jobDefinition from jobConfig: ", (Throwable)e);
            this.mPlanInfo.setErrorType(ErrorUtils.getErrorType((Throwable)e));
            this.mPlanInfo.setErrorMessage(e.getMessage());
            DistributedCmdMetrics.incrementForAllConfigsFailStatus(this.mPlanInfo.getJobConfig());
            this.mPlanInfo.setStatus(Status.FAILED);
            throw e;
        }
        SelectExecutorsContext context = new SelectExecutorsContext(this.mPlanInfo.getId(), this.mJobServerContext);
        ArrayList workersInfoListCopy = Lists.newArrayList(this.mWorkersInfoList);
        Collections.shuffle(workersInfoListCopy);
        try {
            taskAddressToArgs = definition.selectExecutors(this.mPlanInfo.getJobConfig(), workersInfoListCopy, context);
        }
        catch (Exception e) {
            LOG.warn("Failed to select executor. {})", (Object)e.toString());
            LOG.info("Exception: ", (Throwable)e);
            this.setJobAsFailed(ErrorUtils.getErrorType((Throwable)e), e.getMessage());
            return;
        }
        if (taskAddressToArgs.isEmpty()) {
            LOG.warn("No executor was selected.");
            this.updateStatus();
        }
        for (Pair<WorkerInfo, Serializable> pair : taskAddressToArgs) {
            JobConfig config;
            LOG.debug("Selected executor {} with parameters {}.", pair.getFirst(), pair.getSecond());
            int taskId = this.mTaskIdToWorkerInfo.size();
            this.mPlanInfo.addTask((long)taskId, (WorkerInfo)pair.getFirst(), pair.getSecond());
            if (this.mPlanInfo.getJobConfig() instanceof BatchedJobConfig) {
                BatchedJobConfig planConfig = (BatchedJobConfig)this.mPlanInfo.getJobConfig();
                config = new BatchedJobConfig(planConfig.getJobType(), new HashSet());
            } else {
                config = this.mPlanInfo.getJobConfig();
            }
            this.mCommandManager.submitRunTaskCommand(this.mPlanInfo.getId(), taskId, config, pair.getSecond(), ((WorkerInfo)pair.getFirst()).getId());
            this.mTaskIdToWorkerInfo.put(Long.valueOf(taskId), (WorkerInfo)pair.getFirst());
            this.mWorkerIdToTaskIds.putIfAbsent(((WorkerInfo)pair.getFirst()).getId(), Lists.newArrayList());
            this.mWorkerIdToTaskIds.get(((WorkerInfo)pair.getFirst()).getId()).add(Long.valueOf(taskId));
        }
    }

    public synchronized void cancel() {
        Iterator iterator = this.mPlanInfo.getTaskIdList().iterator();
        while (iterator.hasNext()) {
            long taskId = (Long)iterator.next();
            this.mCommandManager.submitCancelTaskCommand(this.mPlanInfo.getId(), taskId, this.mTaskIdToWorkerInfo.get(taskId).getId());
        }
        this.mWorkersInfoList = null;
    }

    public synchronized void updateTasks(List<TaskInfo> taskInfoList) {
        for (TaskInfo taskInfo : taskInfoList) {
            this.mPlanInfo.setTaskInfo(taskInfo.getTaskId(), taskInfo);
        }
        this.updateStatus();
        if (this.isJobFinished()) {
            this.mWorkersInfoList = null;
        }
    }

    public synchronized boolean isJobFinished() {
        return this.mPlanInfo.getStatus().isFinished();
    }

    public long getJobId() {
        return this.mPlanInfo.getId();
    }

    public synchronized void setJobAsFailed(String errorType, String errorMessage) {
        if (!this.mPlanInfo.getStatus().isFinished()) {
            this.mPlanInfo.setErrorType(errorType);
            this.mPlanInfo.setErrorMessage(errorMessage);
            DistributedCmdMetrics.incrementForAllConfigsFailStatus(this.mPlanInfo.getJobConfig());
            this.mPlanInfo.setStatus(Status.FAILED);
        }
        this.mWorkersInfoList = null;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void failTasksForWorker(long workerId) {
        PlanInfo planInfo = this.mPlanInfo;
        synchronized (planInfo) {
            if (this.mPlanInfo.getStatus().isFinished()) {
                return;
            }
            List<Long> taskIds = this.mWorkerIdToTaskIds.get(workerId);
            if (taskIds == null) {
                return;
            }
            boolean statusChanged = false;
            for (Long taskId : taskIds) {
                TaskInfo taskInfo = this.mPlanInfo.getTaskInfo(taskId.longValue());
                if (taskInfo == null || taskInfo.getStatus().isFinished()) continue;
                taskInfo.setStatus(Status.FAILED);
                taskInfo.setErrorType("JobWorkerLost");
                taskInfo.setErrorMessage(String.format("Job worker(%s) was lost before the task(%d) could complete", taskInfo.getWorkerHost(), taskId));
                statusChanged = true;
                break;
            }
            if (statusChanged) {
                this.updateStatus();
            }
        }
    }

    public synchronized alluxio.job.wire.PlanInfo getPlanInfoWire(boolean verbose) {
        return new alluxio.job.wire.PlanInfo(this.mPlanInfo, verbose);
    }

    public synchronized PlanInfo getPlanInfo() {
        return this.mPlanInfo;
    }

    private synchronized void updateStatus() {
        int completed = 0;
        List taskInfoList = this.mPlanInfo.getTaskInfoList();
        JobConfig config = this.mPlanInfo.getJobConfig();
        Preconditions.checkNotNull((Object)config);
        FileSystem fileSystem = this.mJobServerContext.getFileSystem();
        block9: for (TaskInfo info : taskInfoList) {
            Status status = info.getStatus();
            switch (status) {
                case FAILED: {
                    this.setJobAsFailed(info.getErrorType(), "Task execution failed: " + info.getErrorMessage());
                    return;
                }
                case CANCELED: {
                    if (this.mPlanInfo.getStatus() != Status.FAILED) {
                        this.mPlanInfo.setStatus(Status.CANCELED);
                        DistributedCmdMetrics.incrementForAllConfigsCancelStatus(config);
                    }
                    return;
                }
                case RUNNING: {
                    if (this.mPlanInfo.getStatus() == Status.FAILED || this.mPlanInfo.getStatus() == Status.CANCELED) continue block9;
                    this.mPlanInfo.setStatus(Status.RUNNING);
                    continue block9;
                }
                case COMPLETED: {
                    ++completed;
                    continue block9;
                }
                case CREATED: {
                    continue block9;
                }
            }
            throw new IllegalArgumentException("Unsupported status " + info.getStatus());
        }
        if (completed == taskInfoList.size()) {
            if (this.mPlanInfo.getStatus() == Status.COMPLETED) {
                return;
            }
            try {
                this.mPlanInfo.setResult(this.join(taskInfoList));
                this.mPlanInfo.setStatus(Status.COMPLETED);
                DistributedCmdMetrics.incrementForAllConfigsCompleteStatus(config, fileSystem, (RetryPolicy)new CountingRetry(5));
            }
            catch (Exception e) {
                LOG.warn("Job error when joining tasks Job Id={} Config={}", new Object[]{this.mPlanInfo.getId(), this.mPlanInfo.getJobConfig(), e});
                this.setJobAsFailed(ErrorUtils.getErrorType((Throwable)e), e.getMessage());
            }
        }
    }

    private String join(List<TaskInfo> taskInfoList) throws Exception {
        PlanDefinition<JobConfig, Serializable, Serializable> definition = PlanDefinitionRegistry.INSTANCE.getJobDefinition(this.mPlanInfo.getJobConfig());
        HashMap taskResults = Maps.newHashMap();
        for (TaskInfo taskInfo : taskInfoList) {
            taskResults.put(this.mTaskIdToWorkerInfo.get(taskInfo.getTaskId()), taskInfo.getResult());
        }
        return definition.join(this.mPlanInfo.getJobConfig(), taskResults);
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PlanCoordinator)) {
            return false;
        }
        PlanCoordinator other = (PlanCoordinator)o;
        return Objects.equal((Object)this.mPlanInfo, (Object)other.mPlanInfo);
    }

    public int hashCode() {
        return Objects.hashCode((Object[])new Object[]{this.mPlanInfo});
    }
}

