package alluxio.client.block.stream;

import alluxio.client.block.stream.DataReader;
import alluxio.client.file.FileSystemContext;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.CancelledException;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.UnavailableException;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NettyDataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.proto.status.Status;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.common.base.Throwables;
import alluxio.shaded.client.io.netty.buffer.ByteBuf;
import alluxio.shaded.client.io.netty.buffer.Unpooled;
import alluxio.shaded.client.io.netty.channel.Channel;
import alluxio.shaded.client.io.netty.channel.ChannelFutureListener;
import alluxio.shaded.client.io.netty.channel.ChannelHandlerContext;
import alluxio.shaded.client.io.netty.channel.ChannelInboundHandlerAdapter;
import alluxio.shaded.client.io.netty.util.concurrent.Future;
import alluxio.shaded.client.io.netty.util.concurrent.GenericFutureListener;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.CommonUtils;
import alluxio.util.network.NettyUtils;
import alluxio.util.proto.ProtoMessage;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/client/block/stream/NettyDataReader.class */
public final class NettyDataReader implements DataReader {
    private static final Logger LOG = LoggerFactory.getLogger(NettyDataReader.class);
    // Queue-size threshold checked by tooManyPacketsPending(); read once from configuration at class load.
    private static final int MAX_PACKETS_IN_FLIGHT = Configuration.getInt(PropertyKey.USER_NETWORK_NETTY_READER_BUFFER_SIZE_PACKETS);
    // Maximum time readChunk() waits on the packet queue for a single poll, in milliseconds.
    private static final long READ_TIMEOUT_MS = Configuration.getMs(PropertyKey.USER_NETWORK_NETTY_TIMEOUT_MS);
    // The three empty buffers below are sentinels placed on mPackets and recognised by reference
    // identity (==) in readChunk(); they never carry data.
    // Enqueued when mPacketReaderException has been recorded by the read handler.
    private static final ByteBuf THROWABLE = Unpooled.buffer(0);
    // Enqueued for a UFS_READ_HEARTBEAT read response; readChunk() skips it and polls again.
    private static final ByteBuf UFS_READ_HEARTBEAT = Unpooled.buffer(0);
    // Enqueued for a response that carries no payload; readChunk() then marks the reader done and returns null.
    private static final ByteBuf EOF_OR_CANCELLED = Unpooled.buffer(0);
    private final FileSystemContext mContext;
    // Channel acquired from mContext in the constructor and handed back to it in close().
    private final Channel mChannel;
    private final Protocol.ReadRequest mReadRequest;
    private final WorkerNetAddress mAddress;
    // Packets (or sentinels) offered by PacketReadHandler and consumed by readChunk().
    private final BlockingQueue<ByteBuf> mPackets;
    // First failure recorded by PacketReadHandler; rethrown by readChunk() when it dequeues THROWABLE.
    private volatile Throwable mPacketReaderException;
    // Starts at the request offset and advances by the size of every packet returned from readChunk().
    private long mPosToRead;
    // Set once readChunk() has dequeued EOF_OR_CANCELLED.
    private boolean mDone;
    // Set by close(); readChunk() refuses to run afterwards.
    private boolean mClosed;

    /* loaded from: input_file:alluxio/client/block/stream/NettyDataReader$Factory.class */
    public static class Factory implements DataReader.Factory {
        private final FileSystemContext mContext;
        private final WorkerNetAddress mAddress;
        private final Protocol.ReadRequest.Builder mReadRequestBuilder;

        public Factory(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, Protocol.ReadRequest.Builder builder) {
            this.mContext = fileSystemContext;
            this.mAddress = workerNetAddress;
            this.mReadRequestBuilder = builder;
        }

        @Override // alluxio.client.block.stream.DataReader.Factory
        public NettyDataReader create(long j, long j2) throws IOException {
            return new NettyDataReader(this.mContext, this.mAddress, this.mReadRequestBuilder.setOffset(j).setLength(j2).build());
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
        }
    }

    /* loaded from: input_file:alluxio/client/block/stream/NettyDataReader$PacketReadHandler.class */
    private class PacketReadHandler extends ChannelInboundHandlerAdapter {
        PacketReadHandler() {
        }

