/*
 * Decompiled with CFR 0.152.
 */
package alluxio.underfs;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.retry.CountingRetry;
import alluxio.retry.RetryPolicy;
import alluxio.retry.RetryUtils;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.util.concurrent.Futures;
import alluxio.shaded.client.com.google.common.util.concurrent.ListenableFuture;
import alluxio.shaded.client.com.google.common.util.concurrent.ListeningExecutorService;
import alluxio.shaded.client.javax.annotation.Nullable;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.shaded.client.org.apache.commons.codec.binary.Base64;
import alluxio.underfs.ContentHashable;
import alluxio.util.CommonUtils;
import alluxio.util.io.PathUtils;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.security.DigestOutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base output stream that buffers written bytes into local temporary files and uploads them to
 * an object store, either as a single object or as the parts of a multipart upload.
 *
 * <p>Data is spooled to a temp file until the file reaches {@link #mPartitionSize} bytes. When a
 * write would exceed that size, the full file is handed to the executor as one part and a new
 * temp file is started. If the stream is closed before any part was uploaded, the buffered data
 * (or an empty object) is written with a single put instead.
 *
 * <p>Subclasses supply the store-specific operations through the abstract {@code *Internal},
 * {@link #createEmptyObject(String)} and {@link #putObject(String, File, String)} methods.
 */
@NotThreadSafe
public abstract class ObjectLowLevelOutputStream
extends OutputStream
implements ContentHashable {
    protected static final Logger LOG = LoggerFactory.getLogger(ObjectLowLevelOutputStream.class);
    /** Candidate directories for the local temporary part files. */
    protected final List<String> mTmpDirs;
    /** Lower bound for the partition size and flush threshold: 5 MiB (0x500000 bytes). */
    protected static final long UPLOAD_THRESHOLD = 0x500000L;
    protected final String mBucketName;
    protected final String mKey;
    /** Creates a fresh retry policy (5 attempts) for every remote operation. */
    protected final Supplier<RetryPolicy> mRetryPolicy = () -> new CountingRetry(5);
    /** Scratch buffer reused by {@link #write(int)}. */
    protected final byte[] mSingleCharWrite = new byte[1];
    /** MD5 digest of the current temp file, or {@code null} when MD5 is unavailable. */
    @Nullable
    protected MessageDigest mHash;
    protected boolean mClosed = false;
    /** Number of bytes written to the current temp file. */
    protected long mPartitionOffset;
    /** Maximum number of bytes buffered in one temp file before it is uploaded as a part. */
    protected final long mPartitionSize;
    /** Current temp file, or {@code null} when no file is open. */
    @Nullable
    protected File mFile;
    /** Stream writing to {@link #mFile}, or {@code null} when no file is open. */
    @Nullable
    protected OutputStream mLocalOutputStream;
    /** Next part number to assign; starts at 1. */
    private final AtomicInteger mPartNumber;
    private final ListeningExecutorService mExecutor;
    /** Futures of the part uploads submitted since the last successful wait. */
    private final List<ListenableFuture<?>> mFutures = new ArrayList<>();
    /** Per-part wait timeout in milliseconds, or {@code null} to wait without a timeout. */
    @Nullable
    private Long mUploadPartTimeoutMills;
    private boolean mMultiPartUploadInitialized = false;

    /**
     * Creates a new stream for the given bucket and key.
     *
     * @param bucketName the bucket name; must not be null or empty
     * @param key the object key to write
     * @param executor the executor that runs the part uploads
     * @param streamingUploadPartitionSize the requested partition size in bytes; values below
     *        {@link #UPLOAD_THRESHOLD} are raised to that threshold
     * @param ufsConf the configuration providing the temp directories and optional part timeout
     * @throws IllegalArgumentException if the bucket name is null/empty or no temp directory is
     *         configured
     */
    public ObjectLowLevelOutputStream(String bucketName, String key, ListeningExecutorService executor, long streamingUploadPartitionSize, AlluxioConfiguration ufsConf) {
        Preconditions.checkArgument(bucketName != null && !bucketName.isEmpty(), "Bucket name must not be null or empty.");
        this.mBucketName = bucketName;
        this.mTmpDirs = ufsConf.getList(PropertyKey.TMP_DIRS);
        Preconditions.checkArgument(!this.mTmpDirs.isEmpty(), "No temporary directories available");
        this.mExecutor = executor;
        this.mKey = key;
        this.initHash();
        this.mPartitionSize = Math.max(UPLOAD_THRESHOLD, streamingUploadPartitionSize);
        this.mPartNumber = new AtomicInteger(1);
        if (ufsConf.isSet(PropertyKey.UNDERFS_OBJECT_STORE_STREAMING_UPLOAD_PART_TIMEOUT)) {
            this.mUploadPartTimeoutMills = ufsConf.getDuration(PropertyKey.UNDERFS_OBJECT_STORE_STREAMING_UPLOAD_PART_TIMEOUT).toMillis();
        }
    }

    /**
     * Writes a single byte by delegating to {@link #write(byte[])} with a reused one-byte buffer.
     *
     * @param b the byte to write (only the low eight bits are used)
     * @throws IOException if buffering or a triggered part upload fails
     */
    @Override
    public void write(int b) throws IOException {
        this.mSingleCharWrite[0] = (byte) b;
        this.write(this.mSingleCharWrite);
    }

    /**
     * Writes the whole array.
     *
     * @param b the bytes to write
     * @throws IOException if buffering or a triggered part upload fails
     */
    @Override
    public void write(byte[] b) throws IOException {
        this.write(b, 0, b.length);
    }

    /**
     * Appends {@code len} bytes to the current temp file, uploading the file as a part and
     * starting a new one each time the partition size would be exceeded.
     *
     * <p>A {@code null} buffer or a zero length is silently ignored.
     *
     * @param b the source buffer
     * @param off the start offset in {@code b}
     * @param len the number of bytes to write
     * @throws IllegalArgumentException if {@code off}/{@code len} do not describe a range in
     *         {@code b}
     * @throws IOException if writing to the temp file or starting a part upload fails
     */
    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        if (b == null || len == 0) {
            return;
        }
        // Compare against (b.length - off) instead of computing off + len, which can overflow.
        Preconditions.checkArgument(off >= 0 && off <= b.length && len >= 0 && len <= b.length - off);
        int cursor = off;
        int remaining = len;
        while (remaining > 0) {
            if (this.mFile == null) {
                this.initNewFile();
            }
            long room = this.mPartitionSize - this.mPartitionOffset;
            int chunk = (int) Math.min(room, (long) remaining);
            this.mLocalOutputStream.write(b, cursor, chunk);
            this.mPartitionOffset += (long) chunk;
            cursor += chunk;
            remaining -= chunk;
            if (remaining > 0) {
                // The partition is full and more data is pending: ship it and start a new file
                // on the next iteration. A partition that is exactly full with nothing pending
                // is left open, so close() can still use a plain put for small objects.
                this.uploadPart();
            }
        }
    }

    /**
     * Flushes buffered data. This is a no-op until a multipart upload has been initialized.
     * Afterwards the local stream is flushed, the current partition is uploaded if it holds more
     * than {@link #UPLOAD_THRESHOLD} bytes, and the call blocks until all submitted parts finish.
     *
     * @throws IOException if flushing, uploading or waiting for the parts fails
     */
    @Override
    public void flush() throws IOException {
        if (!this.mMultiPartUploadInitialized) {
            return;
        }
        if (this.mLocalOutputStream != null) {
            this.mLocalOutputStream.flush();
        }
        if (this.mPartitionOffset > UPLOAD_THRESHOLD) {
            // uploadPart() itself is a no-op when no temp file is currently open.
            this.uploadPart();
        }
        this.waitForAllPartsUpload();
    }

    /**
     * Closes the stream and finalizes the object. Subsequent calls are no-ops, even if this call
     * fails.
     *
     * <p>If no multipart upload was started, the buffered file is written with a single put (or
     * an empty object is created when nothing was written). Otherwise the remaining data is
     * uploaded as the last part, all parts are awaited and the multipart upload is completed.
     *
     * @throws IOException if any upload step fails
     */
    @Override
    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        this.mClosed = true;
        if (!this.mMultiPartUploadInitialized) {
            if (this.mFile == null) {
                LOG.debug("Streaming upload output stream closed without uploading any data.");
                RetryUtils.retry("put empty object for key" + this.mKey, () -> this.createEmptyObject(this.mKey), this.mRetryPolicy.get());
            } else {
                try {
                    this.mLocalOutputStream.close();
                    String md5 = this.mHash != null ? Base64.encodeBase64String(this.mHash.digest()) : null;
                    RetryUtils.retry("put object for key" + this.mKey, () -> this.putObject(this.mKey, this.mFile, md5), this.mRetryPolicy.get());
                } finally {
                    if (!this.mFile.delete()) {
                        LOG.error("Failed to delete temporary file @ {}", this.mFile.getPath());
                    }
                }
            }
            return;
        }
        try {
            if (this.mFile != null) {
                this.mLocalOutputStream.close();
                int partNumber = this.mPartNumber.getAndIncrement();
                this.uploadPart(this.mFile, partNumber, true);
            }
            this.waitForAllPartsUpload();
            RetryUtils.retry("complete multipart upload", this::completeMultiPartUploadInternal, this.mRetryPolicy.get());
        } catch (Exception e) {
            LOG.error("Failed to upload {}", this.mKey, e);
            throw new IOException(e);
        }
    }

    /**
     * Opens a new temp file with a fresh digest and resets the partition offset.
     *
     * <p>The fields are only updated once the file stream has been opened, so a failure here
     * does not leave a file reference without a matching stream.
     *
     * @throws IOException if the temp file cannot be opened for writing
     */
    private void initNewFile() throws IOException {
        File file = new File(PathUtils.concatPath((Object) CommonUtils.getTmpDir(this.mTmpDirs), (Object) UUID.randomUUID()));
        this.initHash();
        OutputStream fileStream = new FileOutputStream(file);
        if (this.mHash != null) {
            fileStream = new DigestOutputStream(fileStream, this.mHash);
        }
        this.mFile = file;
        this.mLocalOutputStream = new BufferedOutputStream(fileStream);
        this.mPartitionOffset = 0L;
        LOG.debug("Init new temp file @ {}", file.getPath());
    }

    /** Creates a new MD5 digest, or sets {@link #mHash} to {@code null} if MD5 is unavailable. */
    private void initHash() {
        try {
            this.mHash = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            LOG.warn("Algorithm not available for MD5 hash.", e);
            this.mHash = null;
        }
    }

    /**
     * Closes the current temp file and submits it as the next (non-last) part, initializing the
     * multipart upload first if that has not happened yet. Does nothing when no file is open.
     *
     * @throws IOException if initializing the multipart upload or closing the temp file fails
     */
    protected void uploadPart() throws IOException {
        if (this.mFile == null) {
            return;
        }
        if (!this.mMultiPartUploadInitialized) {
            RetryUtils.retry("init multipart upload", this::initMultiPartUploadInternal, this.mRetryPolicy.get());
            this.mMultiPartUploadInitialized = true;
        }
        this.mLocalOutputStream.close();
        int partNumber = this.mPartNumber.getAndIncrement();
        this.uploadPart(new File(this.mFile.getPath()), partNumber, false);
        this.mFile = null;
        this.mLocalOutputStream = null;
    }

    /**
     * Submits an asynchronous upload of {@code file} as part {@code partNumber}. The file is
     * deleted by the upload task once the upload has succeeded or exhausted its retries.
     *
     * @param file the closed temp file holding the part data
     * @param partNumber the part number to upload under
     * @param lastPart whether this is the final part of the object
     */
    protected void uploadPart(File file, int partNumber, boolean lastPart) {
        // digest() also resets the digest, so this must be read exactly once per part.
        String md5 = this.mHash != null ? Base64.encodeBase64String(this.mHash.digest()) : null;
        // Read the size before submitting: the task deletes the file when it finishes.
        long fileSize = file.length();
        Callable<Void> task = () -> {
            try {
                RetryUtils.retry("upload part for key " + this.mKey + " and part number " + partNumber, () -> this.uploadPartInternal(file, partNumber, lastPart, md5), this.mRetryPolicy.get());
                return null;
            } finally {
                if (!file.delete()) {
                    LOG.error("Failed to delete temporary file @ {}", file.getPath());
                }
            }
        };
        ListenableFuture<?> pending = this.mExecutor.submit(task);
        this.mFutures.add(pending);
        LOG.debug("Submit upload part request. key={}, partNum={}, file={}, fileSize={}, lastPart={}.", this.mKey, partNumber, file.getPath(), fileSize, lastPart);
    }

    /**
     * Aborts the multipart upload on a best-effort basis; a failure is logged, not thrown.
     */
    protected void abortMultiPartUpload() {
        try {
            RetryUtils.retry("abort multipart upload for key " + this.mKey, this::abortMultiPartUploadInternal, this.mRetryPolicy.get());
        } catch (IOException e) {
            // One placeholder per value; the trailing exception is passed as the throwable.
            LOG.warn("Unable to abort multipart upload for key '{}' to bucket {}. You may need to enable the periodical cleanup by setting property {} to be true.", this.mKey, this.mBucketName, PropertyKey.UNDERFS_CLEANUP_ENABLED.getName(), e);
        }
    }

    /**
     * Blocks until every submitted part upload has finished, applying the configured per-part
     * timeout when one is set. On failure, interruption or timeout the remaining uploads are
     * cancelled, the multipart upload is aborted and an {@link IOException} is thrown.
     *
     * @throws IOException if a part upload failed, timed out, or the wait was interrupted (the
     *         thread's interrupt flag is restored in that case)
     */
    protected void waitForAllPartsUpload() throws IOException {
        try {
            for (ListenableFuture<?> future : this.mFutures) {
                if (this.mUploadPartTimeoutMills == null) {
                    future.get();
                } else {
                    future.get(this.mUploadPartTimeoutMills, TimeUnit.MILLISECONDS);
                }
            }
        } catch (ExecutionException e) {
            this.cancelPendingPartsAndAbort();
            throw new IOException("Part upload failed in multipart upload with to " + this.mKey, e);
        } catch (InterruptedException e) {
            LOG.warn("Interrupted object upload.", e);
            this.cancelPendingPartsAndAbort();
            Thread.currentThread().interrupt();
            // The upload has been aborted, so callers must not continue (e.g. close() must not
            // try to complete the multipart upload); surface the interruption as a failure.
            throw new IOException("Interrupted while waiting for part uploads of " + this.mKey, e);
        } catch (TimeoutException e) {
            LOG.error("timeout when upload part");
            this.cancelPendingPartsAndAbort();
            throw new IOException("timeout when upload part " + this.mKey, e);
        }
        this.mFutures.clear();
    }

    /** Cancels all outstanding part uploads and aborts the multipart upload. */
    private void cancelPendingPartsAndAbort() {
        Futures.allAsList(this.mFutures).cancel(true);
        this.abortMultiPartUpload();
    }

    /**
     * @return the part number that will be assigned to the next uploaded part
     */
    @VisibleForTesting
    public int getPartNumber() {
        return this.mPartNumber.get();
    }

    /**
     * Uploads one part of the multipart upload.
     *
     * @param var1 the file holding the part data
     * @param var2 the part number
     * @param var3 whether this is the last part
     * @param var4 the Base64-encoded MD5 of the part, or {@code null} if unavailable
     * @throws IOException if the upload fails
     */
    protected abstract void uploadPartInternal(File var1, int var2, boolean var3, @Nullable String var4) throws IOException;

    /**
     * Starts the multipart upload.
     *
     * @throws IOException if the upload cannot be initialized
     */
    protected abstract void initMultiPartUploadInternal() throws IOException;

    /**
     * Completes the multipart upload after all parts have been uploaded.
     *
     * @throws IOException if completion fails
     */
    protected abstract void completeMultiPartUploadInternal() throws IOException;

    /**
     * Aborts the multipart upload.
     *
     * @throws IOException if the abort fails
     */
    protected abstract void abortMultiPartUploadInternal() throws IOException;

    /**
     * Creates an empty object.
     *
     * @param var1 the object key
     * @throws IOException if the object cannot be created
     */
    protected abstract void createEmptyObject(String var1) throws IOException;

    /**
     * Uploads a whole file as a single object.
     *
     * @param var1 the object key
     * @param var2 the file holding the object data
     * @param var3 the Base64-encoded MD5 of the file, or {@code null} if unavailable
     * @throws IOException if the upload fails
     */
    protected abstract void putObject(String var1, File var2, @Nullable String var3) throws IOException;
}

