package alluxio.worker.block;

import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.meta.UnderFileSystemBlockMeta;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.google.common.base.Preconditions;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/worker/block/UnderFileSystemBlockReader.class */
public final class UnderFileSystemBlockReader extends BlockReader {
    private static final Logger LOG = LoggerFactory.getLogger(UnderFileSystemBlockReader.class);
    private static final Counter BLOCKS_READ_UFS = MetricsSystem.counter(MetricKey.WORKER_BLOCKS_READ_UFS.getName());
    private final Counter mUfsBytesRead;
    private final Meter mUfsBytesReadThroughput;
    private final long mInitialBlockSize;
    private final UnderFileSystemBlockMeta mBlockMeta;
    private final LocalBlockStore mLocalBlockStore;
    private final UfsInputStreamCache mUfsInstreamCache;
    private final CloseableResource<UnderFileSystem> mUfsResource;
    private final boolean mIsPositionShort;
    private InputStream mUnderFileSystemInputStream;
    private BlockWriter mBlockWriter;
    private boolean mClosed;
    private long mInStreamPos = -1;

    public static UnderFileSystemBlockReader create(UnderFileSystemBlockMeta underFileSystemBlockMeta, long j, boolean z, LocalBlockStore localBlockStore, UfsManager.UfsClient ufsClient, UfsInputStreamCache ufsInputStreamCache, Counter counter, Meter meter) throws IOException {
        UnderFileSystemBlockReader underFileSystemBlockReader = new UnderFileSystemBlockReader(underFileSystemBlockMeta, z, localBlockStore, ufsClient, ufsInputStreamCache, counter, meter);
        underFileSystemBlockReader.init(j);
        return underFileSystemBlockReader;
    }

    private UnderFileSystemBlockReader(UnderFileSystemBlockMeta underFileSystemBlockMeta, boolean z, LocalBlockStore localBlockStore, UfsManager.UfsClient ufsClient, UfsInputStreamCache ufsInputStreamCache, Counter counter, Meter meter) {
        this.mInitialBlockSize = underFileSystemBlockMeta.getBlockSize();
        this.mBlockMeta = underFileSystemBlockMeta;
        this.mLocalBlockStore = localBlockStore;
        this.mUfsInstreamCache = ufsInputStreamCache;
        this.mUfsResource = ufsClient.acquireUfsResource();
        this.mIsPositionShort = z;
        this.mUfsBytesRead = counter;
        this.mUfsBytesReadThroughput = meter;
    }

    private void init(long j) throws IOException {
        updateUnderFileSystemInputStream(j);
        updateBlockWriter(j);
    }

    public ReadableByteChannel getChannel() {
        throw new UnsupportedOperationException("UFSFileBlockReader#getChannel is not supported");
    }

    public long getLength() {
        return this.mBlockMeta.getBlockSize();
    }

    public ByteBuffer read(long j, long j2) throws IOException {
        Preconditions.checkState(!this.mClosed);
        updateUnderFileSystemInputStream(j);
        updateBlockWriter(j);
        long min = Math.min(j2, this.mBlockMeta.getBlockSize() - j);
        if (min <= 0) {
            return ByteBuffer.allocate(0);
        }
        byte[] bArr = new byte[(int) min];
        int i = 0;
        Preconditions.checkNotNull(this.mUnderFileSystemInputStream, "mUnderFileSystemInputStream");
        while (i < min) {
            try {
                int read = this.mUnderFileSystemInputStream.read(bArr, i, (int) (min - i));
                if (read == -1) {
                    break;
                }
                i += read;
            } catch (IOException e) {
                throw AlluxioStatusException.fromIOException(e);
            }
        }
        this.mInStreamPos += i;
        Preconditions.checkState(((long) i) == min, "Not enough bytes have been read [bytesRead: %s, bytesToRead: %s] from the UFS file: %s.", Integer.valueOf(i), Long.valueOf(min), this.mBlockMeta.getUnderFileSystemPath());
        if (this.mBlockWriter != null && this.mBlockWriter.getPosition() < this.mInStreamPos) {
            try {
                Preconditions.checkState(this.mBlockWriter.getPosition() >= j);
                this.mLocalBlockStore.requestSpace(this.mBlockMeta.getSessionId(), this.mBlockMeta.getBlockId(), this.mInStreamPos - this.mBlockWriter.getPosition());
                this.mBlockWriter.append(ByteBuffer.wrap(bArr, (int) (this.mBlockWriter.getPosition() - j), (int) (this.mInStreamPos - this.mBlockWriter.getPosition())).duplicate());
            } catch (Exception e2) {
                LOG.warn("Failed to cache data read from UFS (on read()): {}", e2.toString());
                try {
                    cancelBlockWriter();
                } catch (IOException e3) {
                    LOG.error("Failed to cancel block writer:", e3);
                }
            }
        }
        this.mUfsBytesRead.inc(i);
        this.mUfsBytesReadThroughput.mark(i);
        return ByteBuffer.wrap(bArr, 0, i);
    }