        @Override // alluxio.shaded.client.io.netty.channel.ChannelInboundHandlerAdapter, alluxio.shaded.client.io.netty.channel.ChannelInboundHandler
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws IOException {
            ByteBuf byteBuf;
            if (!(obj instanceof RPCProtoMessage)) {
                throw new IllegalStateException(String.format("Incorrect response type %s, %s.", obj.getClass().getCanonicalName(), obj));
            }
            RPCProtoMessage rPCProtoMessage = (RPCProtoMessage) obj;
            ProtoMessage message = rPCProtoMessage.getMessage();
            if (message.isReadResponse()) {
                Preconditions.checkState(message.asReadResponse().getType() == Protocol.ReadResponse.Type.UFS_READ_HEARTBEAT);
                byteBuf = NettyDataReader.UFS_READ_HEARTBEAT;
            } else {
                if (!message.isResponse()) {
                    throw new IllegalStateException(String.format("Incorrect response type %s.", message.toString()));
                }
                if (message.asResponse().getStatus() != Status.PStatus.CANCELLED) {
                    CommonUtils.unwrapResponseFrom(rPCProtoMessage.getMessage().asResponse(), channelHandlerContext.channel());
                }
                DataBuffer payloadDataBuffer = rPCProtoMessage.getPayloadDataBuffer();
                if (payloadDataBuffer == null) {
                    byteBuf = NettyDataReader.EOF_OR_CANCELLED;
                } else {
                    Preconditions.checkState(payloadDataBuffer.getNettyOutput() instanceof ByteBuf);
                    byteBuf = (ByteBuf) payloadDataBuffer.getNettyOutput();
                }
            }
            if (NettyDataReader.this.tooManyPacketsPending()) {
                NettyUtils.disableAutoRead(channelHandlerContext.channel());
            }
            NettyDataReader.this.mPackets.offer(byteBuf);
        }

        @Override // alluxio.shaded.client.io.netty.channel.ChannelInboundHandlerAdapter, alluxio.shaded.client.io.netty.channel.ChannelHandlerAdapter, alluxio.shaded.client.io.netty.channel.ChannelHandler, alluxio.shaded.client.io.netty.channel.ChannelInboundHandler
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            NettyDataReader.LOG.error("Exception is caught while reading block {} from channel {}:", new Object[]{Long.valueOf(NettyDataReader.this.mReadRequest.getBlockId()), channelHandlerContext.channel(), th});
            if (NettyDataReader.this.mPacketReaderException == null) {
                NettyDataReader.this.mPacketReaderException = th;
                NettyDataReader.this.mPackets.offer(NettyDataReader.THROWABLE);
            }
            channelHandlerContext.close();
        }

