/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.fs.s3a;

import com.amazonaws.event.ProgressEvent;
import com.amazonaws.event.ProgressEventType;
import com.amazonaws.event.ProgressListener;
import com.amazonaws.services.s3.model.PartETag;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.s3a.S3ADataBlocks;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.WriteOperationHelper;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.util.Progressable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
class S3ABlockOutputStream
extends OutputStream
implements StreamCapabilities {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(S3ABlockOutputStream.class);
    /** Owning filesystem; used here to bump write-operation and abort statistics. */
    private final S3AFileSystem fs;
    /** Destination object key passed to every put / multipart request. */
    private final String key;
    /** Size handed to the block factory when a new block is created. */
    private final int blockSize;
    /**
     * Running total of bytes handed off for upload: incremented by each block's
     * data size in uploadCurrentBlock(), or set from putObject() in close().
     */
    private long bytesSubmitted;
    /** Listener that receives forwarded upload progress events. */
    private final ProgressListener progressListener;
    /** Executor on which the PUT / part-upload tasks are submitted. */
    private final ListeningExecutorService executorService;
    /** Factory used to create data blocks; closed together with the stream. */
    private final S3ADataBlocks.BlockFactory blockFactory;
    /** Reusable one-byte buffer backing write(int). */
    private final byte[] singleCharWrite = new byte[1];
    /** Multipart upload state; null until initMultipartUpload() creates it. */
    private MultiPartUpload multiPartUpload;
    /** Set to true by the first close(); checked by checkOpen(). */
    private final AtomicBoolean closed = new AtomicBoolean(false);
    /** Block currently receiving writes, or null when there is none. */
    private S3ADataBlocks.DataBlock activeBlock;
    /** Number of blocks created so far; also passed to the factory as the block index. */
    private long blockCount = 0L;
    /** Per-stream statistics sink. */
    private final S3AInstrumentation.OutputStreamStatistics statistics;
    /** Helper through which all S3 write calls in this class are issued. */
    private final WriteOperationHelper writeOperationHelper;
    /** Tracker consulted at construction and at close to decide how the upload is finished. */
    private final PutTracker putTracker;

    /**
     * Creates the stream, allocates the first data block and, if the put
     * tracker asks for it, starts a multipart upload immediately.
     *
     * @param fs owning filesystem
     * @param key destination object key
     * @param executorService executor used to run uploads; wrapped in a listening decorator
     * @param progress progress callback; used directly when it is already a
     *                 {@link ProgressListener}, otherwise adapted
     * @param blockSize size of each block in bytes; must be at least 5 MiB
     *                  (0x500000) and must fit in an {@code int}
     * @param blockFactory factory for data blocks
     * @param statistics per-stream statistics sink
     * @param writeOperationHelper helper that performs the S3 write calls
     * @param putTracker tracker consulted at initialization and at close
     * @throws IOException if creating the first block or initiating the
     *                     multipart upload fails
     * @throws IllegalArgumentException if {@code blockSize} is out of range
     */
    S3ABlockOutputStream(S3AFileSystem fs, String key, ExecutorService executorService, Progressable progress, long blockSize, S3ADataBlocks.BlockFactory blockFactory, S3AInstrumentation.OutputStreamStatistics statistics, WriteOperationHelper writeOperationHelper, PutTracker putTracker) throws IOException {
        // Validate before touching any state. Guava templates only understand %s.
        Preconditions.checkArgument(blockSize >= 0x500000L, "Block size is too small: %s", blockSize);
        // The block size is stored as an int; reject values the cast would silently truncate.
        Preconditions.checkArgument(blockSize <= Integer.MAX_VALUE, "Block size is too large: %s", blockSize);
        this.fs = fs;
        this.key = key;
        this.blockFactory = blockFactory;
        this.blockSize = (int) blockSize;
        this.statistics = statistics;
        this.writeOperationHelper = writeOperationHelper;
        this.putTracker = putTracker;
        this.executorService = MoreExecutors.listeningDecorator(executorService);
        this.multiPartUpload = null;
        this.progressListener = progress instanceof ProgressListener
                ? (ProgressListener) progress
                : new ProgressableListener(progress);
        // Create the first block eagerly so that close() on an untouched stream
        // still has a block to upload.
        createBlockIfNeeded();
        LOG.debug("Initialized S3ABlockOutputStream for {} output to {}", key, this.activeBlock);
        if (putTracker.initialize()) {
            LOG.debug("Put tracker requests multipart upload");
            initMultipartUpload();
        }
    }

    /**
     * Returns the active block, creating one through the block factory when
     * none exists. Creating a block bumps the block counter; an error is
     * logged once the counter reaches 10000.
     *
     * @return the active block, never null on normal return
     * @throws IOException if the factory fails to create a block
     */
    private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded() throws IOException {
        if (activeBlock != null) {
            return activeBlock;
        }
        blockCount++;
        if (blockCount >= 10000L) {
            LOG.error("Number of partitions in stream exceeds limit for S3: 10000 write may fail.");
        }
        activeBlock = blockFactory.create(blockCount, blockSize, statistics);
        return activeBlock;
    }

    /**
     * Reads the active block under the stream's monitor.
     *
     * @return the active block, or null when there is none
     */
    private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
        return activeBlock;
    }

    /**
     * Reports, under the stream's monitor, whether an active block exists.
     *
     * @return true when the active block reference is non-null
     */
    private synchronized boolean hasActiveBlock() {
        return activeBlock != null;
    }

    /**
     * Drops the reference to the active block. The null check that decides
     * whether to log is made outside the monitor; only the assignment itself
     * is performed while holding it.
     */
    private void clearActiveBlock() {
        if (activeBlock != null) {
            LOG.debug("Clearing active block");
        }
        synchronized (this) {
            activeBlock = null;
        }
    }

    /**
     * Verifies that the stream has not been closed.
     *
     * @throws IOException if close() has already been called
     */
    void checkOpen() throws IOException {
        if (!closed.get()) {
            return;
        }
        throw new IOException("Filesystem " + writeOperationHelper + " closed");
    }

    /**
     * Flushes the active block, if any. Nothing is uploaded by this call.
     *
     * @throws IOException if the stream is closed or the block flush fails
     */
    @Override
    public synchronized void flush() throws IOException {
        checkOpen();
        final S3ADataBlocks.DataBlock current = getActiveBlock();
        if (current == null) {
            return;
        }
        current.flush();
    }

    /**
     * Writes a single byte by staging it in the reusable one-byte buffer and
     * delegating to the array-based write.
     *
     * @param b the byte to write (low-order eight bits are used)
     * @throws IOException on any failure of the array-based write
     */
    @Override
    public synchronized void write(int b) throws IOException {
        singleCharWrite[0] = (byte) b;
        write(singleCharWrite, 0, singleCharWrite.length);
    }

    /**
     * Writes a range of bytes into the active block. When the block cannot
     * take all of the data it is queued for upload and the remainder is
     * written recursively into a fresh block; a block that ends up exactly
     * full is queued for upload straight away.
     *
     * @param source byte array holding the data
     * @param offset offset of the first byte to write
     * @param len number of bytes to write
     * @throws IOException if the stream is closed or a block operation fails
     */
    @Override
    public synchronized void write(byte[] source, int offset, int len) throws IOException {
        S3ADataBlocks.validateWriteArgs(source, offset, len);
        checkOpen();
        if (len == 0) {
            return;
        }
        final S3ADataBlocks.DataBlock target = createBlockIfNeeded();
        final int accepted = target.write(source, offset, len);
        final int spaceLeft = target.remainingCapacity();
        if (accepted >= len) {
            // Everything fitted; only ship the block if it is now full.
            if (spaceLeft == 0) {
                uploadCurrentBlock();
            }
            return;
        }
        // Partial write: ship this block and carry on with what is left.
        LOG.debug("writing more data than block has capacity -triggering upload");
        uploadCurrentBlock();
        write(source, offset + accepted, len - accepted);
    }

    /**
     * Queues the active block as the next part of the multipart upload,
     * starting the multipart upload first if necessary. The active block
     * reference is cleared whether or not queueing succeeds.
     *
     * @throws IOException if the multipart upload cannot be initiated or the
     *                     block cannot be queued
     * @throws IllegalStateException if there is no active block
     */
    private synchronized void uploadCurrentBlock() throws IOException {
        Preconditions.checkState(hasActiveBlock(), "No active block");
        LOG.debug("Writing block # {}", blockCount);
        initMultipartUpload();
        try {
            final S3ADataBlocks.DataBlock pending = getActiveBlock();
            multiPartUpload.uploadBlockAsync(pending);
            bytesSubmitted += pending.dataSize();
        } finally {
            clearActiveBlock();
        }
    }

    /**
     * Starts the multipart upload for this stream's key unless one already
     * exists.
     *
     * @throws IOException if the upload cannot be initiated
     */
    private void initMultipartUpload() throws IOException {
        if (multiPartUpload != null) {
            return;
        }
        LOG.debug("Initiating Multipart upload");
        multiPartUpload = new MultiPartUpload(key);
    }

    /**
     * Closes the stream and finishes the upload. Only the first call does any
     * work; later calls are logged and ignored.
     *
     * <p>If no multipart upload was started, the active block (if any) is sent
     * with a single PUT. Otherwise the last block is queued when it holds data
     * or when no part has been submitted yet, all part uploads are awaited,
     * and the put tracker decides whether the multipart upload is completed
     * now.
     *
     * <p>The block, block factory and statistics are closed and the active
     * block reference cleared on every exit path, including unchecked
     * exceptions.
     *
     * @throws IOException if any upload step fails; the failure is reported
     *                     to the write operation helper before being rethrown
     */
    @Override
    public void close() throws IOException {
        if (closed.getAndSet(true)) {
            LOG.debug("Ignoring close() as stream is already closed");
            return;
        }
        final S3ADataBlocks.DataBlock block = getActiveBlock();
        final boolean hasBlock = hasActiveBlock();
        LOG.debug("{}: Closing block #{}: current block= {}", this, blockCount, hasBlock ? block : "(none)");
        long bytes = 0L;
        try {
            if (multiPartUpload == null) {
                if (hasBlock) {
                    // Nothing uploaded yet: send the one block with a plain PUT.
                    bytes = putObject();
                    bytesSubmitted = bytes;
                }
            } else {
                // Send the final part if it has data, or if no part was ever submitted.
                if (hasBlock && (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) {
                    uploadCurrentBlock();
                }
                final List<PartETag> partETags = multiPartUpload.waitForAllPartUploads();
                bytes = bytesSubmitted;
                if (putTracker.aboutToComplete(multiPartUpload.getUploadId(), partETags, bytes)) {
                    multiPartUpload.complete(partETags);
                } else {
                    LOG.info("File {} will be visible when the job is committed", key);
                }
            }
            if (!putTracker.outputImmediatelyVisible()) {
                statistics.commitUploaded(bytes);
            }
            LOG.debug("Upload complete to {} by {}", key, writeOperationHelper);
        } catch (IOException ioe) {
            writeOperationHelper.writeFailed(ioe);
            throw ioe;
        } finally {
            // Must run for unchecked failures too, otherwise the block leaks.
            S3AUtils.closeAll(LOG, block, blockFactory);
            LOG.debug("Statistics: {}", statistics);
            S3AUtils.closeAll(LOG, statistics);
            clearActiveBlock();
        }
        // Reached only when the try block completed without throwing.
        writeOperationHelper.writeSuccessful(bytes);
    }

    /**
     * Uploads the active block with a single PUT request and waits for it to
     * finish. The PUT runs on the executor; the upload data and the block are
     * closed by that task whether or not the PUT succeeds. The active block
     * reference is cleared as soon as the task has been submitted.
     *
     * @return the number of bytes in the uploaded block, or 0 if the calling
     *         thread was interrupted while waiting (the interrupt flag is
     *         restored in that case)
     * @throws IOException if the upload task fails
     */
    private int putObject() throws IOException {
        LOG.debug("Executing regular upload for {}", writeOperationHelper);
        final S3ADataBlocks.DataBlock block = getActiveBlock();
        final int size = block.dataSize();
        final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
        final PutObjectRequest putObjectRequest = uploadData.hasFile()
                ? writeOperationHelper.createPutObjectRequest(key, uploadData.getFile())
                : writeOperationHelper.createPutObjectRequest(key, uploadData.getUploadStream(), size);
        final long transferQueueTime = now();
        final BlockUploadProgress callback = new BlockUploadProgress(block, progressListener, transferQueueTime);
        putObjectRequest.setGeneralProgressListener(callback);
        statistics.blockUploadQueued(size);
        final ListenableFuture<PutObjectResult> pendingPut = executorService.submit(() -> {
            try {
                return writeOperationHelper.putObject(putObjectRequest);
            } finally {
                // Release the upload data and the block on every outcome.
                S3AUtils.closeAll(LOG, uploadData, block);
            }
        });
        clearActiveBlock();
        try {
            pendingPut.get();
            return size;
        } catch (InterruptedException ie) {
            LOG.warn("Interrupted object upload", ie);
            Thread.currentThread().interrupt();
            return 0;
        } catch (ExecutionException ee) {
            throw S3AUtils.extractException("regular upload", key, ee);
        }
    }

    /**
     * Describes the stream: the write operation helper, the block size and,
     * when present, the active block.
     *
     * @return a human-readable description of this stream
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("S3ABlockOutputStream{")
                .append(writeOperationHelper.toString())
                .append(", blockSize=")
                .append(blockSize);
        // Snapshot the field once so the null check and the append see the same value.
        final S3ADataBlocks.DataBlock current = activeBlock;
        if (current != null) {
            text.append(", activeBlock=").append(current);
        }
        return text.append('}').toString();
    }

    /** Asks the owning filesystem to count one more write operation. */
    private void incrementWriteOperations() {
        fs.incrementWriteOperations();
    }

    /**
     * Current wall-clock time, used to time queueing and transfer of blocks.
     *
     * @return the current time in milliseconds, as reported by
     *         {@link System#currentTimeMillis()}
     */
    private long now() {
        return System.currentTimeMillis();
    }

    /**
     * Exposes the statistics sink of this stream.
     *
     * @return the statistics instance supplied at construction
     */
    S3AInstrumentation.OutputStreamStatistics getStatistics() {
        return statistics;
    }

    /**
     * Answers a capability probe. The name is compared case-insensitively
     * (English locale). Only {@code s3a:magic.output.stream} can be reported
     * as present, and only when the put tracker says the output is not
     * immediately visible; {@code hflush}, {@code hsync} and every other name
     * yield false.
     *
     * @param capability capability name to probe
     * @return true if the capability is available on this stream
     */
    @Override
    public boolean hasCapability(String capability) {
        final String normalized = capability.toLowerCase(Locale.ENGLISH);
        if ("s3a:magic.output.stream".equals(normalized)) {
            return !putTracker.outputImmediatelyVisible();
        }
        // "hflush", "hsync" and anything unrecognised are not supported.
        return false;
    }

    /**
     * Adapts a {@link Progressable} to the AWS {@link ProgressListener}
     * interface: every progress event simply pings the wrapped callback.
     */
    private static class ProgressableListener implements ProgressListener {
        /** Wrapped callback; may be null, in which case events are ignored. */
        private final Progressable progress;

        ProgressableListener(Progressable progress) {
            this.progress = progress;
        }

        @Override
        public void progressChanged(ProgressEvent progressEvent) {
            if (progress == null) {
                return;
            }
            progress.progress();
        }
    }

    /**
     * Progress listener attached to the upload of one block. It translates
     * transfer events into stream statistics and then forwards every event to
     * the next listener, if one was supplied.
     */
    private final class BlockUploadProgress implements ProgressListener {
        /** Block whose upload is being tracked. */
        private final S3ADataBlocks.DataBlock block;
        /** Listener that receives each event after it has been processed here; may be null. */
        private final ProgressListener nextListener;
        /** Time at which the upload was queued, in milliseconds. */
        private final long transferQueueTime;
        /** Time at which the transfer started; set on the part-started event. */
        private long transferStartTime;

        private BlockUploadProgress(S3ADataBlocks.DataBlock block, ProgressListener nextListener, long transferQueueTime) {
            this.block = block;
            this.transferQueueTime = transferQueueTime;
            this.nextListener = nextListener;
        }

        @Override
        public void progressChanged(ProgressEvent progressEvent) {
            final ProgressEventType type = progressEvent.getEventType();
            final long transferred = progressEvent.getBytesTransferred();
            final int blockBytes = block.dataSize();
            switch (type) {
                case REQUEST_BYTE_TRANSFER_EVENT:
                    statistics.bytesTransferred(transferred);
                    break;
                case TRANSFER_PART_STARTED_EVENT:
                    transferStartTime = now();
                    statistics.blockUploadStarted(transferStartTime - transferQueueTime, blockBytes);
                    incrementWriteOperations();
                    break;
                case TRANSFER_PART_COMPLETED_EVENT:
                    statistics.blockUploadCompleted(now() - transferStartTime, blockBytes);
                    break;
                case TRANSFER_PART_FAILED_EVENT:
                    statistics.blockUploadFailed(now() - transferStartTime, blockBytes);
                    LOG.warn("Transfer failure of block {}", block);
                    break;
                default:
                    // Other event types carry nothing this listener records.
                    break;
            }
            if (nextListener != null) {
                nextListener.progressChanged(progressEvent);
            }
        }
    }

    /**
     * State and operations of one multipart upload: queueing parts on the
     * executor, waiting for them, and completing or aborting the upload.
     */
    private class MultiPartUpload {
        /** Identifier returned when the multipart upload was initiated. */
        private final String uploadId;
        /** One future per queued part, in part-number order. */
        private final List<ListenableFuture<PartETag>> partETagsFutures;
        /** Number of parts queued so far. */
        private int partsSubmitted;
        /** Number of parts whose upload finished; updated from executor threads. */
        private final AtomicInteger partsUploaded = new AtomicInteger(0);
        /** Total size of the parts queued so far, in bytes. */
        private long bytesSubmitted;

        /**
         * Initiates a multipart upload for the given key.
         *
         * @param key destination object key
         * @throws IOException if the upload cannot be initiated
         */
        MultiPartUpload(String key) throws IOException {
            this.uploadId = writeOperationHelper.initiateMultiPartUpload(key);
            this.partETagsFutures = new ArrayList<>(2);
            LOG.debug("Initiated multi-part upload for {} with id '{}'", writeOperationHelper, this.uploadId);
        }

        /** @return the number of parts queued so far */
        public int getPartsSubmitted() {
            return partsSubmitted;
        }

        /** @return the number of parts whose upload has finished */
        public int getPartsUploaded() {
            return partsUploaded.get();
        }

        /** @return the identifier of this multipart upload */
        public String getUploadId() {
            return uploadId;
        }

        /** @return the total size in bytes of the parts queued so far */
        public long getBytesSubmitted() {
            return bytesSubmitted;
        }

        /**
         * Queues a block as the next part. The upload itself runs on the
         * executor; the upload data and the block are closed by that task on
         * every outcome.
         *
         * @param block block to upload
         * @throws IOException if the upload cannot be started or the request
         *                     cannot be built
         */
        private void uploadBlockAsync(final S3ADataBlocks.DataBlock block) throws IOException {
            LOG.debug("Queueing upload of {} for upload {}", block, uploadId);
            Preconditions.checkNotNull(uploadId, "Null uploadId");
            partsSubmitted++;
            final int size = block.dataSize();
            bytesSubmitted += size;
            final S3ADataBlocks.BlockUploadData uploadData = block.startUpload();
            // Part numbers start at 1 and follow the order of submission.
            final int currentPartNumber = partETagsFutures.size() + 1;
            final UploadPartRequest request = writeOperationHelper.newUploadPartRequest(key, uploadId, currentPartNumber, size, uploadData.getUploadStream(), uploadData.getFile(), 0L);
            final long transferQueueTime = now();
            final BlockUploadProgress callback = new BlockUploadProgress(block, progressListener, transferQueueTime);
            request.setGeneralProgressListener(callback);
            statistics.blockUploadQueued(block.dataSize());
            final ListenableFuture<PartETag> partETagFuture = executorService.submit(() -> {
                try {
                    LOG.debug("Uploading part {} for id '{}'", currentPartNumber, uploadId);
                    final PartETag partETag = writeOperationHelper.uploadPart(request).getPartETag();
                    LOG.debug("Completed upload of {} to part {}", block, partETag.getETag());
                    LOG.debug("Stream statistics of {}", statistics);
                    // Atomic: several upload tasks may finish concurrently.
                    partsUploaded.incrementAndGet();
                    return partETag;
                } finally {
                    S3AUtils.closeAll(LOG, uploadData, block);
                }
            });
            partETagsFutures.add(partETagFuture);
        }

        /**
         * Blocks until every queued part upload has finished.
         *
         * @return the part ETags, in part order
         * @throws java.io.InterruptedIOException if the calling thread is
         *         interrupted while waiting; outstanding uploads are cancelled
         *         and the interrupt flag is restored
         * @throws IOException if any part upload failed; outstanding uploads
         *         are cancelled and the multipart upload is aborted first
         */
        private List<PartETag> waitForAllPartUploads() throws IOException {
            LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size());
            try {
                return Futures.allAsList(partETagsFutures).get();
            } catch (InterruptedException ie) {
                LOG.warn("Interrupted partUpload", ie);
                cancelAllActiveFutures();
                Thread.currentThread().interrupt();
                // Fail explicitly instead of returning null, which callers would dereference.
                final java.io.InterruptedIOException interrupted = new java.io.InterruptedIOException(
                        "Interrupted while waiting for multi-part upload with id '" + uploadId + "' to " + key);
                interrupted.initCause(ie);
                throw interrupted;
            } catch (ExecutionException ee) {
                LOG.debug("While waiting for upload completion", ee);
                cancelAllActiveFutures();
                abort();
                throw S3AUtils.extractException("Multi-part upload with id '" + uploadId + "' to " + key, key, ee);
            }
        }

        /** Cancels every queued part upload, interrupting those that are running. */
        private void cancelAllActiveFutures() {
            LOG.debug("Cancelling futures");
            for (ListenableFuture<PartETag> future : partETagsFutures) {
                future.cancel(true);
            }
        }

        /**
         * Completes the multipart upload with the given parts. The number of
         * errors counted by the helper during completion is recorded in the
         * statistics whether or not completion succeeds.
         *
         * @param partETags ETags of the uploaded parts
         * @throws IOException if completion fails
         */
        private void complete(List<PartETag> partETags) throws IOException {
            final AtomicInteger errorCount = new AtomicInteger(0);
            try {
                writeOperationHelper.completeMPUwithRetries(key, uploadId, partETags, bytesSubmitted, errorCount);
            } finally {
                statistics.exceptionInMultipartComplete(errorCount.get());
            }
        }

        /**
         * Aborts the multipart upload on a best-effort basis: the abort
         * statistic is incremented, and a failure to abort is logged rather
         * than propagated.
         */
        public void abort() {
            fs.incrementStatistic(Statistic.OBJECT_MULTIPART_UPLOAD_ABORTED);
            try {
                // The callback records an abort exception in the statistics each time the helper invokes it.
                writeOperationHelper.abortMultipartUpload(key, uploadId, (text, e, r, i) -> statistics.exceptionInMultipartAbort());
            } catch (IOException e2) {
                LOG.warn("Unable to abort multipart upload, you may need to purge uploaded parts", e2);
            }
        }
    }
}

