/*
 * Decompiled with CFR 0.152.
 */
package alluxio.cli.fs.command;

import alluxio.ClientContext;
import alluxio.cli.fs.command.AbstractFileSystemCommand;
import alluxio.cli.fs.command.job.JobAttempt;
import alluxio.cli.util.DistributedCommandUtil;
import alluxio.client.file.FileSystemContext;
import alluxio.client.job.JobMasterClient;
import alluxio.job.CmdConfig;
import alluxio.job.wire.Status;
import alluxio.util.CommonUtils;
import alluxio.worker.job.JobMasterClientContext;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.cli.Option;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;

/**
 * Base class for file system shell commands that hand work to the job master and then
 * track the outcome of that work.
 *
 * <p>Two tracking styles are offered:
 * <ul>
 *   <li>per-job: {@link JobAttempt}s are queued in {@link #mSubmittedJobAttempts} and polled
 *       through {@link #waitJob()} / {@link #drain()};</li>
 *   <li>per-command: a job control id obtained from {@link #submit(CmdConfig)} is polled
 *       through {@link #waitForCmd(long)} and summarised by {@link #postProcessing(long)}.</li>
 * </ul>
 *
 * <p>The counters and collections are plain fields; this class performs no synchronization
 * of its own.
 */
public abstract class AbstractDistributedJobCommand extends AbstractFileSystemCommand {
    /** Value assigned to {@link #mActiveJobs} by the constructor. */
    protected static final int DEFAULT_ACTIVE_JOBS = 3000;
    /** Maximum number of failed files echoed to stdout by {@code processFailures}. */
    private static final int DEFAULT_FAILURE_LIMIT = 20;
    /** Pause, in milliseconds, between two consecutive status polls. */
    private static final long POLL_INTERVAL_MS = 5L;

    /** Command line flag requesting asynchronous submission. */
    protected static final Option ASYNC_OPTION = Option.builder()
            .longOpt("async")
            .required(false)
            .hasArg(false)
            .argName("async")
            .desc("Use async to submit the command asynchronously and not wait for command to finish")
            .build();

    /** Attempts that have been submitted and have not yet been observed as finished. */
    protected List<JobAttempt> mSubmittedJobAttempts = Lists.newArrayList();
    /**
     * Not read inside this class; presumably an upper bound on in-flight jobs that subclasses
     * consult before submitting more work (confirm against the subclasses).
     */
    protected int mActiveJobs;
    /** Client used for every job master call made by this class. */
    protected final JobMasterClient mClient;

    private int mFailedCount;
    private int mCompletedCount;
    private int mFailedCmdCount;
    private int mCompletedCmdCount;
    private final Set<String> mFailedFiles;

    /**
     * Creates the command and a job master client derived from the given context.
     *
     * @param fsContext file system context forwarded to the superclass; its client context is
     *        used to build the job master client
     */
    protected AbstractDistributedJobCommand(FileSystemContext fsContext) {
        super(fsContext);
        ClientContext clientContext = mFsContext.getClientContext();
        mClient = JobMasterClient.Factory.create(
                (JobMasterClientContext) JobMasterClientContext.newBuilder(clientContext).build());
        mActiveJobs = DEFAULT_ACTIVE_JOBS;
        mFailedFiles = new HashSet<>();
    }

    /**
     * Blocks until {@link #mSubmittedJobAttempts} is empty, i.e. until every queued attempt has
     * been observed in a finished state.
     */
    protected void drain() {
        while (!mSubmittedJobAttempts.isEmpty()) {
            waitJob();
        }
    }

    /**
     * Submits a command configuration through the job master client.
     *
     * @param cmdConfig the configuration to submit
     * @return the value returned by the client, or {@code null} when the client throws an
     *         {@link IOException} (the stack trace is printed and the error is not rethrown)
     */
    protected Long submit(CmdConfig cmdConfig) {
        try {
            return mClient.submit(cmdConfig);
        } catch (IOException e) {
            // Best effort by design: callers are signalled through the null return value.
            e.printStackTrace();
            return null;
        }
    }

    /**
     * Polls all queued attempts until at least one of them is observed as finished, updating the
     * completed / failed counters and the failed file set along the way. Finished attempts are
     * dropped from {@link #mSubmittedJobAttempts}.
     *
     * <p>Returns immediately when nothing is queued, since no attempt could ever finish.
     *
     * @throws IllegalStateException if an attempt reports a status this method does not handle
     */
    protected void waitJob() {
        if (mSubmittedJobAttempts.isEmpty()) {
            return;
        }
        while (true) {
            List<JobAttempt> pending = new ArrayList<>(mSubmittedJobAttempts.size());
            boolean anyFinished = false;
            for (JobAttempt attempt : mSubmittedJobAttempts) {
                if (recordAndCheckInProgress(attempt)) {
                    pending.add(attempt);
                } else {
                    anyFinished = true;
                }
            }
            // Assigned only after a complete pass, so an exception leaves the queue untouched.
            mSubmittedJobAttempts = pending;
            if (anyFinished) {
                return;
            }
            CommonUtils.sleepMs(POLL_INTERVAL_MS);
        }
    }

