/*
 * Decompiled with CFR 0.152.
 */
package alluxio.cli.fs.command;

import alluxio.AlluxioURI;
import alluxio.cli.fs.command.AbstractDistributedJobCommand;
import alluxio.cli.fs.command.job.JobAttempt;
import alluxio.client.file.URIStatus;
import alluxio.client.job.JobMasterClient;
import alluxio.exception.AlluxioException;
import alluxio.grpc.ListStatusPOptions;
import alluxio.job.JobConfig;
import alluxio.job.plan.BatchedJobConfig;
import alluxio.job.plan.load.LoadConfig;
import alluxio.job.wire.JobInfo;
import alluxio.retry.CountingRetry;
import alluxio.retry.RetryPolicy;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.LongAdder;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class DistributedLoadUtils {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedLoadUtils.class);

    // Static-only utility holder: the private constructor keeps callers from instantiating it.
    private DistributedLoadUtils() {
    }

    /**
     * Loads everything found under {@code filePath} by submitting load jobs through
     * {@code command}.
     *
     * <p>The work happens in three steps: {@code load} walks the path and submits a job each time
     * {@code pool} reaches {@code batchSize} entries; any entries still left in {@code pool} are
     * then submitted as one final job and the pool is cleared; lastly {@code command.drain()} is
     * invoked (presumably to wait for the submitted jobs; its implementation is not visible
     * here).
     *
     * @param command the CLI command that owns the file system client and the submitted jobs
     * @param pool scratch list used to accumulate files for the current batch; cleared after
     *        every submission
     * @param batchSize number of files that triggers a job submission
     * @param filePath root path to load
     * @param replication replication value forwarded to each load config
     * @param workerSet worker set forwarded to each load config
     * @param excludedWorkerSet excluded worker set forwarded to each load config
     * @param localityIds locality ids forwarded to each load config
     * @param excludedLocalityIds excluded locality ids forwarded to each load config
     * @param directCache direct-cache flag forwarded to each load config
     * @param printOut whether progress messages are written to standard output
     * @throws AlluxioException propagated from listing the path
     * @throws IOException propagated from listing the path
     */
    public static void distributedLoad(AbstractDistributedJobCommand command,
            List<URIStatus> pool, int batchSize, AlluxioURI filePath, int replication,
            Set<String> workerSet, Set<String> excludedWorkerSet, Set<String> localityIds,
            Set<String> excludedLocalityIds, boolean directCache, boolean printOut)
            throws AlluxioException, IOException {
        load(command, pool, batchSize, filePath, replication, workerSet, excludedWorkerSet,
                localityIds, excludedLocalityIds, directCache, printOut);

        // Files that never filled a whole batch still need to be submitted.
        boolean hasLeftovers = !pool.isEmpty();
        if (hasLeftovers) {
            addJob(command, pool, replication, workerSet, excludedWorkerSet, localityIds,
                    excludedLocalityIds, directCache, printOut);
            pool.clear();
        }

        command.drain();
    }

    /**
     * Recursively lists {@code filePath} and submits a load job every time {@code pool} fills up.
     *
     * <p>For each listed entry:
     * <ul>
     *   <li>folders are ignored;</li>
     *   <li>files that are not completed are counted, reported on standard output and
     *       skipped;</li>
     *   <li>files that are already 100% in Alluxio are skipped when {@code replication} is 1
     *       (reported only if {@code printOut} is set);</li>
     *   <li>every other file is appended to {@code pool}; once {@code pool.size()} equals
     *       {@code batchSize} the batch is submitted via {@code addJob} and the pool is
     *       cleared.</li>
     * </ul>
     * Entries left in {@code pool} when the listing ends are not submitted here.
     *
     * @param command the CLI command providing the file system client and job bookkeeping
     * @param pool accumulator for the files of the current batch
     * @param batchSize number of pooled files that triggers a submission
     * @param filePath root path to list
     * @param replication replication value forwarded to each load config
     * @param workerSet worker set forwarded to each load config
     * @param excludedWorkerSet excluded worker set forwarded to each load config
     * @param localityIds locality ids forwarded to each load config
     * @param excludedLocalityIds excluded locality ids forwarded to each load config
     * @param directCache direct-cache flag forwarded to each load config
     * @param printOut whether "already loaded" notices and job progress are printed
     * @throws IOException propagated from {@code iterateStatus}
     * @throws AlluxioException propagated from {@code iterateStatus}
     */
    private static void load(AbstractDistributedJobCommand command, List<URIStatus> pool,
            int batchSize, AlluxioURI filePath, int replication, Set<String> workerSet,
            Set<String> excludedWorkerSet, Set<String> localityIds,
            Set<String> excludedLocalityIds, boolean directCache, boolean printOut)
            throws IOException, AlluxioException {
        ListStatusPOptions options = ListStatusPOptions.newBuilder().setRecursive(true).build();
        LongAdder incompleteCount = new LongAdder();

        command.mFileSystem.iterateStatus(filePath, options, uriStatus -> {
            if (uriStatus.isFolder()) {
                return;
            }
            if (!uriStatus.isCompleted()) {
                incompleteCount.increment();
                // %n terminates the line so consecutive notices do not run together.
                System.out.printf("Ignored load because: %s is in incomplete status%n",
                        uriStatus.getPath());
                return;
            }
            if (uriStatus.getInAlluxioPercentage() == 100 && replication == 1) {
                if (printOut) {
                    // The URI object is only needed for this message, so build it lazily.
                    System.out.println(new AlluxioURI(uriStatus.getPath())
                            + " is already fully loaded in Alluxio");
                }
                return;
            }
            pool.add(uriStatus);
            if (pool.size() == batchSize) {
                addJob(command, pool, replication, workerSet, excludedWorkerSet, localityIds,
                        excludedLocalityIds, directCache, printOut);
                pool.clear();
            }
        });

        long skipped = incompleteCount.longValue();
        if (skipped > 0L) {
            System.out.printf("Ignore load %d paths because they are in incomplete status%n",
                    skipped);
        }
    }

    /**
     * Starts a job for {@code statuses} and records it in
     * {@code command.mSubmittedJobAttempts}.
     *
     * <p>If the number of recorded attempts has already reached {@code command.mActiveJobs},
     * {@code command.waitJob()} is called first (presumably to free a slot; its implementation
     * is not visible here).
     *
     * @param command the CLI command holding the submitted attempts
     * @param statuses files that make up this job
     * @param replication replication value forwarded to the load config
     * @param workerSet worker set forwarded to the load config
     * @param excludedWorkerSet excluded worker set forwarded to the load config
     * @param localityIds locality ids forwarded to the load config
     * @param excludedLocalityIds excluded locality ids forwarded to the load config
     * @param directCache direct-cache flag forwarded to the load config
     * @param printOut whether the attempt reports progress on standard output
     */
    private static void addJob(AbstractDistributedJobCommand command, List<URIStatus> statuses,
            int replication, Set<String> workerSet, Set<String> excludedWorkerSet,
            Set<String> localityIds, Set<String> excludedLocalityIds, boolean directCache,
            boolean printOut) {
        boolean atCapacity = command.mSubmittedJobAttempts.size() >= command.mActiveJobs;
        if (atCapacity) {
            command.waitJob();
        }

        JobAttempt startedAttempt = newJob(command, statuses, replication, workerSet,
                excludedWorkerSet, localityIds, excludedLocalityIds, directCache, printOut);
        command.mSubmittedJobAttempts.add(startedAttempt);
    }

    /**
     * Creates a job attempt for the given files through {@code LoadJobAttemptFactory} and
     * immediately calls {@code run()} on it.
     *
     * @param command the CLI command whose job client the attempt uses
     * @param filePath statuses of the files covered by this job
     * @param replication replication value forwarded to the load config
     * @param workerSet worker set forwarded to the load config
     * @param excludedWorkerSet excluded worker set forwarded to the load config
     * @param localityIds locality ids forwarded to the load config
     * @param excludedLocalityIds excluded locality ids forwarded to the load config
     * @param directCache direct-cache flag forwarded to the load config
     * @param printOut whether the attempt reports progress on standard output
     * @return the attempt, after {@code run()} has been invoked on it
     */
    private static JobAttempt newJob(AbstractDistributedJobCommand command,
            List<URIStatus> filePath, int replication, Set<String> workerSet,
            Set<String> excludedWorkerSet, Set<String> localityIds,
            Set<String> excludedLocalityIds, boolean directCache, boolean printOut) {
        JobAttempt attempt = LoadJobAttemptFactory.create(command, filePath, replication,
                workerSet, excludedWorkerSet, localityIds, excludedLocalityIds, directCache,
                printOut);
        attempt.run();
        return attempt;
    }

    /**
     * Chooses and builds the {@link JobAttempt} implementation for a set of files, depending on
     * how many files there are and whether progress should be printed.
     */
    public static class LoadJobAttemptFactory {
        /**
         * Shared mapper used to turn each {@link LoadConfig} into a map for batched jobs.
         * Jackson documents a configured {@code ObjectMapper} as thread-safe, so one instance is
         * reused instead of building a new mapper for every batch.
         */
        private static final ObjectMapper MAPPER = new ObjectMapper();

        /** Value handed to every {@link CountingRetry} created for an attempt. */
        private static final int RETRY_LIMIT = 3;

        /**
         * Creates, but does not run, the attempt for {@code filePath}.
         *
         * <p>Exactly one file produces an attempt backed by a single {@link LoadConfig}; when
         * {@code printOut} is set a "&lt;path&gt; loading" line is printed first. Any other
         * number of files produces an attempt backed by a {@link BatchedJobConfig} of type
         * {@code "Load"} whose entries are the map form of each file's {@link LoadConfig}.
         * When {@code printOut} is false the silent attempt variants are used.
         *
         * @param command the CLI command whose {@code mClient} the attempt talks to
         * @param filePath statuses of the files covered by the attempt
         * @param replication replication value stored in each load config
         * @param workerSet worker set stored in each load config
         * @param excludedWorkerSet excluded worker set stored in each load config
         * @param localityIds locality ids stored in each load config
         * @param excludedLocalityIds excluded locality ids stored in each load config
         * @param directCache direct-cache flag stored in each load config
         * @param printOut whether the returned attempt prints progress
         * @return the newly created attempt
         */
        public static JobAttempt create(AbstractDistributedJobCommand command,
                List<URIStatus> filePath, int replication, Set<String> workerSet,
                Set<String> excludedWorkerSet, Set<String> localityIds,
                Set<String> excludedLocalityIds, boolean directCache, boolean printOut) {
            if (filePath.size() == 1) {
                LoadConfig config = newLoadConfig(filePath.get(0), replication, workerSet,
                        excludedWorkerSet, localityIds, excludedLocalityIds, directCache);
                if (!printOut) {
                    return new SilentLoadJobAttempt(command.mClient, config,
                            new CountingRetry(RETRY_LIMIT));
                }
                System.out.println(config.getFilePath() + " loading");
                return new LoadJobAttempt(command.mClient, config,
                        new CountingRetry(RETRY_LIMIT));
            }

            // NOTE(review): the element type mirrors how BatchedLoadJobAttempt reads the
            // configs back (a map lookup of "filePath" yielding a String); confirm it matches
            // the BatchedJobConfig constructor signature.
            Set<Map<String, String>> configs = new HashSet<>();
            for (URIStatus status : filePath) {
                LoadConfig loadConfig = newLoadConfig(status, replication, workerSet,
                        excludedWorkerSet, localityIds, excludedLocalityIds, directCache);
                // convertValue(..., Map.class) can only return a raw Map.
                @SuppressWarnings("unchecked")
                Map<String, String> asMap = MAPPER.convertValue(loadConfig, Map.class);
                configs.add(asMap);
            }
            BatchedJobConfig config = new BatchedJobConfig("Load", configs);
            if (printOut) {
                return new BatchedLoadJobAttempt(command.mClient, config,
                        new CountingRetry(RETRY_LIMIT));
            }
            return new SilentBatchedLoadJobAttempt(command.mClient, config,
                    new CountingRetry(RETRY_LIMIT));
        }

        /** Builds the load config for one file from the options shared by the whole request. */
        private static LoadConfig newLoadConfig(URIStatus status, int replication,
                Set<String> workerSet, Set<String> excludedWorkerSet, Set<String> localityIds,
                Set<String> excludedLocalityIds, boolean directCache) {
            return new LoadConfig(status.getPath(), replication, workerSet, excludedWorkerSet,
                    localityIds, excludedLocalityIds, directCache);
        }
    }

    /**
     * Attempt for a batched load job whose logging hooks are all no-ops, so nothing is printed
     * by this class.
     */
    private static class SilentBatchedLoadJobAttempt extends JobAttempt {
        private final BatchedJobConfig mJobConfig;

        /**
         * @param client job master client passed to the base class
         * @param jobConfig the batched config this attempt submits
         * @param retryPolicy retry policy passed to the base class
         */
        SilentBatchedLoadJobAttempt(JobMasterClient client, BatchedJobConfig jobConfig,
                RetryPolicy retryPolicy) {
            super(client, retryPolicy);
            mJobConfig = jobConfig;
        }

        /** @return the batched config given at construction */
        @Override
        public JobConfig getJobConfig() {
            return mJobConfig;
        }

        /** @return the number of individual configs in the batch */
        @Override
        public int getSize() {
            return mJobConfig.getJobConfigs().size();
        }

        /**
         * Adds one entry to {@code mFailedFiles} per failed task: the text found between
         * {@code "FilePath="} and the next {@code ","} in the task description.
         */
        @Override
        public void setFailedFiles() {
            if (mFailedTasks.isEmpty()) {
                return;
            }
            for (JobInfo failedTask : mFailedTasks) {
                String description = failedTask.getDescription();
                String failedPath = StringUtils.substringBetween(description, "FilePath=", ",");
                mFailedFiles.add(failedPath);
            }
        }

        /** Intentionally silent. */
        @Override
        protected void logFailedAttempt(JobInfo jobInfo) {
        }

        /** Intentionally silent. */
        @Override
        protected void logFailed() {
        }

        /** Intentionally silent. */
        @Override
        protected void logCompleted() {
        }
    }

    /**
     * Attempt for a batched load job that reports its progress on standard output.
     */
    private static class BatchedLoadJobAttempt extends JobAttempt {
        private final BatchedJobConfig mJobConfig;
        /** Bracketed, comma-joined file paths abbreviated to 80 characters, used in messages. */
        private final String mFilesPathString;

        /**
         * Stores the config, builds the abbreviated path summary and prints a "loading" line.
         *
         * @param client job master client passed to the base class
         * @param jobConfig the batched config this attempt submits
         * @param retryPolicy retry policy passed to the base class
         */
        BatchedLoadJobAttempt(JobMasterClient client, BatchedJobConfig jobConfig,
                RetryPolicy retryPolicy) {
            super(client, retryPolicy);
            mJobConfig = jobConfig;
            String joinedPaths = jobConfig.getJobConfigs().stream()
                    .map(entry -> (String) entry.get("filePath"))
                    .collect(Collectors.joining(","));
            String shortened = StringUtils.abbreviate(joinedPaths, 80);
            mFilesPathString = "[" + shortened + "]";
            System.out.printf("files: %s loading%n", mFilesPathString);
        }

        /** @return the batched config given at construction */
        @Override
        public JobConfig getJobConfig() {
            return mJobConfig;
        }

        /** @return the number of individual configs in the batch */
        @Override
        public int getSize() {
            return mJobConfig.getJobConfigs().size();
        }

        /**
         * Adds one entry to {@code mFailedFiles} per failed task: the text found between
         * {@code "FilePath="} and the next {@code ","} in the task description.
         */
        @Override
        public void setFailedFiles() {
            if (mFailedTasks.isEmpty()) {
                return;
            }
            for (JobInfo failedTask : mFailedTasks) {
                String description = failedTask.getDescription();
                String failedPath = StringUtils.substringBetween(description, "FilePath=", ",");
                mFailedFiles.add(failedPath);
            }
        }

        /** Prints the attempt count, the file summary and the job's error message. */
        @Override
        protected void logFailedAttempt(JobInfo jobInfo) {
            System.out.printf("Attempt %d to load %s failed because: %s%n",
                    mRetryPolicy.getAttemptCount(), mFilesPathString, jobInfo.getErrorMessage());
        }

        /** Prints that loading the files did not complete, with the attempt count. */
        @Override
        protected void logFailed() {
            System.out.printf("Failed to complete loading %s after %d retries.%n",
                    mFilesPathString, mRetryPolicy.getAttemptCount());
        }

        /** Prints that the files were loaded, with the attempt count. */
        @Override
        protected void logCompleted() {
            System.out.printf("Successfully loaded path %s after %d attempts%n",
                    mFilesPathString, mRetryPolicy.getAttemptCount());
        }
    }

    /**
     * Attempt for a single-file load job whose logging hooks are all no-ops, so nothing is
     * printed by this class.
     */
    private static class SilentLoadJobAttempt extends JobAttempt {
        private final LoadConfig mJobConfig;

        /**
         * @param client job master client passed to the base class
         * @param jobConfig the load config this attempt submits
         * @param retryPolicy retry policy passed to the base class
         */
        SilentLoadJobAttempt(JobMasterClient client, LoadConfig jobConfig,
                RetryPolicy retryPolicy) {
            super(client, retryPolicy);
            mJobConfig = jobConfig;
        }

        /** @return the load config given at construction */
        @Override
        public JobConfig getJobConfig() {
            return mJobConfig;
        }

        /** @return always 1, since this attempt covers a single file */
        @Override
        public int getSize() {
            return 1;
        }

        /**
         * When any task failed, replaces {@code mFailedFiles} with a singleton holding this
         * attempt's file path.
         */
        @Override
        public void setFailedFiles() {
            if (mFailedTasks.isEmpty()) {
                return;
            }
            mFailedFiles = Collections.singleton(mJobConfig.getFilePath());
        }

        /** Intentionally silent. */
        @Override
        protected void logFailedAttempt(JobInfo jobInfo) {
        }

        /** Intentionally silent. */
        @Override
        protected void logFailed() {
        }

        /** Intentionally silent. */
        @Override
        protected void logCompleted() {
        }
    }

    /**
     * Attempt for a single-file load job that reports its progress on standard output.
     */
    private static class LoadJobAttempt extends JobAttempt {
        private final LoadConfig mJobConfig;

        /**
         * @param client job master client passed to the base class
         * @param jobConfig the load config this attempt submits
         * @param retryPolicy retry policy passed to the base class
         */
        LoadJobAttempt(JobMasterClient client, LoadConfig jobConfig, RetryPolicy retryPolicy) {
            super(client, retryPolicy);
            mJobConfig = jobConfig;
        }

        /** @return the load config given at construction */
        @Override
        public JobConfig getJobConfig() {
            return mJobConfig;
        }

        /** @return always 1, since this attempt covers a single file */
        @Override
        public int getSize() {
            return 1;
        }

        /**
         * When any task failed, replaces {@code mFailedFiles} with a singleton holding this
         * attempt's file path.
         */
        @Override
        public void setFailedFiles() {
            if (mFailedTasks.isEmpty()) {
                return;
            }
            mFailedFiles = Collections.singleton(mJobConfig.getFilePath());
        }

        /** Prints the attempt count, the file path and the job's error message. */
        @Override
        protected void logFailedAttempt(JobInfo jobInfo) {
            System.out.printf("Attempt %d to load %s failed because: %s%n",
                    mRetryPolicy.getAttemptCount(), mJobConfig.getFilePath(),
                    jobInfo.getErrorMessage());
        }

        /** Prints that loading the file did not complete, with the attempt count. */
        @Override
        protected void logFailed() {
            System.out.printf("Failed to complete loading %s after %d retries.%n",
                    mJobConfig.getFilePath(), mRetryPolicy.getAttemptCount());
        }

        /** Prints that the file was loaded, with the attempt count. */
        @Override
        protected void logCompleted() {
            System.out.printf("Successfully loaded path %s after %d attempts%n",
                    mJobConfig.getFilePath(), mRetryPolicy.getAttemptCount());
        }
    }
}

