/*
 * Decompiled with CFR 0.152.
 */
package alluxio.cli.fsadmin.metrics;

import alluxio.cli.Command;
import alluxio.cli.CommandUtils;
import alluxio.cli.fs.FileSystemShellUtils;
import alluxio.cli.fsadmin.command.AbstractFsAdminCommand;
import alluxio.cli.fsadmin.command.Context;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.block.stream.BlockWorkerClient;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.AlluxioConfiguration;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.grpc.ClearMetricsRequest;
import alluxio.resource.CloseableResource;
import alluxio.util.ThreadFactoryUtils;
import alluxio.wire.WorkerNetAddress;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;

public final class ClearCommand
extends AbstractFsAdminCommand {
    private static final String MASTER_OPTION_NAME = "master";
    private static final String WORKERS_OPTION_NAME = "workers";
    private static final String PARALLELISM_OPTION_NAME = "parallelism";
    private static final int DEFAULT_PARALLELISM = 8;
    private static final Option MASTER_OPTION = Option.builder().longOpt("master").required(false).hasArg(false).desc("Clear the metrics of Alluxio leading master").build();
    private static final Option WORKERS_OPTION = Option.builder().longOpt("workers").required(false).hasArg(true).desc("Clear metrics of specified workers. Pass in the worker hostnames separated by comma").build();
    private static final Option PARALLELISM_OPTION = Option.builder().longOpt("parallelism").required(false).hasArg(true).argName("# concurrent operations").desc("Number of concurrent worker metrics clear operations, default: 8").build();
    private final AlluxioConfiguration mAlluxioConf;

    public ClearCommand(Context context, AlluxioConfiguration alluxioConf) {
        super(context);
        this.mAlluxioConf = alluxioConf;
    }

    public String getCommandName() {
        return "clear";
    }

    public Options getOptions() {
        return new Options().addOption(MASTER_OPTION).addOption(WORKERS_OPTION).addOption(PARALLELISM_OPTION);
    }

    public void validateArgs(CommandLine cl) throws InvalidArgumentException {
        CommandUtils.checkNumOfArgsNoMoreThan((Command)this, (CommandLine)cl, (int)3);
    }

    public int run(CommandLine cl) throws IOException {
        boolean clearMaster;
        Option[] options = cl.getOptions();
        boolean clearWorkers = options.length == 0 || cl.hasOption(WORKERS_OPTION_NAME) || options.length == 1 && cl.hasOption(PARALLELISM_OPTION_NAME);
        boolean bl = clearMaster = options.length == 0 || cl.hasOption(MASTER_OPTION_NAME) || options.length == 1 && cl.hasOption(PARALLELISM_OPTION_NAME);
        if (clearWorkers) {
            int globalParallelism = FileSystemShellUtils.getIntArg(cl, PARALLELISM_OPTION, 8);
            try (FileSystemContext context = FileSystemContext.create((AlluxioConfiguration)this.mAlluxioConf);){
                List<WorkerNetAddress> addressList = context.getCachedWorkers().stream().map(BlockWorkerInfo::getNetAddress).collect(Collectors.toList());
                if (cl.hasOption(WORKERS_OPTION_NAME)) {
                    String workersValue = cl.getOptionValue(WORKERS_OPTION_NAME);
                    HashSet<String> workersRequired = new HashSet<String>(Arrays.asList(workersValue.split(",")));
                    ArrayList<WorkerNetAddress> workersToClear = new ArrayList<WorkerNetAddress>();
                    for (WorkerNetAddress worker : addressList) {
                        if (!workersRequired.contains(worker.getHost())) continue;
                        workersToClear.add(worker);
                        workersRequired.remove(worker.getHost());
                    }
                    if (workersRequired.size() != 0) {
                        System.out.printf("Cannot find workers of hostnames %s%n", String.join((CharSequence)",", workersRequired));
                        System.out.printf("Valid workers include %s%n", this.addressListToString(addressList));
                        int n = -1;
                        return n;
                    }
                    if (!this.clearWorkers(workersToClear, context, globalParallelism)) {
                        System.out.printf("Failed to clear metrics of workers %s%n", this.addressListToString(workersToClear));
                        int n = -1;
                        return n;
                    }
                } else if (!this.clearWorkers(addressList, context, globalParallelism)) {
                    System.out.printf("Failed to clear metrics of workers %s%n", this.addressListToString(addressList));
                    int n = -1;
                    return n;
                }
            }
        }
        if (clearMaster) {
            try {
                this.mMetricsClient.clearMetrics();
                System.out.printf("Successfully cleared metrics of Alluxio leading master.%n", new Object[0]);
            }
            catch (Exception e) {
                System.out.println("Fatal error: " + e);
                return -1;
            }
        }
        return 0;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private boolean clearWorkers(List<WorkerNetAddress> workers, FileSystemContext context, int globalParallelism) throws IOException {
        int workerNum = workers.size();
        if (workerNum == 0) {
            System.out.println("No worker metrics to clear.");
            return true;
        }
        if (workerNum == 1) {
            this.clearWorkerMetrics(workers.get(0), context);
        } else {
            ArrayList<Future<Void>> futures = new ArrayList<Future<Void>>();
            int parallelism = Math.min(workerNum, globalParallelism);
            ExecutorService service = Executors.newFixedThreadPool(parallelism, ThreadFactoryUtils.build((String)"metrics-clear-cli-%d", (boolean)true));
            for (WorkerNetAddress workerNetAddress : workers) {
                futures.add(service.submit(new ClearCallable(workerNetAddress, context)));
            }
            try {
                for (Future bl : futures) {
                    bl.get();
                }
            }
            catch (ExecutionException e) {
                System.out.println("Fatal error: " + e);
                boolean bl = false;
                return bl;
            }
            catch (InterruptedException e) {
                System.out.println("Metrics clearance interrupted, exiting.");
                boolean bl = false;
                return bl;
            }
            finally {
                service.shutdownNow();
            }
        }
        return true;
    }

    private String addressListToString(List<WorkerNetAddress> addressList) {
        return Arrays.toString(addressList.stream().map(WorkerNetAddress::getHost).toArray(String[]::new));
    }

    private void clearWorkerMetrics(WorkerNetAddress worker, FileSystemContext context) throws IOException {
        try (CloseableResource blockWorkerClient = context.acquireBlockWorkerClient(worker);){
            ((BlockWorkerClient)blockWorkerClient.get()).clearMetrics(ClearMetricsRequest.newBuilder().build());
        }
        System.out.printf("Successfully cleared metrics of worker %s.%n", worker.getHost());
    }

    public String getUsage() {
        return String.format("%s [--%s] [--%s <worker_hostnames>] [--%s <#>] %n\t--%s: %s%n\t--%s: %s%n\t--%s: %s%n", this.getCommandName(), MASTER_OPTION_NAME, WORKERS_OPTION_NAME, PARALLELISM_OPTION_NAME, MASTER_OPTION_NAME, MASTER_OPTION.getDescription(), WORKERS_OPTION_NAME, WORKERS_OPTION.getDescription(), PARALLELISM_OPTION_NAME, PARALLELISM_OPTION.getDescription());
    }

    @VisibleForTesting
    public static String description() {
        return "Clear the metrics of the whole cluster by default. Users can pass in options to decide metrics of which nodes to be cleared. This command is useful when getting metrics information in short-term testing. This command should be used sparingly as it may affect the current metrics recording and reporting which may lead to metrics incorrectness and affect worker/client heartbeats with leading master.";
    }

    public String getDescription() {
        return ClearCommand.description();
    }

    private class ClearCallable
    implements Callable<Void> {
        private final WorkerNetAddress mWorker;
        private final FileSystemContext mContext;

        ClearCallable(WorkerNetAddress worker, FileSystemContext context) {
            this.mWorker = worker;
            this.mContext = context;
        }

        @Override
        public Void call() throws Exception {
            ClearCommand.this.clearWorkerMetrics(this.mWorker, this.mContext);
            return null;
        }
    }
}

