/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.flink.fs.azure.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.OperationContext;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.StorageException;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.BlobRequestOptions;
import org.apache.flink.fs.azure.shaded.com.microsoft.azure.storage.blob.CloudPageBlob;
import org.apache.flink.fs.shaded.hadoop3.org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.flink.fs.shaded.hadoop3.org.apache.commons.logging.Log;
import org.apache.flink.fs.shaded.hadoop3.org.apache.commons.logging.LogFactory;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.conf.Configuration;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.Syncable;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.AzureNativeFileSystemStore;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.NativeAzureFileSystemHelper;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.PageBlobFormatHelpers;
import org.apache.flink.fs.shaded.hadoop3.org.apache.hadoop.fs.azure.StorageInterface;

final class PageBlobOutputStream
extends OutputStream
implements Syncable {
    private static final int MAX_RAW_BYTES_PER_REQUEST = 0x400000;
    private static final int MAX_PAGES_IN_REQUEST = 8192;
    private static final int MAX_DATA_BYTES_PER_REQUEST = 4177410;
    private final StorageInterface.CloudPageBlobWrapper blob;
    private final OperationContext opContext;
    private volatile IOException lastError;
    private long currentBlobSize;
    private long currentBlobOffset;
    private byte[] previousLastPageDataWritten = new byte[0];
    private ByteArrayOutputStream outBuffer;
    private final LinkedBlockingQueue<Runnable> ioQueue;
    private final ThreadPoolExecutor ioThreadPool;
    private WriteRequest lastQueuedTask;
    private boolean closed = false;
    public static final Log LOG = LogFactory.getLog(AzureNativeFileSystemStore.class);
    public static final long PAGE_BLOB_MIN_SIZE = 0x8000000L;
    public static final long PAGE_BLOB_DEFAULT_EXTENSION_SIZE = 0x8000000L;
    private long configuredPageBlobExtensionSize;

    public PageBlobOutputStream(StorageInterface.CloudPageBlobWrapper blob, OperationContext opContext, Configuration conf) throws StorageException {
        this.blob = blob;
        this.outBuffer = new ByteArrayOutputStream();
        this.opContext = opContext;
        this.lastQueuedTask = null;
        this.ioQueue = new LinkedBlockingQueue();
        this.ioThreadPool = new ThreadPoolExecutor(1, 1, 2L, TimeUnit.SECONDS, this.ioQueue);
        long pageBlobConfigSize = conf.getLong("fs.azure.page.blob.size", 0L);
        LOG.debug("Read value of fs.azure.page.blob.size as " + pageBlobConfigSize + " from configuration (0 if not present).");
        long pageBlobSize = Math.max(0x8000000L, pageBlobConfigSize);
        if (pageBlobSize % 512L != 0L) {
            pageBlobSize += 512L - pageBlobSize % 512L;
        }
        blob.create(pageBlobSize, new BlobRequestOptions(), opContext);
        this.currentBlobSize = pageBlobSize;
        this.configuredPageBlobExtensionSize = conf.getLong("fs.azure.page.blob.extension.size", 0L);
        if (this.configuredPageBlobExtensionSize < 0x8000000L) {
            this.configuredPageBlobExtensionSize = 0x8000000L;
        }
        if (this.configuredPageBlobExtensionSize % 512L != 0L) {
            this.configuredPageBlobExtensionSize += 512L - this.configuredPageBlobExtensionSize % 512L;
        }
    }

    private void checkStreamState() throws IOException {
        if (this.lastError != null) {
            throw this.lastError;
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (this.closed) {
            return;
        }
        LOG.debug("Closing page blob output stream.");
        this.flush();
        this.checkStreamState();
        this.ioThreadPool.shutdown();
        try {
            LOG.debug(this.ioThreadPool.toString());
            if (!this.ioThreadPool.awaitTermination(10L, TimeUnit.MINUTES)) {
                LOG.debug("Timed out after 10 minutes waiting for IO requests to finish");
                NativeAzureFileSystemHelper.logAllLiveStackTraces();
                LOG.debug(this.ioThreadPool.toString());
                throw new IOException("Timed out waiting for IO requests to finish");
            }
        }
        catch (InterruptedException e) {
            LOG.debug("Caught InterruptedException");
            Thread.currentThread().interrupt();
        }
        this.closed = true;
    }

    private synchronized void flushIOBuffers() {
        if (this.outBuffer.size() == 0) {
            return;
        }
        this.lastQueuedTask = new WriteRequest(this.outBuffer.toByteArray());
        this.ioThreadPool.execute(this.lastQueuedTask);
        this.outBuffer = new ByteArrayOutputStream();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void conditionalExtendFile() {
        long MAX_PAGE_BLOB_SIZE = 0x10000000000L;
        if (this.currentBlobSize == 0x10000000000L) {
            return;
        }
        if (this.currentBlobSize - this.currentBlobOffset <= 0x400000L) {
            CloudPageBlob cloudPageBlob = (CloudPageBlob)this.blob.getBlob();
            long newSize = this.currentBlobSize + this.configuredPageBlobExtensionSize;
            if (newSize > 0x10000000000L) {
                newSize = 0x10000000000L;
            }
            int MAX_RETRIES = 3;
            int retries = 1;
            boolean resizeDone = false;
            while (!resizeDone && retries <= 3) {
                try {
                    cloudPageBlob.resize(newSize);
                    resizeDone = true;
                    this.currentBlobSize = newSize;
                }
                catch (StorageException e) {
                    LOG.warn("Failed to extend size of " + cloudPageBlob.getUri());
                    try {
                        Thread.sleep(2000 * retries * retries);
                    }
                    catch (InterruptedException e1) {
                        Thread.currentThread().interrupt();
                    }
                }
                finally {
                    ++retries;
                }
            }
        }
    }

    @Override
    public void flush() throws IOException {
        this.checkStreamState();
        this.flushIOBuffers();
    }

    @Override
    public void write(byte[] data) throws IOException {
        this.write(data, 0, data.length);
    }

    @Override
    public void write(byte[] data, int offset, int length) throws IOException {
        if (offset < 0 || length < 0 || length > data.length - offset) {
            throw new IndexOutOfBoundsException();
        }
        this.writeInternal(data, offset, length);
    }

    @Override
    public void write(int byteVal) throws IOException {
        this.write(new byte[]{(byte)(byteVal & 0xFF)});
    }

    private synchronized void writeInternal(byte[] data, int offset, int length) throws IOException {
        while (length > 0) {
            this.checkStreamState();
            int availableBufferBytes = 4177410 - this.outBuffer.size();
            int nextWrite = Math.min(availableBufferBytes, length);
            this.outBuffer.write(data, offset, nextWrite);
            offset += nextWrite;
            length -= nextWrite;
            if (this.outBuffer.size() > 4177410) {
                throw new RuntimeException("Internal error: maximum write size " + Integer.toString(4177410) + "exceeded.");
            }
            if (this.outBuffer.size() != 4177410) continue;
            this.flushIOBuffers();
        }
    }

    @Override
    public synchronized void hsync() throws IOException {
        LOG.debug("Entering PageBlobOutputStream#hsync().");
        long start = System.currentTimeMillis();
        this.flush();
        LOG.debug(this.ioThreadPool.toString());
        try {
            if (this.lastQueuedTask != null) {
                this.lastQueuedTask.waitTillDone();
            }
        }
        catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        }
        LOG.debug("Leaving PageBlobOutputStream#hsync(). Total hsync duration = " + (System.currentTimeMillis() - start) + " msec.");
    }

    @Override
    public void hflush() throws IOException {
        this.hsync();
    }

    @Deprecated
    public void sync() throws IOException {
        this.hflush();
    }

    @VisibleForTesting
    void killIoThreads() {
        this.ioThreadPool.shutdownNow();
    }

    static /* synthetic */ byte[] access$102(PageBlobOutputStream x0, byte[] x1) {
        x0.previousLastPageDataWritten = x1;
        return x1;
    }

    private class WriteRequest
    implements Runnable {
        private final byte[] dataPayload;
        private final CountDownLatch doneSignal = new CountDownLatch(1);

        public WriteRequest(byte[] dataPayload) {
            this.dataPayload = dataPayload;
        }

        public void waitTillDone() throws InterruptedException {
            this.doneSignal.await();
        }

        @Override
        public void run() {
            try {
                LOG.debug("before runInternal()");
                this.runInternal();
                LOG.debug("after runInternal()");
            }
            finally {
                this.doneSignal.countDown();
            }
        }

        private void runInternal() {
            if (PageBlobOutputStream.this.lastError != null) {
                return;
            }
            if (this.dataPayload.length == 0) {
                return;
            }
            int totalDataBytes = this.dataPayload.length + PageBlobOutputStream.this.previousLastPageDataWritten.length;
            int numberOfPages = totalDataBytes / 510 + (totalDataBytes % 510 == 0 ? 0 : 1);
            byte[] rawPayload = new byte[numberOfPages * 512];
            int currentLastPageDataSize = -1;
            for (int page = 0; page < numberOfPages; ++page) {
                int dataOffset = page * 510;
                int rawOffset = page * 512;
                int currentPageDataSize = Math.min(510, totalDataBytes - dataOffset);
                currentLastPageDataSize = currentPageDataSize;
                byte[] header = PageBlobFormatHelpers.fromShort((short)currentPageDataSize);
                System.arraycopy(header, 0, rawPayload, rawOffset, header.length);
                rawOffset += header.length;
                int bytesToCopyFromDataPayload = currentPageDataSize;
                if (dataOffset < PageBlobOutputStream.this.previousLastPageDataWritten.length) {
                    int bytesToCopyFromLastPage = Math.min(currentPageDataSize, PageBlobOutputStream.this.previousLastPageDataWritten.length - dataOffset);
                    System.arraycopy(PageBlobOutputStream.this.previousLastPageDataWritten, dataOffset, rawPayload, rawOffset, bytesToCopyFromLastPage);
                    bytesToCopyFromDataPayload -= bytesToCopyFromLastPage;
                    rawOffset += bytesToCopyFromLastPage;
                    dataOffset += bytesToCopyFromLastPage;
                }
                if (dataOffset < PageBlobOutputStream.this.previousLastPageDataWritten.length) continue;
                System.arraycopy(this.dataPayload, dataOffset - PageBlobOutputStream.this.previousLastPageDataWritten.length, rawPayload, rawOffset, bytesToCopyFromDataPayload);
            }
            this.writePayloadToServer(rawPayload);
            PageBlobOutputStream.this.currentBlobOffset = PageBlobOutputStream.this.currentBlobOffset + (long)rawPayload.length;
            if (currentLastPageDataSize < 510) {
                int startOffset = (numberOfPages - 1) * 512 + 2;
                PageBlobOutputStream.access$102(PageBlobOutputStream.this, Arrays.copyOfRange(rawPayload, startOffset, startOffset + currentLastPageDataSize));
                PageBlobOutputStream.this.currentBlobOffset = PageBlobOutputStream.this.currentBlobOffset - 512L;
            } else {
                PageBlobOutputStream.access$102(PageBlobOutputStream.this, new byte[0]);
            }
            PageBlobOutputStream.this.conditionalExtendFile();
        }

        private void writePayloadToServer(byte[] rawPayload) {
            ByteArrayInputStream wrapperStream = new ByteArrayInputStream(rawPayload);
            LOG.debug("writing payload of " + rawPayload.length + " bytes to Azure page blob");
            try {
                long start = System.currentTimeMillis();
                PageBlobOutputStream.this.blob.uploadPages(wrapperStream, PageBlobOutputStream.this.currentBlobOffset, rawPayload.length, PageBlobFormatHelpers.withMD5Checking(), PageBlobOutputStream.this.opContext);
                long end = System.currentTimeMillis();
                LOG.trace("Azure uploadPages time for " + rawPayload.length + " bytes = " + (end - start));
            }
            catch (IOException ex) {
                LOG.debug(ExceptionUtils.getStackTrace(ex));
                PageBlobOutputStream.this.lastError = ex;
            }
            catch (StorageException ex) {
                LOG.debug(ExceptionUtils.getStackTrace(ex));
                PageBlobOutputStream.this.lastError = new IOException(ex);
            }
            if (PageBlobOutputStream.this.lastError != null) {
                LOG.debug("Caught error in PageBlobOutputStream#writePayloadToServer()");
            }
        }
    }
}

