/*
 * Decompiled with CFR 0.152.
 */
package alluxio.cli.fs.command;

import alluxio.AlluxioURI;
import alluxio.cli.Command;
import alluxio.cli.CommandUtils;
import alluxio.cli.fs.FileSystemShellUtils;
import alluxio.cli.fs.command.AbstractDistributedJobCommand;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.job.CmdConfig;
import alluxio.job.cmd.load.LoadCliConfig;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;
import java.util.Locale;
import java.util.Set;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;

/**
 * Shell command {@code distributedLoad}: submits load jobs for a single path, or for every path
 * listed in an index file, and optionally waits for each submitted job to finish.
 *
 * <p>Worker selection can be narrowed with host/locality options, each of which can be given
 * either inline (comma separated) or through a file (one entry per line). When both the file
 * and the inline form of the same option are present, the file form is used and the inline
 * form is ignored.
 */
@ThreadSafe
public final class DistributedLoadCommand
extends AbstractDistributedJobCommand {
    /** Replication used when {@code --replication} is not given. */
    private static final int DEFAULT_REPLICATION = 1;
    /** Active job limit used when {@code --active-jobs} is not given. */
    private static final int DEFAULT_ACTIVE_JOB_LIMIT = 3000;
    private static final String DEFAULT_FAILURE_FILE_PATH = "./logs/user/distributedLoad_%s_failures.csv";

    private static final Option REPLICATION_OPTION = Option.builder()
        .longOpt("replication")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .type(Number.class)
        .argName("replicas")
        .desc("Number of block replicas of each loaded file, default: 1")
        .build();
    private static final Option ACTIVE_JOB_COUNT_OPTION = Option.builder()
        .longOpt("active-jobs")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .type(Number.class)
        .argName("active job count")
        .desc("Number of active jobs that can run at the same time. Later jobs must wait. The default upper limit is 3000")
        .build();
    // Used purely as a flag: run() only checks whether it is present and reads the index file
    // path from the positional argument. The builder calls are kept exactly as they were.
    private static final Option INDEX_FILE = Option.builder()
        .longOpt("index")
        .required(false)
        .hasArg(true)
        .numberOfArgs(0)
        .type(String.class)
        .argName("index file")
        .desc("Name of the index file that lists all files to be loaded")
        .build();
    private static final Option HOSTS_OPTION = Option.builder()
        .longOpt("hosts")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .argName("hosts")
        .desc("A list of worker hosts separated by comma. When host and locality options are not set, all hosts will be selected unless explicitly excluded by setting excluded option('excluded-hosts', 'excluded-host-file', 'excluded-locality' and 'excluded-locality-file'). Only one of the 'hosts' and 'host-file' should be set, and it should not be set with excluded option together.")
        .build();
    private static final Option HOST_FILE_OPTION = Option.builder()
        .longOpt("host-file")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .argName("host-file")
        .desc("Host File contains worker hosts, each line has a worker host. When host and locality options are not set, all hosts will be selected unless explicitly excluded by setting excluded option('excluded-hosts', 'excluded-host-file', 'excluded-locality' and 'excluded-locality-file'). Only one of the 'hosts' and 'host-file' should be set, and it should not be set with excluded option together.")
        .build();
    private static final Option EXCLUDED_HOSTS_OPTION = Option.builder()
        .longOpt("excluded-hosts")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .argName("excluded-hosts")
        .desc("A list of excluded worker hosts separated by comma. Only one of the 'excluded-hosts' and 'excluded-host-file' should be set, and it should not be set with 'hosts', 'host-file', 'locality' and 'locality-file' together.")
        .build();
    private static final Option EXCLUDED_HOST_FILE_OPTION = Option.builder()
        .longOpt("excluded-host-file")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .argName("excluded-host-file")
        .desc("Host File contains excluded worker hosts, each line has a worker host. Only one of the 'excluded-hosts' and 'excluded-host-file' should be set, and it should not be set with 'hosts', 'host-file', 'locality' and 'locality-file' together.")
        .build();
    private static final Option LOCALITY_OPTION = Option.builder()
        .longOpt("locality")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .argName("locality")
        .desc("A list of worker locality separated by comma. When host and locality options are not set, all hosts will be selected unless explicitly excluded by setting excluded option('excluded-hosts', 'excluded-host-file', 'excluded-locality' and 'excluded-locality-file'). Only one of the 'locality' and 'locality-file' should be set, and it should not be set with excluded option together.")
        .build();
    private static final Option LOCALITY_FILE_OPTION = Option.builder()
        .longOpt("locality-file")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .argName("locality-file")
        .desc("Locality File contains worker localities, each line has a worker locality. When host and locality options are not set, all hosts will be selected unless explicitly excluded by setting excluded option('excluded-hosts', 'excluded-host-file', 'excluded-locality' and 'excluded-locality-file'). Only one of the 'locality' and 'locality-file' should be set, and it should not be set with excluded option together.")
        .build();
    private static final Option EXCLUDED_LOCALITY_OPTION = Option.builder()
        .longOpt("excluded-locality")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .argName("excluded-locality")
        .desc("A list of excluded worker locality separated by comma. Only one of the 'excluded-locality' and 'excluded-locality-file' should be set, and it should not be set with 'hosts', 'host-file', 'locality' and 'locality-file' together.")
        .build();
    private static final Option EXCLUDED_LOCALITY_FILE_OPTION = Option.builder()
        .longOpt("excluded-locality-file")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .argName("excluded-locality-file")
        .desc("Locality File contains excluded worker localities, each line has a worker locality. Only one of the 'excluded-locality' and 'excluded-locality-file' should be set, and it should not be set with 'hosts', 'host-file', 'locality' and 'locality-file' together.")
        .build();
    private static final Option BATCH_SIZE_OPTION = Option.builder()
        .longOpt("batch-size")
        .required(false)
        .hasArg(true)
        .numberOfArgs(1)
        .type(Number.class)
        .argName("batch-size")
        .desc("Number of files per request")
        .build();
    private static final Option PASSIVE_CACHE_OPTION = Option.builder()
        .longOpt("passive-cache")
        .required(false)
        .hasArg(false)
        .desc("Use passive-cache as the cache implementation, turn on to use the old cache through read implementation. Passive-cache is default when there's no option set or both options are set for cache implementation.Notice that this flag is temporary, and it would retire after direct cache graduate from experimental stage")
        .build();
    private static final Option DIRECT_CACHE_OPTION = Option.builder()
        .longOpt("direct-cache")
        .required(false)
        .hasArg(false)
        .desc("Use direct cache request as the cache implementation, turn on to use the new cache through cache manager implementation. Notice that this flag is temporary, and it would retire after direct cache graduate from experimental stage")
        .build();

    /**
     * Creates the command.
     *
     * @param fsContext file system context handed to the parent command
     */
    public DistributedLoadCommand(FileSystemContext fsContext) {
        super(fsContext);
    }

    /**
     * @return the name under which this command is invoked, {@code distributedLoad}
     */
    public String getCommandName() {
        return "distributedLoad";
    }

    /**
     * @return every option this command accepts, including the inherited async option
     */
    public Options getOptions() {
        return new Options()
            .addOption(REPLICATION_OPTION)
            .addOption(ACTIVE_JOB_COUNT_OPTION)
            .addOption(INDEX_FILE)
            .addOption(HOSTS_OPTION)
            .addOption(HOST_FILE_OPTION)
            .addOption(EXCLUDED_HOSTS_OPTION)
            .addOption(EXCLUDED_HOST_FILE_OPTION)
            .addOption(LOCALITY_OPTION)
            .addOption(LOCALITY_FILE_OPTION)
            .addOption(EXCLUDED_LOCALITY_OPTION)
            .addOption(EXCLUDED_LOCALITY_FILE_OPTION)
            .addOption(PASSIVE_CACHE_OPTION)
            .addOption(DIRECT_CACHE_OPTION)
            .addOption(BATCH_SIZE_OPTION)
            .addOption(ASYNC_OPTION);
    }

    /**
     * Validates the positional arguments by delegating to
     * {@link CommandUtils#checkNumOfArgsEquals} with an expected count of 1.
     *
     * @param cl the parsed command line
     * @throws InvalidArgumentException if the delegate rejects the argument count
     */
    public void validateArgs(CommandLine cl) throws InvalidArgumentException {
        CommandUtils.checkNumOfArgsEquals(this, cl, 1);
    }

    /**
     * @return the one-line usage string for this command
     */
    public String getUsage() {
        return "distributedLoad [--replication <num>] [--active-jobs <num>] [--batch-size <num>] [--index] [--hosts <host1>,<host2>,...,<hostN>] [--host-file <hostFilePath>] [--excluded-hosts <host1>,<host2>,...,<hostN>] [--excluded-host-file <hostFilePath>] [--locality <locality1>,<locality2>,...,<localityN>] [--locality-file <localityFilePath>] [--excluded-locality <locality1>,<locality2>,...,<localityN>] [--excluded-locality-file <localityFilePath>] [--passive-cache] [--direct-cache] <path>";
    }

    /**
     * @return a short human-readable description of the command
     */
    public String getDescription() {
        return "Loads a file or all files in a directory into Alluxio space.";
    }

    /**
     * Parses the options, submits one load job per target path and, unless the async option is
     * set, waits for each job before moving on. Afterwards any failed files reported by
     * {@code getFailedFiles()} are handed to {@code processFailures}.
     *
     * <p>Without {@code --index} the positional argument is the path to load. With
     * {@code --index} the positional argument is a local file that lists one path per line;
     * lines are trimmed and blank lines are skipped.
     *
     * @param cl the parsed command line
     * @return always 0
     * @throws AlluxioException propagated from waiting on / post-processing a submitted job
     * @throws IOException if a host, locality or index file cannot be read
     */
    public int run(CommandLine cl) throws AlluxioException, IOException {
        mActiveJobs = FileSystemShellUtils.getIntArg(cl, ACTIVE_JOB_COUNT_OPTION, DEFAULT_ACTIVE_JOB_LIMIT);
        String[] args = cl.getArgs();
        AlluxioConfiguration conf = mFsContext.getClusterConf();
        int defaultBatchSize = conf.getInt(PropertyKey.JOB_REQUEST_BATCH_SIZE);
        int replication = FileSystemShellUtils.getIntArg(cl, REPLICATION_OPTION, DEFAULT_REPLICATION);
        int batchSize = FileSystemShellUtils.getIntArg(cl, BATCH_SIZE_OPTION, defaultBatchSize);
        // Direct cache is only used when it is requested on its own; passive cache wins when
        // neither or both of the two flags are present.
        boolean directCache = !cl.hasOption(PASSIVE_CACHE_OPTION.getLongOpt())
            && cl.hasOption(DIRECT_CACHE_OPTION.getLongOpt());
        boolean async = cl.hasOption(ASYNC_OPTION.getLongOpt());
        if (async) {
            System.out.println("Entering async submission mode. ");
        }

        Set<String> workerSet = readNamesFromOptions(cl, HOST_FILE_OPTION, HOSTS_OPTION);
        Set<String> excludedWorkerSet =
            readNamesFromOptions(cl, EXCLUDED_HOST_FILE_OPTION, EXCLUDED_HOSTS_OPTION);
        Set<String> localityIds = readNamesFromOptions(cl, LOCALITY_FILE_OPTION, LOCALITY_OPTION);
        Set<String> excludedLocalityIds =
            readNamesFromOptions(cl, EXCLUDED_LOCALITY_FILE_OPTION, EXCLUDED_LOCALITY_OPTION);

        System.out.println("Please wait for command submission to finish..");
        if (!cl.hasOption(INDEX_FILE.getLongOpt())) {
            submitLoad(new AlluxioURI(args[0]), replication, batchSize, workerSet,
                excludedWorkerSet, localityIds, excludedLocalityIds, directCache, async);
        } else {
            // NOTE(review): FileReader decodes with the platform default charset.
            try (BufferedReader reader = new BufferedReader(new FileReader(args[0]))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    String filename = line.trim();
                    if (filename.isEmpty()) {
                        // A blank line is not a path; do not submit a job for it.
                        continue;
                    }
                    submitLoad(new AlluxioURI(filename), replication, batchSize, workerSet,
                        excludedWorkerSet, localityIds, excludedLocalityIds, directCache, async);
                }
            }
        }

        Set<String> failures = getFailedFiles();
        if (!failures.isEmpty()) {
            processFailures(args[0], failures, DEFAULT_FAILURE_FILE_PATH);
        }
        return 0;
    }

    /**
     * Builds a {@link LoadCliConfig} for one path and submits it.
     *
     * @param filePath the path to load; only its path component is put into the config
     * @param replication number of block replicas requested
     * @param batchSize number of files per request
     * @param workerSet worker hosts to use
     * @param excludedWorkerSet worker hosts to exclude
     * @param localityIds worker localities to use
     * @param excludedLocalityIds worker localities to exclude
     * @param directCache whether the direct cache implementation is requested
     * @return the job control id returned by {@code submit}
     */
    public Long runDistLoad(AlluxioURI filePath, int replication, int batchSize, Set<String> workerSet, Set<String> excludedWorkerSet, Set<String> localityIds, Set<String> excludedLocalityIds, boolean directCache) {
        CmdConfig cmdConfig = new LoadCliConfig(filePath.getPath(), batchSize, replication,
            workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache);
        return submit(cmdConfig);
    }

    /**
     * Submits a load job for one path and reports the job control id; in synchronous mode it
     * additionally waits for the job and runs post-processing.
     */
    private void submitLoad(AlluxioURI path, int replication, int batchSize, Set<String> workerSet, Set<String> excludedWorkerSet, Set<String> localityIds, Set<String> excludedLocalityIds, boolean directCache, boolean async) throws AlluxioException, IOException {
        Long jobControlId = runDistLoad(path, replication, batchSize, workerSet,
            excludedWorkerSet, localityIds, excludedLocalityIds, directCache);
        if (async) {
            System.out.format("Submitted distLoad job successfully, jobControlId = %s%n", jobControlId.toString());
            return;
        }
        System.out.format("Submitted successfully, jobControlId = %s%nWaiting for the command to finish ...%n", jobControlId.toString());
        waitForCmd(jobControlId);
        postProcessing(jobControlId);
    }

    /**
     * Collects the names supplied through a "file" option or, if that is absent, through the
     * matching inline comma-separated option. Returns an empty set when neither is present.
     */
    private Set<String> readNamesFromOptions(CommandLine cl, Option fileOption, Option listOption) throws IOException {
        Set<String> names = new HashSet<>();
        if (cl.hasOption(fileOption.getLongOpt())) {
            readLinesToSet(names, cl.getOptionValue(fileOption.getLongOpt()).trim());
        } else if (cl.hasOption(listOption.getLongOpt())) {
            readItemsFromOptionString(names, cl.getOptionValue(listOption.getLongOpt()).trim());
        }
        return names;
    }

    /**
     * Splits a comma-separated option value and adds each non-empty item, trimmed and
     * upper-cased, to {@code target}. Upper-casing uses {@link Locale#ROOT} so the result does
     * not depend on the JVM default locale.
     */
    private void readItemsFromOptionString(Set<String> target, String argOption) {
        for (String item : StringUtils.split(argOption, ",")) {
            String normalized = item.trim().toUpperCase(Locale.ROOT);
            if (!normalized.isEmpty()) {
                target.add(normalized);
            }
        }
    }

    /**
     * Reads a local file line by line and adds each non-empty line, trimmed and upper-cased,
     * to {@code target}. Upper-casing uses {@link Locale#ROOT} so the result does not depend on
     * the JVM default locale.
     *
     * @throws IOException if the file cannot be opened or read
     */
    private void readLinesToSet(Set<String> target, String filePath) throws IOException {
        try (BufferedReader reader = new BufferedReader(new FileReader(filePath))) {
            String line;
            while ((line = reader.readLine()) != null) {
                String normalized = line.trim().toUpperCase(Locale.ROOT);
                if (!normalized.isEmpty()) {
                    target.add(normalized);
                }
            }
        }
    }

    /**
     * Closes the inherited client.
     *
     * @throws IOException if closing the client fails
     */
    public void close() throws IOException {
        mClient.close();
    }
}