    /**
     * Checks one attempt and, if it has finished, folds its result into the counters.
     *
     * @param attempt the attempt to check
     * @return {@code true} while the attempt is CREATED or RUNNING, {@code false} once it is
     *         CANCELED, COMPLETED or FAILED
     * @throws IllegalStateException for any other status
     */
    private boolean recordAndCheckInProgress(JobAttempt attempt) {
        Status status = attempt.check();
        switch (status) {
            case CREATED:
            case RUNNING: {
                return true;
            }
            case CANCELED:
            case COMPLETED: {
                mCompletedCount += attempt.getSize();
                return false;
            }
            case FAILED: {
                Set<String> failedFiles = attempt.getFailedFiles();
                // Everything in the attempt that is not listed as failed counts as completed.
                mCompletedCount += attempt.getSize() - failedFiles.size();
                mFailedCount += failedFiles.size();
                mFailedFiles.addAll(failedFiles);
                return false;
            }
            default: {
                throw new IllegalStateException(String.format("Unexpected Status: %s", status));
            }
        }
    }

    /**
     * Polls the status of a command until it reaches FAILED, COMPLETED or CANCELED, or until the
     * status lookup itself fails. FAILED and COMPLETED increment the matching command counter;
     * CANCELED and lookup failures change no counter.
     *
     * @param jobControlId id of the command to wait for
     */
    public void waitForCmd(long jobControlId) {
        while (true) {
            Status status;
            try {
                status = mClient.getCmdStatus(jobControlId);
            } catch (IOException e) {
                System.out.println(String.format("Unable to get running status for command %s. For distributedLoad, the files may already be loaded in Alluxio. For distributedCp, please check file source contains files or not. Please retry using `getCmdStatus` to check command detailed status, or using `fs ls` command to check if the files are already loaded.", jobControlId));
                return;
            }
            switch (status) {
                case FAILED:
                    ++mFailedCmdCount;
                    return;
                case COMPLETED:
                    ++mCompletedCmdCount;
                    return;
                case CANCELED:
                    // Treated as finished, as waitJob() does; polling again could never
                    // observe a different status and would spin forever.
                    return;
                default:
                    break;
            }
            CommonUtils.sleepMs(POLL_INTERVAL_MS);
        }
    }

    /**
     * Fetches the detailed status of a command and replaces the completed / failed file counts
     * with the sizes of the collections filled by that lookup. Failed files are added to the
     * set exposed by {@link #getFailedFiles()}.
     *
     * <p>If the lookup throws an {@link IOException} a message is printed and the counters are
     * left as they were.
     *
     * @param jobControlId id of the command to summarise
     */
    public void postProcessing(long jobControlId) {
        List<String> completedFiles = new ArrayList<>();
        try {
            DistributedCommandUtil.getDetailedCmdStatus(
                    jobControlId, mClient, mFailedFiles, completedFiles);
            mCompletedCount = completedFiles.size();
            mFailedCount = mFailedFiles.size();
            System.out.format("Finished running the command, jobControlId = %s%n", jobControlId);
        } catch (IOException e) {
            System.out.println(String.format("Unable to get detailed command information for command %s, the files may already be loaded in Alluxio or file souce may not contain files%n", jobControlId));
        }
    }

    /**
     * Prints up to {@code DEFAULT_FAILURE_LIMIT} failed files to stdout and writes the complete
     * list, one entry per line in UTF-8, to a log file.
     *
     * <p>The log file path is {@code logFileLocation} formatted with {@code arg} after its
     * {@code /}-separated segments have been joined with {@code _}. Errors while writing the
     * file are reported on stdout and not rethrown.
     *
     * @param arg path argument the command was run on
     * @param failures the failed files to report
     * @param logFileLocation format string for the log file path, taking one argument
     */
    protected void processFailures(String arg, Set<String> failures, String logFileLocation) {
        String path = String.join("_", StringUtils.split(arg, "/"));
        String failurePath = String.format(logFileLocation, path);

        StringBuilder output = new StringBuilder();
        output.append("Here are failed files: \n");
        int shown = 0;
        for (String failure : failures) {
            if (shown >= DEFAULT_FAILURE_LIMIT) {
                break;
            }
            output.append(failure).append(",\n");
            ++shown;
        }
        output.append(String.format("Check out %s for full list of failed files.%n", failurePath));
        System.out.print(output);

        try (FileOutputStream writer = FileUtils.openOutputStream(new File(failurePath))) {
            for (String failure : failures) {
                writer.write(String.format("%s%n", failure).getBytes(StandardCharsets.UTF_8));
            }
        } catch (Exception e) {
            // Writing the log is best effort; the summary above has already been printed.
            System.out.println("Exception writing failure files:");
            System.out.println(e.getMessage());
        }
    }

    /**
     * @return the number of completed files recorded so far
     */
    public int getCompletedCount() {
        return mCompletedCount;
    }

    /**
     * @return the number of failed files recorded so far
     */
    public int getFailedCount() {
        return mFailedCount;
    }

    /**
     * @return the number of commands that {@link #waitForCmd(long)} observed as FAILED
     */
    public int getFailedCmdCount() {
        return mFailedCmdCount;
    }

    /**
     * @return the number of commands that {@link #waitForCmd(long)} observed as COMPLETED
     */
    public int getCompletedCmdCount() {
        return mCompletedCmdCount;
    }

    /**
     * @return the live set of failed files; it is the internal instance, not a copy
     */
    public Set<String> getFailedFiles() {
        return mFailedFiles;
    }
}

