/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.s3a.commit.staging;

import com.google.common.base.Preconditions;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Invoker;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
import org.apache.hadoop.fs.s3a.commit.CommitUtils;
import org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR;
import org.apache.hadoop.fs.s3a.commit.DurationInfo;
import org.apache.hadoop.fs.s3a.commit.Tasks;
import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
import org.apache.hadoop.fs.s3a.commit.staging.ConflictResolution;
import org.apache.hadoop.fs.s3a.commit.staging.Paths;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StagingCommitter
extends AbstractS3ACommitter {
    private static final Logger LOG = LoggerFactory.getLogger(StagingCommitter.class);
    public static final String NAME = "staging";
    private final Path constructorOutputPath = Preconditions.checkNotNull(this.getOutputPath(), "output path");
    private final long uploadPartSize;
    private final String uuid;
    private final boolean uniqueFilenames;
    private final FileOutputCommitter wrappedCommitter;
    private ConflictResolution conflictResolution;
    private String s3KeyPrefix;
    private Path commitsDirectory;

    public StagingCommitter(Path outputPath, TaskAttemptContext context) throws IOException {
        super(outputPath, context);
        Configuration conf = this.getConf();
        this.uploadPartSize = conf.getLongBytes("fs.s3a.multipart.size", 0x6400000L);
        this.uuid = StagingCommitter.getUploadUUID(conf, context.getJobID());
        this.uniqueFilenames = conf.getBoolean("fs.s3a.committer.staging.unique-filenames", true);
        this.setWorkPath(StagingCommitter.buildWorkPath((JobContext)context, this.uuid));
        this.wrappedCommitter = this.createWrappedCommitter((JobContext)context, conf);
        this.setOutputPath(this.constructorOutputPath);
        Path finalOutputPath = this.getOutputPath();
        Preconditions.checkNotNull(finalOutputPath, "Output path cannot be null");
        S3AFileSystem fs = CommitUtils.getS3AFileSystem(finalOutputPath, context.getConfiguration(), false);
        this.s3KeyPrefix = fs.pathToKey(finalOutputPath);
        LOG.debug("{}: final output path is {}", (Object)this.getRole(), (Object)finalOutputPath);
        ConflictResolution mode = this.getConflictResolutionMode(this.getJobContext(), fs.getConf());
        LOG.debug("Conflict resolution mode: {}", (Object)mode);
    }

    @Override
    public String getName() {
        return NAME;
    }

    protected FileOutputCommitter createWrappedCommitter(JobContext context, Configuration conf) throws IOException {
        this.initFileOutputCommitterOptions(context);
        this.commitsDirectory = Paths.getMultipartUploadCommitsDirectory(conf, this.uuid);
        return new FileOutputCommitter(this.commitsDirectory, context);
    }

    protected void initFileOutputCommitterOptions(JobContext context) {
        context.getConfiguration().setInt("mapreduce.fileoutputcommitter.algorithm.version", 1);
    }

    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder("StagingCommitter{");
        sb.append(super.toString());
        sb.append(", conflictResolution=").append((Object)this.conflictResolution);
        if (this.wrappedCommitter != null) {
            sb.append(", wrappedCommitter=").append(this.wrappedCommitter);
        }
        sb.append('}');
        return sb.toString();
    }

    public static String getUploadUUID(Configuration conf, String jobId) {
        return conf.getTrimmed("fs.s3a.committer.staging.uuid", conf.getTrimmed("spark.sql.sources.writeJobUUID", conf.getTrimmed("spark.app.id", jobId)));
    }

    public static String getUploadUUID(Configuration conf, JobID jobId) {
        return StagingCommitter.getUploadUUID(conf, jobId.toString());
    }

    private static Path buildWorkPath(JobContext context, String uuid) throws IOException {
        if (context instanceof TaskAttemptContext) {
            return StagingCommitter.taskAttemptWorkingPath((TaskAttemptContext)context, uuid);
        }
        return null;
    }

    public Boolean useUniqueFilenames() {
        return this.uniqueFilenames;
    }

    public FileSystem getJobAttemptFileSystem(JobContext context) throws IOException {
        Path p = this.getJobAttemptPath(context);
        return p.getFileSystem(context.getConfiguration());
    }

    public static Path getJobAttemptPath(JobContext context, Path out) {
        return StagingCommitter.getJobAttemptPath(CommitUtilsWithMR.getAppAttemptId(context), out);
    }

    private static Path getJobAttemptPath(int appAttemptId, Path out) {
        return new Path(StagingCommitter.getPendingJobAttemptsPath(out), String.valueOf(appAttemptId));
    }

    @Override
    protected Path getJobAttemptPath(int appAttemptId) {
        return new Path(StagingCommitter.getPendingJobAttemptsPath(this.commitsDirectory), String.valueOf(appAttemptId));
    }

    private static Path getPendingTaskAttemptsPath(JobContext context, Path out) {
        return new Path(StagingCommitter.getJobAttemptPath(context, out), "_temporary");
    }

    public static Path getTaskAttemptPath(TaskAttemptContext context, Path out) {
        return new Path(StagingCommitter.getPendingTaskAttemptsPath((JobContext)context, out), String.valueOf(context.getTaskAttemptID()));
    }

    private static Path getPendingJobAttemptsPath(Path out) {
        Preconditions.checkNotNull(out, "Null 'out' path");
        return new Path(out, "_temporary");
    }

    public Path getCommittedTaskPath(TaskAttemptContext context) {
        return this.getCommittedTaskPath(CommitUtilsWithMR.getAppAttemptId((JobContext)context), context);
    }

    private static void validateContext(TaskAttemptContext context) {
        Preconditions.checkNotNull(context, "null context");
        Preconditions.checkNotNull(context.getTaskAttemptID(), "null task attempt ID");
        Preconditions.checkNotNull(context.getTaskAttemptID().getTaskID(), "null task ID");
        Preconditions.checkNotNull(context.getTaskAttemptID().getJobID(), "null job ID");
    }

    protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) {
        StagingCommitter.validateContext(context);
        return new Path(this.getJobAttemptPath(appAttemptId), String.valueOf(context.getTaskAttemptID().getTaskID()));
    }

    @Override
    public Path getTempTaskAttemptPath(TaskAttemptContext context) {
        throw new UnsupportedOperationException("Unimplemented");
    }

    protected List<LocatedFileStatus> getTaskOutput(TaskAttemptContext context) throws IOException {
        Path attemptPath = this.getTaskAttemptPath(context);
        Preconditions.checkNotNull(attemptPath, "No attemptPath path in {}", new Object[]{this});
        LOG.debug("Scanning {} for files to commit", (Object)attemptPath);
        return S3AUtils.listAndFilter(this.getTaskAttemptFilesystem(context), attemptPath, true, S3AUtils.HIDDEN_FILE_FILTER);
    }

    protected String getFinalKey(String relative, JobContext context) {
        if (this.uniqueFilenames) {
            return this.getS3KeyPrefix(context) + "/" + Paths.addUUID(relative, this.uuid);
        }
        return this.getS3KeyPrefix(context) + "/" + relative;
    }

    protected final Path getFinalPath(String relative, JobContext context) throws IOException {
        return this.getDestS3AFS().keyToQualifiedPath(this.getFinalKey(relative, context));
    }

    @Override
    public Path getBaseTaskAttemptPath(TaskAttemptContext context) {
        return this.getWorkPath();
    }

    @Override
    public Path getJobAttemptPath(JobContext context) {
        return this.wrappedCommitter.getJobAttemptPath(context);
    }

    public void setupJob(JobContext context) throws IOException {
        LOG.debug("{}, Setting up job {}", (Object)this.getRole(), (Object)CommitUtilsWithMR.jobIdString(context));
        context.getConfiguration().set("fs.s3a.committer.staging.uuid", this.uuid);
        this.wrappedCommitter.setupJob(context);
    }

    @Override
    protected List<SinglePendingCommit> listPendingUploadsToCommit(JobContext context) throws IOException {
        return this.listPendingUploads(context, false);
    }

    protected List<SinglePendingCommit> listPendingUploadsToAbort(JobContext context) throws IOException {
        return this.listPendingUploads(context, true);
    }

    protected List<SinglePendingCommit> listPendingUploads(JobContext context, boolean suppressExceptions) throws IOException {
        try {
            Path wrappedJobAttemptPath = this.wrappedCommitter.getJobAttemptPath(context);
            FileSystem attemptFS = wrappedJobAttemptPath.getFileSystem(context.getConfiguration());
            return this.loadPendingsetFiles(context, suppressExceptions, attemptFS, S3AUtils.listAndFilter(attemptFS, wrappedJobAttemptPath, false, S3AUtils.HIDDEN_FILE_FILTER));
        }
        catch (IOException e) {
            this.maybeIgnore(suppressExceptions, "Listing pending uploads", e);
            return new ArrayList<SinglePendingCommit>(0);
        }
    }

    @Override
    public void cleanupStagingDirs() {
        Path workPath = this.getWorkPath();
        if (workPath != null) {
            LOG.debug("Cleaning up work path {}", (Object)workPath);
            Invoker.ignoreIOExceptions(LOG, "cleaning up", workPath.toString(), () -> S3AUtils.deleteQuietly(workPath.getFileSystem(this.getConf()), workPath, true));
        }
    }

    @Override
    protected void cleanup(JobContext context, boolean suppressExceptions) throws IOException {
        this.maybeIgnore(suppressExceptions, "Cleanup wrapped committer", () -> this.wrappedCommitter.cleanupJob(context));
        this.maybeIgnore(suppressExceptions, "Delete destination paths", () -> this.deleteDestinationPaths(context));
        super.cleanup(context, suppressExceptions);
    }

    @Override
    protected void abortPendingUploadsInCleanup(boolean suppressExceptions) throws IOException {
        if (this.getConf().getBoolean("fs.s3a.committer.staging.abort.pending.uploads", true)) {
            super.abortPendingUploadsInCleanup(suppressExceptions);
        } else {
            LOG.info("Not cleanup up pending uploads to {} as {} is false ", (Object)this.getOutputPath(), (Object)"fs.s3a.committer.staging.abort.pending.uploads");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     */
    @Override
    protected void abortJobInternal(JobContext context, boolean suppressExceptions) throws IOException {
        String r = this.getRole();
        boolean failed = false;
        try {
            try (DurationInfo d = new DurationInfo(LOG, "%s: aborting job in state %s ", r, CommitUtilsWithMR.jobIdString(context));){
                List<SinglePendingCommit> pending = this.listPendingUploadsToAbort(context);
                this.abortPendingUploads(context, pending, suppressExceptions);
            }
            super.abortJobInternal(context, failed || suppressExceptions);
        }
        catch (FileNotFoundException e) {
            LOG.debug("No job directory to read uploads from");
            super.abortJobInternal(context, failed || suppressExceptions);
        }
        catch (IOException e2) {
            failed = true;
            this.maybeIgnore(suppressExceptions, "aborting job", e2);
            super.abortJobInternal(context, failed || suppressExceptions);
            {
                catch (Throwable throwable) {
                    super.abortJobInternal(context, failed || suppressExceptions);
                    throw throwable;
                }
            }
        }
    }

    protected void deleteDestinationPaths(JobContext context) throws IOException {
        Path attemptPath = this.getJobAttemptPath(context);
        Invoker.ignoreIOExceptions(LOG, "Deleting Job attempt Path", attemptPath.toString(), () -> S3AUtils.deleteWithWarning(this.getJobAttemptFileSystem(context), attemptPath, true));
        S3AUtils.deleteWithWarning(this.getDestFS(), new Path(this.getOutputPath(), "_temporary"), true);
        this.deleteTaskWorkingPathQuietly(context);
    }

    @Override
    public void setupTask(TaskAttemptContext context) throws IOException {
        Path taskAttemptPath = this.getTaskAttemptPath(context);
        try (DurationInfo d = new DurationInfo(LOG, "%s: setup task attempt path %s ", this.getRole(), taskAttemptPath);){
            taskAttemptPath.getFileSystem(this.getConf()).mkdirs(taskAttemptPath);
            this.wrappedCommitter.setupTask(context);
        }
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "%s: needsTaskCommit() Task %s", this.getRole(), context.getTaskAttemptID());){
            Path attemptPath = this.getTaskAttemptPath(context);
            FileSystem fs = this.getTaskAttemptFilesystem(context);
            FileStatus[] stats = fs.listStatus(attemptPath);
            LOG.debug("{} files to commit under {}", (Object)stats.length, (Object)attemptPath);
            boolean bl = stats.length > 0;
            return bl;
        }
        catch (FileNotFoundException e) {
            LOG.info("No files to commit");
            throw e;
        }
    }

    public void commitTask(TaskAttemptContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "%s: commit task %s", this.getRole(), context.getTaskAttemptID());){
            int count = this.commitTaskInternal(context, this.getTaskOutput(context));
            LOG.info("{}: upload file count: {}", (Object)this.getRole(), (Object)count);
        }
        catch (IOException e) {
            LOG.error("{}: commit of task {} failed", new Object[]{this.getRole(), context.getTaskAttemptID(), e});
            this.getCommitOperations().taskCompleted(false);
            throw e;
        }
        this.getCommitOperations().taskCompleted(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected int commitTaskInternal(TaskAttemptContext context, List<? extends FileStatus> taskOutput) throws IOException {
        LOG.debug("{}: commitTaskInternal", (Object)this.getRole());
        Configuration conf = context.getConfiguration();
        Path attemptPath = this.getTaskAttemptPath(context);
        FileSystem attemptFS = this.getTaskAttemptFilesystem(context);
        LOG.debug("{}: attempt path is {}", (Object)this.getRole(), (Object)attemptPath);
        Path commitsAttemptPath = this.wrappedCommitter.getTaskAttemptPath(context);
        FileSystem commitsFS = commitsAttemptPath.getFileSystem(conf);
        int commitCount = taskOutput.size();
        ConcurrentLinkedQueue commits = new ConcurrentLinkedQueue();
        LOG.info("{}: uploading from staging directory to S3", (Object)this.getRole());
        LOG.info("{}: Saving pending data information to {}", (Object)this.getRole(), (Object)commitsAttemptPath);
        if (taskOutput.isEmpty()) {
            LOG.warn("{}: No files to commit", (Object)this.getRole());
        } else {
            boolean threw = true;
            context.progress();
            PendingSet pendingCommits = new PendingSet(commitCount);
            try {
                Tasks.foreach(taskOutput).stopOnFailure().executeWith(this.buildThreadPool((JobContext)context)).run(stat -> {
                    Path path = stat.getPath();
                    File localFile = new File(path.toUri().getPath());
                    String relative = Paths.getRelativePath(attemptPath, path);
                    String partition = Paths.getPartition(relative);
                    String key = this.getFinalKey(relative, (JobContext)context);
                    Path destPath = this.getDestS3AFS().keyToQualifiedPath(key);
                    SinglePendingCommit commit = this.getCommitOperations().uploadFileToPendingCommit(localFile, destPath, partition, this.uploadPartSize);
                    LOG.debug("{}: adding pending commit {}", (Object)this.getRole(), (Object)commit);
                    commits.add(commit);
                });
                for (SinglePendingCommit commit2 : commits) {
                    pendingCommits.add(commit2);
                }
                LOG.debug("Saving {} pending commit(s)) to file {}", (Object)pendingCommits.size(), (Object)commitsAttemptPath);
                pendingCommits.save(commitsFS, commitsAttemptPath, false);
                threw = false;
            }
            finally {
                if (threw) {
                    LOG.error("{}: Exception during commit process, aborting {} commit(s)", (Object)this.getRole(), (Object)commits.size());
                    Tasks.foreach(commits).suppressExceptions().run(commit -> this.getCommitOperations().abortSingleCommit((SinglePendingCommit)commit));
                    this.deleteTaskAttemptPathQuietly(context);
                }
            }
            Paths.clearTempFolderInfo(context.getTaskAttemptID());
        }
        LOG.debug("Committing wrapped task");
        this.wrappedCommitter.commitTask(context);
        LOG.debug("Cleaning up attempt dir {}", (Object)attemptPath);
        attemptFS.delete(attemptPath, true);
        return commits.size();
    }

    public void abortTask(TaskAttemptContext context) throws IOException {
        try (DurationInfo d = new DurationInfo(LOG, "Abort task %s", context.getTaskAttemptID());){
            this.deleteTaskAttemptPathQuietly(context);
            this.deleteTaskWorkingPathQuietly((JobContext)context);
            this.wrappedCommitter.abortTask(context);
        }
        catch (IOException e) {
            LOG.error("{}: exception when aborting task {}", new Object[]{this.getRole(), context.getTaskAttemptID(), e});
            throw e;
        }
    }

    private static Path taskAttemptWorkingPath(TaskAttemptContext context, String uuid) throws IOException {
        return StagingCommitter.getTaskAttemptPath(context, Paths.getLocalTaskAttemptTempDir(context.getConfiguration(), uuid, context.getTaskAttemptID()));
    }

    protected void deleteTaskWorkingPathQuietly(JobContext context) {
        Invoker.ignoreIOExceptions(LOG, "Delete working path", "", () -> {
            Path path = StagingCommitter.buildWorkPath(context, this.getUUID());
            if (path != null) {
                S3AUtils.deleteQuietly(path.getFileSystem(this.getConf()), path, true);
            }
        });
    }

    private String getS3KeyPrefix(JobContext context) {
        return this.s3KeyPrefix;
    }

    protected String getUUID() {
        return this.uuid;
    }

    public final ConflictResolution getConflictResolutionMode(JobContext context, Configuration fsConf) {
        if (this.conflictResolution == null) {
            this.conflictResolution = ConflictResolution.valueOf(StagingCommitter.getConfictModeOption(context, fsConf));
        }
        return this.conflictResolution;
    }

    public static String getConfictModeOption(JobContext context, Configuration fsConf) {
        return CommitUtilsWithMR.getConfigurationOption(context, fsConf, "fs.s3a.committer.staging.conflict-mode", "fail").toUpperCase(Locale.ENGLISH);
    }
}

