package alluxio.job.plan.persist;

import alluxio.AlluxioURI;
import alluxio.client.block.BlockWorkerInfo;
import alluxio.client.file.FileInStream;
import alluxio.client.file.URIStatus;
import alluxio.collections.Pair;
import alluxio.conf.Configuration;
import alluxio.grpc.OpenFilePOptions;
import alluxio.grpc.ReadPType;
import alluxio.job.JobConfig;
import alluxio.job.RunTaskContext;
import alluxio.job.SelectExecutorsContext;
import alluxio.job.plan.AbstractVoidPlanDefinition;
import alluxio.job.util.JobUtils;
import alluxio.job.util.SerializableVoid;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.security.authorization.Mode;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions;
import alluxio.underfs.options.MkdirsOptions;
import alluxio.wire.WorkerInfo;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.Stack;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/job/plan/persist/PersistDefinition.class */
public final class PersistDefinition extends AbstractVoidPlanDefinition<PersistConfig, SerializableVoid> {
    private static final Logger LOG = LoggerFactory.getLogger(PersistDefinition.class);

    public Set<Pair<WorkerInfo, SerializableVoid>> selectExecutors(PersistConfig persistConfig, List<WorkerInfo> list, SelectExecutorsContext selectExecutorsContext) throws Exception {
        if (list.isEmpty()) {
            throw new RuntimeException("No worker is available");
        }
        BlockWorkerInfo workerWithMostBlocks = JobUtils.getWorkerWithMostBlocks(selectExecutorsContext.getFsContext().getCachedWorkers(), selectExecutorsContext.getFileSystem().getStatus(new AlluxioURI(persistConfig.getFilePath())).getFileBlockInfos());
        HashSet newHashSet = Sets.newHashSet();
        boolean z = false;
        if (workerWithMostBlocks != null) {
            Iterator<WorkerInfo> it = list.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                WorkerInfo next = it.next();
                if (next.getAddress().getHost().equals(workerWithMostBlocks.getNetAddress().getHost())) {
                    newHashSet.add(new Pair(next, (Object) null));
                    z = true;
                    break;
                }
            }
        }
        if (!z) {
            newHashSet.add(new Pair(list.get(new Random().nextInt(list.size())), (Object) null));
        }
        return newHashSet;
    }

    @Override // alluxio.job.plan.PlanDefinition
    public SerializableVoid runTask(PersistConfig persistConfig, SerializableVoid serializableVoid, RunTaskContext runTaskContext) throws Exception {
        AlluxioURI alluxioURI = new AlluxioURI(persistConfig.getFilePath());
        String ufsPath = persistConfig.getUfsPath();
        UfsManager.UfsClient ufsClient = runTaskContext.getUfsManager().get(persistConfig.getMountId());
        CloseableResource acquireUfsResource = ufsClient.acquireUfsResource();
        Throwable th = null;
        try {
            UnderFileSystem underFileSystem = (UnderFileSystem) acquireUfsResource.get();
            if (underFileSystem == null) {
                throw new IOException("Failed to create UFS instance for " + ufsPath);
            }
            if (underFileSystem.exists(ufsPath)) {
                if (!persistConfig.isOverwrite()) {
                    throw new IOException("File " + persistConfig.getFilePath() + " is already persisted in UFS, to overwrite the file, please set the overwrite flag in the config.");
                }
                LOG.info("File {} is already persisted in UFS. Removing it.", persistConfig.getFilePath());
                underFileSystem.deleteExistingFile(ufsPath);
            }
            URIStatus status = runTaskContext.getFileSystem().getStatus(alluxioURI);
            if (!status.isCompleted()) {
                throw new IOException("Cannot persist an incomplete Alluxio file: " + alluxioURI);
            }
            Closer create = Closer.create();
            Throwable th2 = null;
            try {
                OpenFilePOptions build = OpenFilePOptions.newBuilder().setReadType(ReadPType.NO_CACHE).setUpdateLastAccessTime(false).build();
                FileInStream register = create.register(runTaskContext.getFileSystem().openFile(alluxioURI, build));
                AlluxioURI alluxioURI2 = new AlluxioURI(ufsPath);
                Stack stack = new Stack();
                AlluxioURI parent = alluxioURI.getParent();
                for (AlluxioURI parent2 = alluxioURI2.getParent(); !underFileSystem.isDirectory(parent2.toString()) && parent != null; parent2 = parent2.getParent()) {
                    stack.push(new Pair(parent2.toString(), parent.toString()));
                    parent = parent.getParent();
                }
                while (!stack.empty()) {
                    Pair pair = (Pair) stack.pop();
                    String str = (String) pair.getFirst();
                    URIStatus status2 = runTaskContext.getFileSystem().getStatus(new AlluxioURI((String) pair.getSecond()));
                    if (underFileSystem.mkdirs(str, MkdirsOptions.defaults(Configuration.global()).setCreateParent(false).setOwner(status2.getOwner()).setGroup(status2.getGroup()).setMode(new Mode((short) status2.getMode())))) {
                        underFileSystem.setAclEntries(str, (List) Stream.concat(status2.getDefaultAcl().getEntries().stream(), status2.getAcl().getEntries().stream()).collect(Collectors.toList()));
                    } else if (!underFileSystem.isDirectory(str)) {
                        throw new IOException("Failed to create " + ufsPath + " with permission " + build + " because its ancestor " + str + " is not a directory");
                    }
                }
                OutputStream outputStream = (OutputStream) create.register(underFileSystem.createNonexistingFile(alluxioURI2.toString(), CreateOptions.defaults(Configuration.global()).setOwner(status.getOwner()).setGroup(status.getGroup()).setMode(new Mode((short) status.getMode()))));
                URIStatus status3 = runTaskContext.getFileSystem().getStatus(alluxioURI);
                underFileSystem.setAclEntries(alluxioURI2.toString(), (List) Stream.concat(status3.getDefaultAcl().getEntries().stream(), status3.getAcl().getEntries().stream()).collect(Collectors.toList()));
                long copyLarge = IOUtils.copyLarge(register, outputStream, new byte[8388608]);
                incrementPersistedMetric(ufsClient.getUfsMountPointUri(), copyLarge);
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        create.close();
                    }
                }
                LOG.info("Persisted file {} with size {}", ufsPath, Long.valueOf(copyLarge));
                if (acquireUfsResource == null) {
                    return null;
                }
                if (0 == 0) {
                    acquireUfsResource.close();
                    return null;
                }
                try {
                    acquireUfsResource.close();
                    return null;
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                    return null;
                }
            } catch (Throwable th5) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th6) {
                            th2.addSuppressed(th6);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th5;
            }
        } catch (Throwable th7) {
            if (acquireUfsResource != null) {
                if (0 != 0) {
                    try {
                        acquireUfsResource.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    acquireUfsResource.close();
                }
            }
            throw th7;
        }
    }

    private void incrementPersistedMetric(AlluxioURI alluxioURI, long j) {
        MetricsSystem.counter(String.format("BytesPersisted-Ufs:%s", MetricsSystem.escape(alluxioURI))).inc(j);
    }

    @Override // alluxio.job.plan.PlanDefinition
    public Class<PersistConfig> getJobConfigClass() {
        return PersistConfig.class;
    }

    @Override // alluxio.job.plan.PlanDefinition
    public /* bridge */ /* synthetic */ Set selectExecutors(JobConfig jobConfig, List list, SelectExecutorsContext selectExecutorsContext) throws Exception {
        return selectExecutors((PersistConfig) jobConfig, (List<WorkerInfo>) list, selectExecutorsContext);
    }
}
