package alluxio.worker.page;

import alluxio.client.file.CacheContext;
import alluxio.client.file.cache.CacheManager;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.grpc.ErrorType;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.NioDirectBufferPool;
import alluxio.worker.block.io.BlockReader;
import com.google.common.base.Preconditions;
import com.google.protobuf.Any;
import io.grpc.Status;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;

/**
 * Block reader that serves the bytes of one block page by page.
 *
 * <p>Every page is first requested from the {@link CacheManager}. When the cache returns no
 * bytes for a page, the page is read through the optional {@link PagedUfsBlockReader} instead
 * and, if the UFS read options ask for it, put into the cache.
 *
 * <p>Instances keep a mutable read position and bookkeeping flags in plain fields and are
 * therefore not thread safe.
 */
@NotThreadSafe
public class PagedBlockReader extends BlockReader {
    /** Shared zero-capacity buffer returned when there is nothing to read. */
    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
    /** Size in bytes of one page; used to map block offsets to page indices. */
    private final long mPageSize;
    private final CacheManager mCacheManager;
    /** Fallback reader used when the cache returns no bytes for a page; may be absent. */
    private final Optional<PagedUfsBlockReader> mUfsBlockReader;
    private final PagedBlockMeta mBlockMeta;
    private boolean mClosed = false;
    /** Set once at least one byte was served from the cache manager. */
    private boolean mReadFromLocalCache = false;
    /** Set once at least one byte was served through the UFS block reader. */
    private boolean mReadFromUfs = false;
    /** Next block offset that {@link #transferTo(ByteBuf)} reads from. */
    private long mPosition;

    /**
     * Creates a reader positioned at the given offset of the block.
     *
     * @param cacheManager cache manager that pages are read from and written to
     * @param pagedBlockMeta metadata of the block being read (id, size, path)
     * @param j initial read position for {@link #transferTo(ByteBuf)}; must lie in
     *        {@code [0, blockSize]}
     * @param optional UFS block reader used when a page cannot be served from the cache
     * @param j2 page size in bytes
     * @throws IllegalArgumentException if {@code j} is negative or greater than the block size
     */
    public PagedBlockReader(CacheManager cacheManager, PagedBlockMeta pagedBlockMeta, long j, Optional<PagedUfsBlockReader> optional, long j2) {
        // Guava's Preconditions templates only substitute %s, so %s (not %d) must be used here.
        Preconditions.checkArgument(j >= 0 && j <= pagedBlockMeta.getBlockSize(), "Attempt to read block %s which is %s bytes long at invalid byte offset %s", pagedBlockMeta.getBlockId(), pagedBlockMeta.getBlockSize(), j);
        this.mCacheManager = cacheManager;
        this.mUfsBlockReader = optional;
        this.mBlockMeta = pagedBlockMeta;
        this.mPageSize = j2;
        this.mPosition = j;
    }

    /**
     * Reads up to {@code j2} bytes starting at block offset {@code j} into a newly allocated
     * direct buffer. The request is truncated at the end of the block.
     *
     * @param j block offset to start reading at
     * @param j2 maximum number of bytes to read
     * @return a buffer positioned at 0 whose limit is the number of bytes read; the shared empty
     *         buffer when {@code j2} is 0 or {@code j} is at or past the end of the block
     * @throws IllegalStateException if the reader is closed
     * @throws IllegalArgumentException if the offset or length is negative, or the truncated
     *         length does not fit in an {@code int}
     * @throws IOException declared for the underlying page reads
     */
    public ByteBuffer read(long j, long j2) throws IOException {
        long blockSize = this.mBlockMeta.getBlockSize();
        if (j2 == 0 || j >= blockSize) {
            return EMPTY_BYTE_BUFFER;
        }
        long length = Math.min(j2, blockSize - j);
        ensureReadable(j, length);
        // Fail before allocating instead of letting the int cast below silently truncate.
        Preconditions.checkArgument(length <= Integer.MAX_VALUE, "read length %s does not fit in a single ByteBuffer", length);
        ByteBuffer result = ByteBuffer.allocateDirect((int) length);
        ByteBuf target = Unpooled.wrappedBuffer(result);
        // Reset the wrapper's indices so that writing starts at the beginning of the buffer.
        target.clear();
        long bytesRead = read(target, j, length);
        result.position(0);
        result.limit((int) bytesRead);
        return result;
    }

