/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.plan.stress;

import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.PlanDefinition;
import alluxio.job.util.SerializationUtils;
import alluxio.resource.CloseableResource;
import alluxio.stress.TaskResult;
import alluxio.stress.job.StressBenchConfig;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.UnderFileSystemConfiguration;
import alluxio.util.JsonSerializable;
import alluxio.util.ShellUtils;
import alluxio.wire.MountPointInfo;
import alluxio.wire.WorkerInfo;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plan definition that launches a stress benchmark class through the {@code bin/alluxio}
 * launcher on selected job workers and aggregates the per-worker results.
 */
public final class StressBenchDefinition
implements PlanDefinition<StressBenchConfig, ArrayList<String>, String> {
    private static final Logger LOG = LoggerFactory.getLogger(StressBenchDefinition.class);

    /** Flags always appended to the benchmark command line. */
    private static final String DISTRIBUTED_FLAG = "--distributed";
    private static final String IN_PROCESS_FLAG = "--in-process";
    /** Flags that are inspected (and possibly rewritten) in the user supplied arguments. */
    private static final String USE_MOUNT_CONF_FLAG = "--use-mount-conf";
    private static final String CONF_FLAG = "--conf";
    private static final String PATH_FLAG = "--path";
    private static final String START_MS_FLAG = "--start-ms";
    private static final String ID_FLAG = "--id";

    @Override
    public Class<StressBenchConfig> getJobConfigClass() {
        return StressBenchConfig.class;
    }

    /**
     * Chooses the workers that will run the benchmark and builds the per-worker arguments.
     *
     * <p>Workers are ordered by host name. A cluster limit of {@code 0} selects every worker, a
     * positive limit selects the first {@code limit} workers and a negative limit selects the
     * last {@code |limit|} workers (the sorted list is reversed before truncation).
     *
     * @param config the job configuration providing the cluster limit
     * @param jobWorkerInfoList the available job workers; this list itself is not modified
     * @param context the selection context (unused)
     * @return one pair per selected worker, carrying {@code --id <host>-<workerId>} as arguments
     * @throws IndexOutOfBoundsException if the absolute cluster limit exceeds the number of
     *         workers (raised by {@link List#subList(int, int)})
     */
    @Override
    public Set<Pair<WorkerInfo, ArrayList<String>>> selectExecutors(StressBenchConfig config, List<WorkerInfo> jobWorkerInfoList, SelectExecutorsContext context) {
        Set<Pair<WorkerInfo, ArrayList<String>>> result = Sets.newHashSet();
        // Work on a copy so that sorting/reversing never touches the caller's list.
        List<WorkerInfo> workerList = Lists.newArrayList(jobWorkerInfoList);
        workerList.sort(Comparator.comparing(w -> w.getAddress().getHost()));
        int clusterLimit = config.getClusterLimit();
        if (clusterLimit == 0) {
            clusterLimit = workerList.size();
        }
        if (clusterLimit < 0) {
            // A negative limit means "take from the end of the sorted list".
            clusterLimit = -clusterLimit;
            Collections.reverse(workerList);
        }
        workerList = workerList.subList(0, clusterLimit);
        for (WorkerInfo worker : workerList) {
            LOG.info("Generating job for worker {}", worker.getId());
            ArrayList<String> args = new ArrayList<>(2);
            args.add(ID_FLAG);
            args.add(worker.getAddress().getHost() + "-" + worker.getId());
            result.add(new Pair<>(worker, args));
        }
        return result;
    }

    /**
     * Looks up the mount specific configuration of the mount point containing {@code ufsUri}.
     *
     * <p>Mount points are visited in the iteration order of the mount table. The first mount
     * point whose UFS URI is a prefix of {@code ufsUri} and whose UFS exposes an
     * {@link UnderFileSystemConfiguration} provides the result.
     *
     * @param ufsUri the UFS URI to match against the mount table
     * @param runTaskContext the context giving access to the file system and the UFS manager
     * @return the mount specific configuration, or an empty map when nothing matches
     * @throws Exception if the mount table or the UFS resource cannot be accessed
     */
    private Map<String, Object> getUfsConf(String ufsUri, RunTaskContext runTaskContext) throws Exception {
        for (MountPointInfo mountPoint : runTaskContext.getFileSystem().getMountTable().values()) {
            if (!ufsUri.startsWith(mountPoint.getUfsUri())) {
                continue;
            }
            // try-with-resources always releases the UFS resource and, unlike a finally block
            // that leaves via 'continue', lets any failure propagate to the caller.
            try (CloseableResource<UnderFileSystem> resource = runTaskContext.getUfsManager().get(mountPoint.getMountId()).acquireUfsResource()) {
                AlluxioConfiguration configuration = resource.get().getConfiguration();
                if (configuration instanceof UnderFileSystemConfiguration) {
                    return ((UnderFileSystemConfiguration) configuration).getMountSpecificConf();
                }
            }
        }
        return ImmutableMap.of();
    }

    /**
     * Builds the benchmark command line and executes it through {@link ShellUtils}.
     *
     * <p>The command is {@code <alluxio home>/bin/alluxio runClass <class> --distributed
     * --in-process}, followed by a generated {@code --start-ms} (only when the configured
     * arguments do not already contain one), the configured arguments and finally the per-worker
     * arguments produced by {@code selectExecutors}.
     *
     * @param config the job configuration
     * @param args the per-worker arguments
     * @param runTaskContext the task context, used to resolve mount configuration
     * @return the value returned by {@link ShellUtils#execCommand}
     * @throws Exception if the mount configuration lookup or the command execution fails
     */
    @Override
    public String runTask(StressBenchConfig config, ArrayList<String> args, RunTaskContext runTaskContext) throws Exception {
        List<String> configArgs = config.getArgs();
        List<String> command = new ArrayList<>(7 + configArgs.size() + args.size());
        command.add(Configuration.get(PropertyKey.HOME) + "/bin/alluxio");
        command.add("runClass");
        command.add(config.getClassName());
        command.add(DISTRIBUTED_FLAG);
        command.add(IN_PROCESS_FLAG);

        List<String> commandArgs = configArgs;
        if (configArgs.contains(USE_MOUNT_CONF_FLAG)) {
            commandArgs = replaceConfWithMountConf(configArgs, runTaskContext);
        }
        if (!configArgs.contains(START_MS_FLAG)) {
            command.add(START_MS_FLAG);
            command.add(Long.toString(System.currentTimeMillis() + config.getStartDelayMs()));
        }
        command.addAll(commandArgs);
        command.addAll(args);
        return ShellUtils.execCommand(command.toArray(new String[0]));
    }

    /**
     * Returns a copy of {@code originalArgs} in which every {@code --conf <value>} pair is
     * removed and replaced by the mount specific configuration of the first {@code --path} value.
     *
     * <p>All other arguments are preserved in their original order, regardless of whether they
     * appear before or after {@code --path}.
     *
     * @param originalArgs the configured benchmark arguments; not modified
     * @param runTaskContext the context used to resolve the mount configuration
     * @return a new argument list
     * @throws Exception if the mount configuration cannot be resolved
     */
    private List<String> replaceConfWithMountConf(List<String> originalArgs, RunTaskContext runTaskContext) throws Exception {
        List<String> filteredArgs = new ArrayList<>(originalArgs.size());
        String ufsUri = "";
        boolean skipConfValue = false;
        boolean expectPathValue = false;
        boolean pathFound = false;
        for (String elem : originalArgs) {
            if (CONF_FLAG.equals(elem)) {
                // Drop the flag itself and remember to drop its value on the next iteration.
                skipConfValue = true;
            } else {
                if (!skipConfValue) {
                    filteredArgs.add(elem);
                }
                skipConfValue = false;
            }
            if (PATH_FLAG.equals(elem)) {
                expectPathValue = true;
            } else {
                // Only the first --path value is used for the lookup; keep iterating so the
                // remaining arguments are still copied into the filtered list.
                if (expectPathValue && !pathFound) {
                    ufsUri = elem;
                    pathFound = true;
                }
                expectPathValue = false;
            }
        }
        // Each property is emitted as a single "--conf<key>=<value>" token.
        // NOTE(review): presumably the benchmark CLI parses this as a dynamic parameter - confirm.
        List<String> properties = getUfsConf(ufsUri, runTaskContext).entrySet().stream()
            .map(entry -> CONF_FLAG + entry.getKey() + "=" + entry.getValue())
            .collect(Collectors.toList());
        filteredArgs.addAll(properties);
        return filteredArgs;
    }

    /**
     * Parses every worker's output into a {@link TaskResult} and aggregates them.
     *
     * @param config the job configuration (unused)
     * @param taskResults the raw output of each worker
     * @return the JSON form of the aggregated result
     * @throws IOException if there are no results, or if any worker output cannot be parsed;
     *         when several outputs fail, the exception reports the last failure recorded
     * @throws Exception if aggregation fails
     */
    @Override
    public String join(StressBenchConfig config, Map<WorkerInfo, String> taskResults) throws Exception {
        if (taskResults.isEmpty()) {
            throw new IOException("No results from any workers.");
        }
        // Checked exceptions cannot leave the lambda, so a failure is recorded and rethrown below.
        AtomicReference<IOException> error = new AtomicReference<>(null);
        List<TaskResult> results = taskResults.entrySet().stream().map(entry -> {
            String output = entry.getValue().trim();
            try {
                String result = SerializationUtils.parseBenchmarkResult(output);
                return (TaskResult) JsonSerializable.fromJson(result, new TaskResult[0]);
            }
            catch (IOException | ClassNotFoundException e) {
                LOG.warn("Failed to parse result into class {}", TaskResult.class, e);
                error.set(new IOException(String.format("Failed to parse task output from %s into result class %s: %s", entry.getKey().getAddress().getHost(), TaskResult.class, output), e));
                return null;
            }
        }).collect(Collectors.toList());
        if (error.get() != null) {
            throw error.get();
        }
        return results.get(0).aggregator().aggregate(results).toJson();
    }
}

