/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.plan.load;

import alluxio.AlluxioURI;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.exception.status.FailedPreconditionException;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.plan.load.LoadConfig;
import alluxio.job.util.JobUtils;
import alluxio.job.util.SerializableVoid;
import alluxio.util.CommonUtils;
import alluxio.wire.BlockLocation;
import alluxio.wire.FileBlockInfo;
import alluxio.wire.TieredIdentity;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerNetAddress;
import com.google.common.base.MoreObjects;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
public final class LoadDefinition
extends AbstractVoidPlanDefinition<LoadConfig, ArrayList<LoadTask>> {
    private static final Logger LOG = LoggerFactory.getLogger(LoadDefinition.class);
    private static final int MAX_BUFFER_SIZE = 524288000;
    private static final int JOBS_PER_WORKER = 10;

    @Override
    public Set<Pair<WorkerInfo, ArrayList<LoadTask>>> selectExecutors(LoadConfig config, List<WorkerInfo> jobWorkerInfoList, SelectExecutorsContext context) throws Exception {
        Map<String, WorkerInfo> jobWorkersByAddress = jobWorkerInfoList.stream().collect(Collectors.toMap(info -> info.getAddress().getHost(), info -> info));
        ArrayList<String> missingJobWorkerHosts = new ArrayList<String>();
        ArrayList<BlockWorkerInfo> workers = new ArrayList<BlockWorkerInfo>();
        for (BlockWorkerInfo worker : context.getFsContext().getCachedWorkers()) {
            if (jobWorkersByAddress.containsKey(worker.getNetAddress().getHost())) {
                String workerHost = worker.getNetAddress().getHost().toUpperCase();
                if (!this.isEmptySet(config.getExcludedWorkerSet()) && config.getExcludedWorkerSet().contains(workerHost)) continue;
                boolean match = false;
                if (!(worker.getNetAddress().getTieredIdentity().getTiers() == null || this.isEmptySet(config.getLocalityIds()) && this.isEmptySet(config.getExcludedLocalityIds()))) {
                    boolean exclude = false;
                    for (TieredIdentity.LocalityTier tier : worker.getNetAddress().getTieredIdentity().getTiers()) {
                        if (!this.isEmptySet(config.getExcludedLocalityIds()) && config.getExcludedLocalityIds().contains(tier.getValue().toUpperCase())) {
                            exclude = true;
                            break;
                        }
                        if (this.isEmptySet(config.getLocalityIds()) || !config.getLocalityIds().contains(tier.getValue().toUpperCase())) continue;
                        match = true;
                        break;
                    }
                    if (exclude) continue;
                }
                if ((!this.isEmptySet(config.getWorkerSet()) || !this.isEmptySet(config.getLocalityIds())) && !match && (this.isEmptySet(config.getWorkerSet()) || !config.getWorkerSet().contains(workerHost))) continue;
                workers.add(worker);
                continue;
            }
            LOG.warn("Worker on host {} has no local job worker", (Object)worker.getNetAddress().getHost());
            missingJobWorkerHosts.add(worker.getNetAddress().getHost());
        }
        LinkedListMultimap assignments = LinkedListMultimap.create();
        AlluxioURI uri = new AlluxioURI(config.getFilePath());
        for (FileBlockInfo blockInfo : context.getFileSystem().getStatus(uri).getFileBlockInfos()) {
            List<BlockWorkerInfo> workersWithoutBlock = this.getWorkersWithoutBlock(workers, blockInfo);
            int neededReplicas = config.getReplication() - blockInfo.getBlockInfo().getLocations().size();
            if (workersWithoutBlock.size() < neededReplicas) {
                String missingJobWorkersMessage = "";
                if (!missingJobWorkerHosts.isEmpty()) {
                    missingJobWorkersMessage = ". The following workers could not be used because they have no local job workers: " + missingJobWorkerHosts;
                }
                throw new FailedPreconditionException(String.format("Failed to find enough block workers to replicate to. Needed %s but only found %s. Available workers without the block: %s" + missingJobWorkersMessage, neededReplicas, workersWithoutBlock.size(), workersWithoutBlock));
            }
            Collections.shuffle(workersWithoutBlock);
            for (int i = 0; i < neededReplicas; ++i) {
                String address = workersWithoutBlock.get(i).getNetAddress().getHost();
                WorkerInfo jobWorker = jobWorkersByAddress.get(address);
                assignments.put((Object)jobWorker, (Object)new LoadTask(blockInfo.getBlockInfo().getBlockId(), workersWithoutBlock.get(i).getNetAddress(), config.getFilePath()));
            }
        }
        HashSet result = Sets.newHashSet();
        for (Map.Entry assignment : assignments.asMap().entrySet()) {
            Collection loadTasks = (Collection)assignment.getValue();
            List partitionedTasks = CommonUtils.partition((List)Lists.newArrayList((Iterable)loadTasks), (int)10);
            for (List tasks : partitionedTasks) {
                if (tasks.isEmpty()) continue;
                result.add(new Pair(assignment.getKey(), (Object)Lists.newArrayList((Iterable)tasks)));
            }
        }
        return result;
    }

    private List<BlockWorkerInfo> getWorkersWithoutBlock(List<BlockWorkerInfo> blockWorkers, FileBlockInfo blockInfo) {
        List blockLocations = blockInfo.getBlockInfo().getLocations().stream().map(BlockLocation::getWorkerAddress).collect(Collectors.toList());
        return blockWorkers.stream().filter(worker -> !blockLocations.contains(worker.getNetAddress())).collect(Collectors.toList());
    }

    @Override
    public SerializableVoid runTask(LoadConfig config, ArrayList<LoadTask> tasks, RunTaskContext context) throws Exception {
        URIStatus status = context.getFileSystem().getStatus(new AlluxioURI(config.getFilePath()));
        for (LoadTask task : tasks) {
            JobUtils.loadBlock(status, context.getFsContext(), task.getBlockId(), task.getWorkerNetAddress(), config.isDirectCache());
            LOG.info("Loaded file " + config.getFilePath() + " block " + task.getBlockId());
        }
        return null;
    }

    private boolean isEmptySet(Set s) {
        return s == null || s.isEmpty();
    }

    @Override
    public Class<LoadConfig> getJobConfigClass() {
        return LoadConfig.class;
    }

    public static class LoadTask
    implements Serializable {
        private static final long serialVersionUID = 2028545900913354425L;
        final long mBlockId;
        final String mFilePath;
        final WorkerNetAddress mWorkerNetAddress;

        public LoadTask(long blockId, WorkerNetAddress workerNetAddress, String filePath) {
            this.mBlockId = blockId;
            this.mWorkerNetAddress = workerNetAddress;
            this.mFilePath = filePath;
        }

        public long getBlockId() {
            return this.mBlockId;
        }

        public String getFilePath() {
            return this.mFilePath;
        }

        public WorkerNetAddress getWorkerNetAddress() {
            return this.mWorkerNetAddress;
        }

        public String toString() {
            return MoreObjects.toStringHelper((Object)this).add("FilePath", (Object)this.mFilePath).add("blockId", this.mBlockId).add("workerNetAddress", (Object)this.mWorkerNetAddress).toString();
        }
    }
}