    /**
     * Copies {@code length} bytes starting at block offset {@code offset} into {@code byteBuf},
     * one page at a time, trying the cache first and the UFS reader second.
     *
     * @param byteBuf destination; must have at least {@code length} writable bytes
     * @param offset block offset of the first byte to read
     * @param length number of bytes requested
     * @return the number of bytes written to {@code byteBuf}; less than {@code length} only when
     *         the UFS reader delivered no data for the current position
     * @throws IllegalArgumentException if {@code byteBuf} has fewer than {@code length} writable bytes
     * @throws AlluxioRuntimeException if a page is not served by the cache and no UFS reader is set
     * @throws IOException declared for the underlying page reads
     */
    private long read(ByteBuf byteBuf, long offset, long length) throws IOException {
        Preconditions.checkArgument(((long) byteBuf.writableBytes()) >= length, "buffer overflow, trying to write %s bytes, only %s writable", length, byteBuf.writableBytes());
        NettyBufTargetBuffer target = new NettyBufTargetBuffer(byteBuf);
        long bytesRead = 0;
        while (bytesRead < length) {
            long pos = offset + bytesRead;
            long pageIndex = pos / this.mPageSize;
            BlockPageId pageId = new BlockPageId(this.mBlockMeta.getBlockId(), pageIndex, this.mBlockMeta.getBlockSize());
            int pageOffset = (int) (pos % this.mPageSize);
            // Never read across a page boundary or past the requested length.
            int bytesWanted = (int) Math.min(this.mPageSize - pageOffset, length - bytesRead);
            int bytesFromCache = this.mCacheManager.get(pageId, pageOffset, bytesWanted, target, CacheContext.defaults());
            if (bytesFromCache > 0) {
                bytesRead += bytesFromCache;
                // Mark only this call's bytes; marking the running total would over-count.
                MetricsSystem.meter(MetricKey.CLIENT_CACHE_BYTES_READ_CACHE.getName()).mark(bytesFromCache);
                this.mReadFromLocalCache = true;
                continue;
            }
            int bytesFromUfs = readPageFromUfs(byteBuf, pageId, pageIndex, pageOffset, bytesWanted);
            if (bytesFromUfs <= 0) {
                // No progress is possible for this position; stop instead of spinning forever.
                break;
            }
            bytesRead += bytesFromUfs;
        }
        return bytesRead;
    }

    /**
     * Reads one page through the UFS reader into a pooled buffer, copies the requested slice of
     * it into {@code byteBuf}, and puts the page into the cache when the UFS read options ask
     * for it.
     *
     * @param byteBuf destination of the requested slice
     * @param pageId cache id of the page
     * @param pageIndex index of the page within the block
     * @param pageOffset offset of the first requested byte inside the page
     * @param bytesWanted number of bytes requested from the page
     * @return the number of bytes copied into {@code byteBuf}; 0 when the UFS read returned no
     *         bytes at or after {@code pageOffset}
     * @throws AlluxioRuntimeException if no UFS reader is set
     * @throws IOException declared for the underlying page read
     */
    private int readPageFromUfs(ByteBuf byteBuf, BlockPageId pageId, long pageIndex, int pageOffset, int bytesWanted) throws IOException {
        if (!this.mUfsBlockReader.isPresent()) {
            throw new AlluxioRuntimeException(Status.INTERNAL, String.format("Block %d cannot be read from UFS as UFS reader is missing, this is most likely a bug", Long.valueOf(this.mBlockMeta.getBlockId())), (Throwable) null, ErrorType.Internal, false, new Any[0]);
        }
        PagedUfsBlockReader ufsReader = this.mUfsBlockReader.get();
        ByteBuffer pageBuffer = NioDirectBufferPool.acquire((int) this.mPageSize);
        try {
            int pageBytes = ufsReader.readPageAtIndex(pageBuffer, pageIndex);
            if (pageBytes <= 0) {
                return 0;
            }
            // Copy only bytes the UFS read actually produced; anything beyond pageBytes in the
            // pooled buffer is leftover content and must not be handed to the caller.
            int bytesToCopy = Math.min(bytesWanted, pageBytes - pageOffset);
            if (bytesToCopy > 0) {
                pageBuffer.position(pageOffset);
                pageBuffer.limit(pageOffset + bytesToCopy);
                byteBuf.writeBytes(pageBuffer);
                MetricsSystem.meter(MetricKey.CLIENT_CACHE_BYTES_REQUESTED_EXTERNAL.getName()).mark(bytesToCopy);
                this.mReadFromUfs = true;
            }
            // Expose the page's bytes [0, pageBytes) again before offering it to the cache.
            pageBuffer.rewind();
            pageBuffer.limit(pageBytes);
            if (ufsReader.getUfsReadOptions().isCacheIntoAlluxio()) {
                this.mCacheManager.put(pageId, pageBuffer);
            }
            return Math.max(bytesToCopy, 0);
        } finally {
            NioDirectBufferPool.release(pageBuffer);
        }
    }

