/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.AccessCondition;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.DoesServiceRequest;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.OperationContext;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.StorageException;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.BlobOutputStream;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.BlobRequestOptions;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.BlobType;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.BlockEntry;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.BlockSearchMode;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.CloudAppendBlob;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.CloudBlob;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.CloudBlockBlob;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.CloudPageBlob;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.core.Base64;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.core.Logger;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.core.Utility;

final class BlobOutputStreamInternal
extends BlobOutputStream {
    private AccessCondition accessCondition;
    private String blockIdPrefix;
    private ArrayList<BlockEntry> blockList;
    private final ExecutorCompletionService<Void> completionService;
    private final Set<Future<Void>> futureSet;
    private int internalWriteThreshold = -1;
    private volatile IOException lastError = null;
    private MessageDigest md5Digest;
    private final OperationContext opContext;
    private final BlobRequestOptions options;
    private long currentBlobOffset;
    private volatile ByteArrayOutputStream outBuffer;
    private final CloudBlob parentBlobRef;
    private BlobType streamType = BlobType.UNSPECIFIED;
    private final ThreadPoolExecutor threadExecutor;

    private BlobOutputStreamInternal(CloudBlob parentBlob, AccessCondition accessCondition, BlobRequestOptions options, OperationContext opContext) throws StorageException {
        this.accessCondition = accessCondition;
        this.parentBlobRef = parentBlob;
        this.parentBlobRef.assertCorrectBlobType();
        this.options = new BlobRequestOptions(options);
        this.outBuffer = new ByteArrayOutputStream();
        this.opContext = opContext;
        if (this.options.getConcurrentRequestCount() < 1) {
            throw new IllegalArgumentException("ConcurrentRequestCount");
        }
        this.futureSet = Collections.newSetFromMap(new ConcurrentHashMap(this.options.getConcurrentRequestCount() == null ? 1 : this.options.getConcurrentRequestCount() * 2));
        if (this.options.getStoreBlobContentMD5().booleanValue()) {
            try {
                this.md5Digest = MessageDigest.getInstance("MD5");
            }
            catch (NoSuchAlgorithmException e) {
                throw Utility.generateNewUnexpectedStorageException(e);
            }
        }
        this.threadExecutor = new ThreadPoolExecutor((int)this.options.getConcurrentRequestCount(), (int)this.options.getConcurrentRequestCount(), 10L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new BlobOutputStreamThreadFactory());
        this.completionService = new ExecutorCompletionService(this.threadExecutor);
    }

    protected BlobOutputStreamInternal(CloudBlockBlob parentBlob, AccessCondition accessCondition, BlobRequestOptions options, OperationContext opContext) throws StorageException {
        this((CloudBlob)parentBlob, accessCondition, options, opContext);
        this.blockList = new ArrayList();
        this.blockIdPrefix = UUID.randomUUID().toString() + "-";
        this.streamType = BlobType.BLOCK_BLOB;
        this.internalWriteThreshold = this.parentBlobRef.getStreamWriteSizeInBytes();
    }

    @DoesServiceRequest
    protected BlobOutputStreamInternal(CloudPageBlob parentBlob, long length, AccessCondition accessCondition, BlobRequestOptions options, OperationContext opContext) throws StorageException {
        this(parentBlob, accessCondition, options, opContext);
        this.streamType = BlobType.PAGE_BLOB;
        this.internalWriteThreshold = (int)Math.min((long)this.parentBlobRef.getStreamWriteSizeInBytes(), length);
    }

    @DoesServiceRequest
    protected BlobOutputStreamInternal(CloudAppendBlob parentBlob, AccessCondition accessCondition, BlobRequestOptions options, OperationContext opContext) throws StorageException {
        this((CloudBlob)parentBlob, accessCondition, options, opContext);
        this.streamType = BlobType.APPEND_BLOB;
        this.accessCondition = accessCondition != null ? accessCondition : new AccessCondition();
        this.currentBlobOffset = this.accessCondition.getIfAppendPositionEqual() != null ? this.accessCondition.getIfAppendPositionEqual().longValue() : parentBlob.getProperties().getLength();
        this.internalWriteThreshold = this.parentBlobRef.getStreamWriteSizeInBytes();
    }

    private void checkStreamState() throws IOException {
        if (this.lastError != null) {
            throw this.lastError;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    @DoesServiceRequest
    public synchronized void close() throws IOException {
        try {
            this.checkStreamState();
            this.flush();
            this.threadExecutor.shutdown();
            try {
                this.commit();
            }
            catch (StorageException e) {
                throw Utility.initIOException(e);
            }
        }
        finally {
            this.lastError = new IOException("Stream is already closed.");
            if (!this.threadExecutor.isShutdown()) {
                this.threadExecutor.shutdownNow();
            }
        }
    }

    @DoesServiceRequest
    private synchronized void commit() throws StorageException {
        if (this.options.getStoreBlobContentMD5().booleanValue()) {
            this.parentBlobRef.getProperties().setContentMD5(Base64.encode(this.md5Digest.digest()));
        }
        if (this.streamType == BlobType.BLOCK_BLOB) {
            CloudBlockBlob blobRef = (CloudBlockBlob)this.parentBlobRef;
            blobRef.commitBlockList(this.blockList, this.accessCondition, this.options, this.opContext);
        } else if (this.options.getStoreBlobContentMD5().booleanValue()) {
            this.parentBlobRef.uploadProperties(this.accessCondition, this.options, this.opContext);
        }
    }

    @DoesServiceRequest
    private synchronized void dispatchWrite() throws IOException {
        final int writeLength = this.outBuffer.size();
        if (writeLength == 0) {
            return;
        }
        if (this.streamType == BlobType.PAGE_BLOB && writeLength % 512 != 0) {
            throw new IOException(String.format("Page data must be a multiple of 512 bytes. Buffer currently contains %d bytes.", writeLength));
        }
        Callable<Void> worker = null;
        if (this.threadExecutor.getQueue().size() >= this.options.getConcurrentRequestCount() * 2) {
            this.waitForTaskToComplete();
        }
        if (this.futureSet.size() >= this.options.getConcurrentRequestCount() * 2) {
            this.clearCompletedFutures();
        }
        final ByteArrayInputStream bufferRef = new ByteArrayInputStream(this.outBuffer.toByteArray());
        if (this.streamType == BlobType.BLOCK_BLOB) {
            final String blockID = this.getCurrentBlockId();
            this.blockList.add(new BlockEntry(blockID, BlockSearchMode.LATEST));
            worker = new Callable<Void>(){

                @Override
                public Void call() {
                    BlobOutputStreamInternal.this.writeBlock(bufferRef, blockID, writeLength);
                    return null;
                }
            };
        } else if (this.streamType == BlobType.PAGE_BLOB) {
            final long opOffset = this.currentBlobOffset;
            this.currentBlobOffset += (long)writeLength;
            worker = new Callable<Void>(){

                @Override
                public Void call() {
                    BlobOutputStreamInternal.this.writePages(bufferRef, opOffset, writeLength);
                    return null;
                }
            };
        } else if (this.streamType == BlobType.APPEND_BLOB) {
            final long opOffset = this.currentBlobOffset;
            this.currentBlobOffset += (long)writeLength;
            if (this.accessCondition.getIfMaxSizeLessThanOrEqual() != null && this.currentBlobOffset > this.accessCondition.getIfMaxSizeLessThanOrEqual()) {
                this.lastError = new IOException("Append block data should not exceed the maximum blob size condition value.");
                throw this.lastError;
            }
            worker = new Callable<Void>(){

                @Override
                public Void call() {
                    BlobOutputStreamInternal.this.appendBlock(bufferRef, opOffset, writeLength);
                    return null;
                }
            };
        }
        this.futureSet.add(this.completionService.submit(worker));
        this.outBuffer = new ByteArrayOutputStream();
    }

    private void writeBlock(ByteArrayInputStream blockData, String blockId, long writeLength) {
        CloudBlockBlob blobRef = (CloudBlockBlob)this.parentBlobRef;
        try {
            blobRef.uploadBlock(blockId, blockData, writeLength, this.accessCondition, this.options, this.opContext);
        }
        catch (IOException e) {
            this.lastError = e;
        }
        catch (StorageException e) {
            this.lastError = Utility.initIOException(e);
        }
    }

    private void writePages(ByteArrayInputStream pageData, long offset, long writeLength) {
        CloudPageBlob blobRef = (CloudPageBlob)this.parentBlobRef;
        try {
            blobRef.uploadPages(pageData, offset, writeLength, this.accessCondition, this.options, this.opContext);
        }
        catch (IOException e) {
            this.lastError = e;
        }
        catch (StorageException e) {
            this.lastError = Utility.initIOException(e);
        }
    }

    private void appendBlock(ByteArrayInputStream blockData, long offset, long writeLength) {
        CloudAppendBlob blobRef = (CloudAppendBlob)this.parentBlobRef;
        this.accessCondition.setIfAppendPositionEqual(offset);
        int previousResultsCount = this.opContext.getRequestResults().size();
        try {
            blobRef.appendBlock(blockData, writeLength, this.accessCondition, this.options, this.opContext);
        }
        catch (IOException e) {
            this.lastError = e;
        }
        catch (StorageException e) {
            if (this.options.getAbsorbConditionalErrorsOnRetry().booleanValue() && e.getHttpStatusCode() == 412 && e.getExtendedErrorInformation() != null && e.getErrorCode() != null && (e.getErrorCode().equals("AppendPositionConditionNotMet") || e.getErrorCode().equals("MaxBlobSizeConditionNotMet")) && this.opContext.getRequestResults().size() - previousResultsCount > 1) {
                Logger.info(this.opContext, "Pre-condition failure on a retry is being ignored since the request should have succeeded in the first attempt.");
            }
            this.lastError = Utility.initIOException(e);
        }
    }

    @Override
    @DoesServiceRequest
    public void flush() throws IOException {
        this.checkStreamState();
        this.dispatchWrite();
        HashSet<Future<Void>> requests = new HashSet<Future<Void>>(this.futureSet);
        for (Future future : requests) {
            try {
                future.get();
            }
            catch (Exception e) {
                throw Utility.initIOException(e);
            }
            this.checkStreamState();
        }
    }

    private String getCurrentBlockId() throws IOException {
        byte[] blockIdInBytes;
        String blockIdSuffix = String.format("%06d", this.blockList.size());
        try {
            blockIdInBytes = (this.blockIdPrefix + blockIdSuffix).getBytes("UTF-8");
        }
        catch (UnsupportedEncodingException e) {
            throw new IOException(e);
        }
        return Base64.encode(blockIdInBytes);
    }

    private void waitForTaskToComplete() throws IOException {
        boolean completed = false;
        while (this.completionService.poll() != null) {
            completed = true;
        }
        if (!completed) {
            try {
                this.completionService.take();
            }
            catch (InterruptedException e) {
                throw Utility.initIOException(e);
            }
        }
    }

    private void clearCompletedFutures() {
        for (Future<Void> request : this.futureSet) {
            if (!request.isDone()) continue;
            this.futureSet.remove(request);
        }
    }

    @Override
    @DoesServiceRequest
    public void write(byte[] data) throws IOException {
        this.write(data, 0, data.length);
    }

    @Override
    @DoesServiceRequest
    public void write(byte[] data, int offset, int length) throws IOException {
        if (offset < 0 || length < 0 || length > data.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        this.writeInternal(data, offset, length);
    }

    @Override
    @DoesServiceRequest
    public void write(InputStream sourceStream, long writeLength) throws IOException, StorageException {
        Utility.writeToOutputStream(sourceStream, this, writeLength, false, false, this.opContext, this.options, false);
    }

    @Override
    @DoesServiceRequest
    public void write(int byteVal) throws IOException {
        this.write(new byte[]{(byte)(byteVal & 0xFF)});
    }

    @DoesServiceRequest
    private synchronized void writeInternal(byte[] data, int offset, int length) throws IOException {
        while (length > 0) {
            this.checkStreamState();
            int availableBufferBytes = this.internalWriteThreshold - this.outBuffer.size();
            int nextWrite = Math.min(availableBufferBytes, length);
            if (this.options.getStoreBlobContentMD5().booleanValue()) {
                this.md5Digest.update(data, offset, nextWrite);
            }
            this.outBuffer.write(data, offset, nextWrite);
            offset += nextWrite;
            length -= nextWrite;
            if (this.outBuffer.size() != this.internalWriteThreshold) continue;
            this.dispatchWrite();
        }
    }

    private static class BlobOutputStreamThreadFactory
    implements ThreadFactory {
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        BlobOutputStreamThreadFactory() {
            SecurityManager s2 = System.getSecurityManager();
            this.group = s2 != null ? s2.getThreadGroup() : Thread.currentThread().getThreadGroup();
            this.namePrefix = "azure-storage-bloboutputstream-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(this.group, r, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);
            t.setDaemon(true);
            if (t.getPriority() != 5) {
                t.setPriority(5);
            }
            return t;
        }
    }
}