        @Override // alluxio.shaded.client.io.netty.channel.ChannelInboundHandlerAdapter, alluxio.shaded.client.io.netty.channel.ChannelInboundHandler
        public void channelUnregistered(ChannelHandlerContext channelHandlerContext) {
            NettyDataReader.LOG.warn("Channel is closed while reading block {} from channel {}.", Long.valueOf(NettyDataReader.this.mReadRequest.getBlockId()), channelHandlerContext.channel());
            if (NettyDataReader.this.mPacketReaderException == null) {
                NettyDataReader.this.mPacketReaderException = new UnavailableException(String.format("Channel %s is closed.", NettyDataReader.this.mChannel.toString()));
                NettyDataReader.this.mPackets.offer(NettyDataReader.THROWABLE);
            }
            channelHandlerContext.fireChannelUnregistered();
        }
    }

    /**
     * Acquires a channel to the worker, installs the packet read handler at the end of its
     * pipeline and sends the read request.
     *
     * @param fileSystemContext the context the channel is acquired from (and later released to)
     * @param workerNetAddress the worker to read from
     * @param readRequest the read request; its offset is the initial read position
     * @throws IOException if the channel cannot be acquired
     */
    private NettyDataReader(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, Protocol.ReadRequest readRequest) throws IOException {
        // Parameterized via the diamond operator instead of the raw type.
        this.mPackets = new LinkedBlockingQueue<>();
        this.mDone = false;
        this.mClosed = false;
        this.mContext = fileSystemContext;
        this.mAddress = workerNetAddress;
        this.mPosToRead = readRequest.getOffset();
        this.mReadRequest = readRequest;
        this.mChannel = this.mContext.acquireNettyChannel(workerNetAddress);
        this.mChannel.pipeline().addLast(new PacketReadHandler());
        // Netty's ChannelFuture method is addListener; "addListener2" was a decompiler rename.
        // The channel is closed if the request cannot be written.
        this.mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(this.mReadRequest))).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    /**
     * @return the current read position: the request offset plus the bytes already handed out by
     *         {@link #readChunk()}
     */
    @Override
    public long pos() {
        return mPosToRead;
    }

    /**
     * Waits for the next data packet from the worker and returns it.
     *
     * <p>Heartbeat markers are skipped; every poll (including the ones after a heartbeat) waits at
     * most {@code READ_TIMEOUT_MS}.
     *
     * @return the next packet wrapped as a {@link DataBuffer}, or {@code null} once the
     *         end-of-stream/cancel marker has been received
     * @throws DeadlineExceededException if no packet arrives within the timeout
     * @throws CancelledException if the calling thread is interrupted while waiting (the interrupt
     *         flag is restored)
     * @throws IOException if the read handler recorded a failure; non-IO checked failures are
     *         converted through {@code AlluxioStatusException.fromCheckedException}
     * @throws IllegalStateException if the reader is closed, or the worker sent more bytes than
     *         the request asked for
     */
    @Override
    public DataBuffer readChunk() throws IOException {
        Preconditions.checkState(!this.mClosed, "PacketReader is closed while reading packets.");
        // The handler pauses channel reads when the queue is full; resume once there is room.
        if (!tooManyPacketsPending()) {
            NettyUtils.enableAutoRead(this.mChannel);
        }
        ByteBuf packet;
        do {
            try {
                packet = this.mPackets.poll(READ_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CancelledException(e);
            }
        } while (packet == UFS_READ_HEARTBEAT);
        if (packet == null) {
            throw new DeadlineExceededException(String.format("Timeout to read %d from %s.", Long.valueOf(this.mReadRequest.getBlockId()), this.mChannel.toString()));
        }
        if (packet == THROWABLE) {
            Preconditions.checkNotNull(this.mPacketReaderException, "mPacketReaderException");
            Throwables.propagateIfPossible(this.mPacketReaderException, IOException.class);
            throw AlluxioStatusException.fromCheckedException(this.mPacketReaderException);
        }
        if (packet == EOF_OR_CANCELLED) {
            this.mDone = true;
            return null;
        }
        try {
            this.mPosToRead += packet.readableBytes();
            Preconditions.checkState(this.mPosToRead - this.mReadRequest.getOffset() <= this.mReadRequest.getLength());
            return new NettyDataBuffer(packet);
        } catch (Throwable t) {
            // The packet has left the queue, so nobody else can release it: drop our reference
            // before propagating, otherwise the reference-counted buffer leaks.
            packet.release();
            throw t;
        }
    }

    /**
     * Closes the reader and hands the channel back to the context. Calling it again is a no-op.
     *
     * <p>If the stream has not been fully consumed and the channel is still open, a cancel request
     * is sent when bytes remain, and the packets still in flight are read and released so the
     * channel is clean before it is released. If draining fails, the channel is closed instead.
     * The cleanup (handler removal, auto-read re-enable, channel release, closed flag) runs
     * exactly once, in a {@code finally} block, on every path.
     */
    @Override
    public void close() {
        if (this.mClosed) {
            return;
        }
        try {
            // Nothing to cancel or drain when the stream already ended or the channel is gone.
            if (this.mDone || !this.mChannel.isOpen()) {
                return;
            }
            if (remaining() > 0) {
                Protocol.ReadRequest cancelRequest = this.mReadRequest.toBuilder().setCancel(true).build();
                this.mChannel.writeAndFlush(new RPCProtoMessage(new ProtoMessage(cancelRequest))).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
            try {
                readAndDiscardAll();
            } catch (IOException e) {
                LOG.warn("Failed to close the NettyBlockReader (block: {}, address: {}) with exception {}.", this.mReadRequest.getBlockId(), this.mAddress, e.getMessage());
                // The channel may still have packets pending for this request; do not let it be reused.
                CommonUtils.closeChannel(this.mChannel);
            }
        } finally {
            if (this.mChannel.isOpen()) {
                // The read handler was added last in the constructor, so removeLast() detaches it.
                this.mChannel.pipeline().removeLast();
                // Auto-read may have been disabled by the handler; turn it back on before release.
                NettyUtils.enableAutoRead(this.mChannel);
            }
            this.mContext.releaseNettyChannel(this.mAddress, this.mChannel);
            this.mClosed = true;
        }
    }

    /**
     * Reads chunks until {@link #readChunk()} returns {@code null}, releasing each one immediately.
     *
     * @throws IOException if reading a chunk fails
     */
    private void readAndDiscardAll() throws IOException {
        for (DataBuffer chunk = readChunk(); chunk != null; chunk = readChunk()) {
            chunk.release();
        }
    }

    /**
     * @return the number of requested bytes not yet handed out: the end of the requested range
     *         minus the current read position
     */
    private long remaining() {
        long requestEnd = mReadRequest.getOffset() + mReadRequest.getLength();
        return requestEnd - mPosToRead;
    }

    /**
     * Tells whether the packet queue has reached {@code MAX_PACKETS_IN_FLIGHT}. Consulted by both
     * {@link #readChunk()} and the packet read handler to toggle channel auto-read.
     *
     * @return {@code true} if the number of queued packets is at or above the limit
     */
    public boolean tooManyPacketsPending() {
        return MAX_PACKETS_IN_FLIGHT <= mPackets.size();
    }
}