    public int transferTo(ByteBuf byteBuf) throws IOException {
        Preconditions.checkState(!this.mClosed);
        if (this.mUnderFileSystemInputStream == null || this.mBlockMeta.getBlockSize() <= this.mInStreamPos) {
            return -1;
        }
        ByteBuf byteBuf2 = null;
        if (this.mBlockWriter != null) {
            byteBuf2 = byteBuf.duplicate();
            byteBuf2.readerIndex(byteBuf2.writerIndex());
        }
        int writeBytes = byteBuf.writeBytes(this.mUnderFileSystemInputStream, (int) Math.min(byteBuf.writableBytes(), this.mBlockMeta.getBlockSize() - this.mInStreamPos));
        if (writeBytes <= 0) {
            return writeBytes;
        }
        this.mInStreamPos += writeBytes;
        if (this.mBlockWriter != null && byteBuf2 != null) {
            try {
                byteBuf2.writerIndex(byteBuf.writerIndex());
                while (byteBuf2.readableBytes() > 0) {
                    this.mLocalBlockStore.requestSpace(this.mBlockMeta.getSessionId(), this.mBlockMeta.getBlockId(), this.mInStreamPos - this.mBlockWriter.getPosition());
                    this.mBlockWriter.append(byteBuf2);
                }
            } catch (Exception e) {
                LOG.warn("Failed to cache data read from UFS (on transferTo()): {}", e.toString());
                cancelBlockWriter();
            }
        }
        this.mUfsBytesRead.inc(writeBytes);
        this.mUfsBytesReadThroughput.mark(writeBytes);
        return writeBytes;
    }

    public void close() throws IOException {
        if (this.mClosed) {
            return;
        }
        super.close();
        try {
            updateBlockWriter(this.mBlockMeta.getBlockSize());
            if (this.mUnderFileSystemInputStream != null) {
                this.mUfsInstreamCache.release(this.mUnderFileSystemInputStream);
                this.mUnderFileSystemInputStream = null;
            }
            if (this.mBlockWriter != null) {
                this.mBlockWriter.close();
            }
            this.mUfsResource.close();
        } finally {
            this.mClosed = true;
            BLOCKS_READ_UFS.inc();
        }
    }

    public boolean isClosed() {
        return this.mClosed;
    }

    public String getLocation() {
        return this.mBlockMeta.getUnderFileSystemPath();
    }

    private void updateUnderFileSystemInputStream(long j) throws IOException {
        if (this.mUnderFileSystemInputStream != null && j != this.mInStreamPos) {
            this.mUfsInstreamCache.release(this.mUnderFileSystemInputStream);
            this.mUnderFileSystemInputStream = null;
            this.mInStreamPos = -1L;
        }
        if (this.mUnderFileSystemInputStream != null || j >= this.mBlockMeta.getBlockSize()) {
            return;
        }
        this.mUnderFileSystemInputStream = this.mUfsInstreamCache.acquire((UnderFileSystem) this.mUfsResource.get(), this.mBlockMeta.getUnderFileSystemPath(), IdUtils.fileIdFromBlockId(this.mBlockMeta.getBlockId()), OpenOptions.defaults().setOffset(this.mBlockMeta.getOffset() + j).setPositionShort(this.mIsPositionShort));
        this.mInStreamPos = j;
    }

    private void cancelBlockWriter() throws IOException {
        if (this.mBlockWriter == null) {
            return;
        }
        try {
            this.mBlockWriter.close();
            this.mBlockWriter = null;
            this.mLocalBlockStore.abortBlock(this.mBlockMeta.getSessionId(), this.mBlockMeta.getBlockId());
        } catch (Exception e) {
            throw AlluxioStatusException.fromThrowable(e);
        }
    }

    private void updateBlockWriter(long j) throws IOException {
        if (this.mBlockWriter != null && j > this.mBlockWriter.getPosition()) {
            cancelBlockWriter();
        }
        try {
            if (this.mBlockWriter == null && j == 0 && !this.mBlockMeta.isNoCache()) {
                this.mLocalBlockStore.createBlock(this.mBlockMeta.getSessionId(), this.mBlockMeta.getBlockId(), AllocateOptions.forCreate(this.mInitialBlockSize, BlockStoreLocation.anyDirInTier(BlockMetadataManager.WORKER_STORAGE_TIER_ASSOC.getAlias(0))));
                this.mBlockWriter = this.mLocalBlockStore.createBlockWriter(this.mBlockMeta.getSessionId(), this.mBlockMeta.getBlockId());
            }
        } catch (AlluxioRuntimeException e) {
            LOG.warn("Failed to update block writer for UFS block [blockId: {}, ufsPath: {}, offset: {}]: {}", new Object[]{Long.valueOf(this.mBlockMeta.getBlockId()), this.mBlockMeta.getUnderFileSystemPath(), Long.valueOf(j), e.toString()});
            this.mBlockWriter = null;
        } catch (IllegalStateException e2) {
            LOG.debug("Failed to update block writer for UFS block [blockId: {}, ufsPath: {}, offset: {}].Concurrent UFS readers may be caching the same block.", new Object[]{Long.valueOf(this.mBlockMeta.getBlockId()), this.mBlockMeta.getUnderFileSystemPath(), Long.valueOf(j), e2});
            this.mBlockWriter = null;
        }
    }
}
