/*
 * Decompiled with CFR 0.152.
 */
package alluxio.job.plan.migrate;

import alluxio.AlluxioURI;
import alluxio.client.WriteType;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.FileAlreadyExistsException;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.grpc.WritePType;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.plan.migrate.MigrateCommand;
import alluxio.job.plan.migrate.MigrateConfig;
import alluxio.job.util.JobUtils;
import alluxio.job.util.SerializableVoid;
import alluxio.util.io.PathUtils;
import alluxio.wire.WorkerInfo;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Plan definition that migrates a single file from a source path to a destination path.
 *
 * <p>{@link #selectExecutors} picks exactly one job worker to perform the copy (or none when
 * source and destination are identical), and {@link #runTask} performs the copy on that worker
 * by streaming the source file into a newly created destination file.
 */
public final class MigrateDefinition
extends AbstractVoidPlanDefinition<MigrateConfig, MigrateCommand> {
    private static final Logger LOG = LoggerFactory.getLogger(MigrateDefinition.class);
    private static final int JOBS_PER_WORKER = 10;
    /** Size in bytes (8 MiB) of the buffer used when copying source to destination. */
    private static final int COPY_BUFFER_SIZE_BYTES = 8 * 1024 * 1024;
    private final Random mRandom = new Random();

    /**
     * Rejects a migration whose destination has the source as a path prefix.
     *
     * @param config the migrate configuration holding source and destination paths
     * @param fs the file system client (currently unused by this check)
     * @throws RuntimeException if {@code PathUtils.hasPrefix(destination, source)} is true
     */
    private void checkMigrateValid(MigrateConfig config, FileSystem fs) throws Exception {
        AlluxioURI source = new AlluxioURI(config.getSource());
        AlluxioURI destination = new AlluxioURI(config.getDestination());
        if (PathUtils.hasPrefix(destination.toString(), source.toString())) {
            throw new RuntimeException(ExceptionMessage.MIGRATE_CANNOT_BE_TO_SUBDIRECTORY
                .getMessage(source, config.getDestination()));
        }
    }

    /**
     * Chooses the job worker that will execute the migration.
     *
     * @param config the migrate configuration
     * @param jobWorkerInfoList the job workers available to run the task; must not be empty
     * @param context context giving access to the file system client and file system context
     * @return an empty set when source equals destination, otherwise a single-element set pairing
     *         the chosen job worker with the {@link MigrateCommand} to run
     * @throws IllegalStateException if {@code jobWorkerInfoList} is empty
     * @throws RuntimeException if the destination is under the source, or the source is a folder
     */
    @Override
    public Set<Pair<WorkerInfo, MigrateCommand>> selectExecutors(MigrateConfig config, List<WorkerInfo> jobWorkerInfoList, SelectExecutorsContext context) throws Exception {
        AlluxioURI source = new AlluxioURI(config.getSource());
        AlluxioURI destination = new AlluxioURI(config.getDestination());
        if (source.equals(destination)) {
            // Nothing to do: migrating a path onto itself is a no-op.
            return Sets.newHashSet();
        }
        this.checkMigrateValid(config, context.getFileSystem());
        Preconditions.checkState(!jobWorkerInfoList.isEmpty(), "No workers are available");

        URIStatus status = context.getFileSystem().getStatus(source);

        // Index job workers by host so the chosen block worker can be mapped to a job worker.
        ConcurrentMap<String, WorkerInfo> hostnameToWorker = Maps.newConcurrentMap();
        for (WorkerInfo workerInfo : jobWorkerInfoList) {
            hostnameToWorker.put(workerInfo.getAddress().getHost(), workerInfo);
        }

        List<BlockWorkerInfo> alluxioWorkerInfoList = context.getFsContext().getCachedWorkers();
        if (status.isFolder()) {
            throw new RuntimeException(ExceptionMessage.MIGRATE_DIRECTORY.getMessage());
        }

        WorkerInfo bestJobWorker = this.getBestJobWorker(status, alluxioWorkerInfoList, jobWorkerInfoList, hostnameToWorker);
        Set<Pair<WorkerInfo, MigrateCommand>> result = Sets.newHashSet();
        result.add(new Pair<>(bestJobWorker, new MigrateCommand(status.getPath(), destination.getPath())));
        return result;
    }

    /**
     * Picks the job worker that should run the migration for the given file.
     *
     * <p>The preferred block worker is whatever {@code JobUtils.getWorkerWithMostBlocks} returns
     * for the file's blocks; if that is {@code null}, a random block worker is used instead. The
     * block worker is then mapped to a job worker by host name. When no block worker is known, or
     * no job worker shares the chosen host, a random job worker is returned.
     *
     * @param status status of the file being migrated
     * @param alluxioWorkerInfoList the known block workers; may be empty
     * @param jobWorkerInfoList the available job workers; must not be empty
     * @param hostnameToJobWorker job workers keyed by host name
     * @return the selected job worker
     */
    private WorkerInfo getBestJobWorker(URIStatus status, List<BlockWorkerInfo> alluxioWorkerInfoList, List<WorkerInfo> jobWorkerInfoList, Map<String, WorkerInfo> hostnameToJobWorker) {
        BlockWorkerInfo bestWorker = JobUtils.getWorkerWithMostBlocks(alluxioWorkerInfoList, status.getFileBlockInfos());
        if (bestWorker == null) {
            if (alluxioWorkerInfoList.isEmpty()) {
                // No block worker to anchor on; any job worker is as good as another.
                return this.randomJobWorker(jobWorkerInfoList);
            }
            // The random index must be bounded by the list being indexed. Bounding it by the
            // job worker count could run past the end of the block worker list.
            bestWorker = alluxioWorkerInfoList.get(this.mRandom.nextInt(alluxioWorkerInfoList.size()));
        }
        WorkerInfo worker = hostnameToJobWorker.get(bestWorker.getNetAddress().getHost());
        if (worker == null) {
            return this.randomJobWorker(jobWorkerInfoList);
        }
        return worker;
    }

    /** Returns a uniformly random element of the given non-empty job worker list. */
    private WorkerInfo randomJobWorker(List<WorkerInfo> jobWorkerInfoList) {
        return jobWorkerInfoList.get(this.mRandom.nextInt(jobWorkerInfoList.size()));
    }

    /**
     * Executes the migration described by {@code command}.
     *
     * <p>The write type comes from the config when set, otherwise from
     * {@code PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT}.
     *
     * @param config the migrate configuration (write type and overwrite flag)
     * @param command the source and destination paths to migrate
     * @param context context giving access to the file system client
     * @return always {@code null}
     */
    @Override
    public SerializableVoid runTask(MigrateConfig config, MigrateCommand command, RunTaskContext context) throws Exception {
        WriteType writeType;
        if (config.getWriteType() == null) {
            writeType = ServerConfiguration.getEnum(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class);
        } else {
            writeType = WriteType.valueOf(config.getWriteType());
        }
        MigrateDefinition.migrate(command, writeType.toProto(), context.getFileSystem(), config.isOverwrite());
        return null;
    }

    /**
     * Copies the source file to the destination, optionally replacing an existing destination.
     *
     * <p>If the copy fails part-way, the output stream is cancelled before the failure is
     * rethrown. If a {@link FileAlreadyExistsException} is raised and {@code overwrite} is set,
     * the destination is deleted and the copy is attempted again.
     *
     * @param command the source and destination paths
     * @param writeType the write type used when creating the destination file
     * @param fileSystem the file system client
     * @param overwrite whether an existing destination may be deleted and replaced
     * @throws FileAlreadyExistsException if the destination exists and {@code overwrite} is false
     */
    private static void migrate(MigrateCommand command, WritePType writeType, FileSystem fileSystem, boolean overwrite) throws Exception {
        String source = command.getSource();
        String destination = command.getDestination();
        LOG.debug("Migrating {} to {}", source, destination);
        CreateFilePOptions createOptions = CreateFilePOptions.newBuilder().setWriteType(writeType).build();
        OpenFilePOptions openFileOptions = OpenFilePOptions.newBuilder().setReadType(ReadPType.NO_CACHE).build();
        AlluxioURI destinationURI = new AlluxioURI(destination);
        boolean retry;
        do {
            retry = false;
            try (FileInStream in = fileSystem.openFile(new AlluxioURI(source), openFileOptions);
                 FileOutStream out = fileSystem.createFile(destinationURI, createOptions)) {
                try {
                    IOUtils.copyLarge(in, out, new byte[COPY_BUFFER_SIZE_BYTES]);
                } catch (Throwable t) {
                    // Cancel the output stream before propagating the copy failure.
                    try {
                        out.cancel();
                    } catch (Throwable t2) {
                        t.addSuppressed(t2);
                    }
                    throw t;
                }
            } catch (FileAlreadyExistsException e) {
                if (!overwrite) {
                    throw e;
                }
                // Remove the existing destination and try the copy again.
                fileSystem.delete(destinationURI);
                retry = true;
            }
        } while (retry);
    }

    /**
     * @return the configuration class handled by this definition
     */
    @Override
    public Class<MigrateConfig> getJobConfigClass() {
        return MigrateConfig.class;
    }
}