    /**
     * @return the size of the block in bytes, as reported by the block metadata
     */
    public long getLength() {
        return this.mBlockMeta.getBlockSize();
    }

    /**
     * Channel access is not supported by this reader.
     *
     * @throws UnsupportedOperationException always
     */
    public ReadableByteChannel getChannel() {
        throw new UnsupportedOperationException();
    }

    /**
     * Reads from the current position into {@code byteBuf} and advances the position by the
     * number of bytes read. At most {@code byteBuf.writableBytes()} bytes, and never more than
     * what remains of the block, are requested.
     *
     * @param byteBuf destination buffer
     * @return the number of bytes written, or -1 when the position is at or past the end of the
     *         block
     * @throws IllegalStateException if the reader is closed
     * @throws IOException declared for the underlying page reads
     */
    public int transferTo(ByteBuf byteBuf) throws IOException {
        long remaining = this.mBlockMeta.getBlockSize() - this.mPosition;
        if (remaining <= 0) {
            return -1;
        }
        int length = (int) Math.min(byteBuf.writableBytes(), remaining);
        ensureReadable(this.mPosition, length);
        long bytesRead = read(byteBuf, this.mPosition, length);
        this.mPosition += bytesRead;
        return (int) bytesRead;
    }

    /**
     * @return whether {@link #close()} has been called
     */
    public boolean isClosed() {
        return this.mClosed;
    }

    /**
     * @return the path reported by the block metadata
     */
    public String getLocation() {
        return this.mBlockMeta.getPath();
    }

    /**
     * Marks the reader closed. On the first call, increments the local-read and/or UFS-read
     * block counters depending on where bytes were served from during this reader's lifetime.
     */
    public void close() throws IOException {
        if (!isClosed()) {
            if (this.mReadFromLocalCache) {
                MetricsSystem.counter(MetricKey.WORKER_BLOCKS_READ_LOCAL.getName()).inc();
            }
            if (this.mReadFromUfs) {
                MetricsSystem.counter(MetricKey.WORKER_BLOCKS_READ_UFS.getName()).inc();
            }
        }
        this.mClosed = true;
    }

    /**
     * Validates that the reader is open and that {@code [offset, offset + length]} lies within
     * the block.
     *
     * @param offset block offset of the read
     * @param length number of bytes to read
     * @throws IllegalStateException if the reader is closed
     * @throws IllegalArgumentException if either value is negative or the range leaves the block
     */
    private void ensureReadable(long offset, long length) {
        long blockSize = this.mBlockMeta.getBlockSize();
        Preconditions.checkState(!this.mClosed, "reader closed");
        Preconditions.checkArgument(length >= 0, "negative read length %s", length);
        Preconditions.checkArgument(offset >= 0, "negative offset %s", offset);
        Preconditions.checkArgument(offset <= blockSize, "offset (%s) exceeds block size (%s)", offset, blockSize);
        long end = offset + length;
        // end < 0 means the addition overflowed.
        Preconditions.checkArgument(end >= 0 && end <= blockSize, "read end %s exceed block size %s", end, blockSize);
    }
}
