package alluxio.client.block.stream;

import alluxio.client.WriteType;
import alluxio.client.file.FileSystemContext;
import alluxio.client.file.options.OutStreamOptions;
import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.Chunk;
import alluxio.grpc.DataMessage;
import alluxio.grpc.RequestType;
import alluxio.grpc.WriteRequest;
import alluxio.grpc.WriteRequestCommand;
import alluxio.grpc.WriteRequestMarshaller;
import alluxio.grpc.WriteResponse;
import alluxio.network.protocol.databuffer.NettyDataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.resource.CloseableResource;
import alluxio.shaded.client.com.google.common.base.MoreObjects;
import alluxio.shaded.client.com.google.common.base.Preconditions;
import alluxio.shaded.client.com.google.protobuf.UnsafeByteOperations;
import alluxio.shaded.client.io.netty.buffer.ByteBuf;
import alluxio.shaded.client.javax.annotation.concurrent.NotThreadSafe;
import alluxio.util.proto.ProtoUtils;
import alluxio.wire.WorkerNetAddress;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link DataWriter} that pushes {@link WriteRequest}s to a block worker through a
 * {@link GrpcBlockingStream} opened on {@link BlockWorkerClient}'s {@code writeBlock} call.
 *
 * <p>The constructor sends one command-only request describing the write; afterwards each
 * {@link #writeChunk(ByteBuf)} sends one chunk request, {@link #flush()} sends a flush command and
 * waits for a matching acknowledgement, and {@link #close()} completes the stream and releases the
 * worker client.
 *
 * <p>Instances are annotated {@link NotThreadSafe}; {@code mPosToQueue} is updated without any
 * synchronization.
 */
@NotThreadSafe
public final class GrpcDataWriter implements DataWriter {
    private static final Logger LOG = LoggerFactory.getLogger(GrpcDataWriter.class);

    /** Buffer size (in messages) handed to the stream constructor. */
    private final int mWriterBufferSizeMessages;
    /** Timeout passed to every {@code send} on the stream. */
    private final long mDataTimeoutMs;
    /** Timeout passed to {@code waitForComplete} in {@link #close()}. */
    private final long mWriterCloseTimeoutMs;
    /** Timeout passed to {@code receive} while waiting for a flush acknowledgement. */
    private final long mWriterFlushTimeoutMs;
    private final FileSystemContext mContext;
    /** The acquired worker client; released in {@link #close()}. */
    private final CloseableResource<BlockWorkerClient> mClient;
    private final WorkerNetAddress mAddress;
    private final long mLength;
    /** Partially built command used as the template for every request sent by this writer. */
    private final WriteRequestCommand mPartialRequest;
    private final long mChunkSize;
    private final GrpcBlockingStream<WriteRequest, WriteResponse> mStream;
    private final WriteRequestMarshaller mMarshaller;
    /** Number of bytes handed to this writer so far; reported by {@link #pos()}. */
    private long mPosToQueue;

    /**
     * Acquires a block worker client for the given address and builds a writer on top of it.
     *
     * <p>The chunk size is read from {@link PropertyKey#USER_NETWORK_WRITER_CHUNK_SIZE_BYTES}. If
     * construction fails for any reason, the acquired client is closed before the failure is
     * propagated.
     *
     * @param fileSystemContext context supplying the cluster configuration and the worker client
     * @param workerNetAddress address of the worker to write to
     * @param j the id placed in the write command
     * @param j2 the length recorded by the writer
     * @param requestType the type of write request
     * @param outStreamOptions options supplying tier, medium type, write type and UFS settings
     * @return the new writer
     * @throws IOException if the writer cannot be constructed or the initial request cannot be sent
     */
    public static GrpcDataWriter create(FileSystemContext fileSystemContext, WorkerNetAddress workerNetAddress, long j, long j2, RequestType requestType, OutStreamOptions outStreamOptions) throws IOException {
        long chunkSize = fileSystemContext.getClusterConf().getBytes(PropertyKey.USER_NETWORK_WRITER_CHUNK_SIZE_BYTES);
        CloseableResource<BlockWorkerClient> client = fileSystemContext.acquireBlockWorkerClient(workerNetAddress);
        try {
            return new GrpcDataWriter(fileSystemContext, workerNetAddress, j, j2, chunkSize, requestType, outStreamOptions, client);
        } catch (Throwable t) {
            // Catch Throwable (not just Exception) so the client is also released when the
            // constructor fails with an Error; the failure itself is rethrown unchanged.
            client.close();
            throw t;
        }
    }

    /**
     * Builds the command template, opens the write stream and sends the initial command request.
     *
     * @param context context supplying the cluster configuration
     * @param address address of the target worker; also included in the stream description
     * @param id the id placed in the write command
     * @param length the length recorded by the writer
     * @param chunkSize the value later reported by {@link #chunkSize()}
     * @param type the requested write type
     * @param options options supplying tier, medium type, write type and UFS settings
     * @param client the already-acquired worker client; owned by this writer from now on
     * @throws IOException if sending the initial request fails
     */
    private GrpcDataWriter(FileSystemContext context, WorkerNetAddress address, long id, long length, long chunkSize, RequestType type, OutStreamOptions options, CloseableResource<BlockWorkerClient> client) throws IOException {
        this.mContext = context;
        this.mAddress = address;
        this.mLength = length;
        AlluxioConfiguration conf = context.getClusterConf();
        this.mDataTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_DATA_TIMEOUT_MS);
        this.mWriterBufferSizeMessages = conf.getInt(PropertyKey.USER_NETWORK_WRITER_BUFFER_SIZE_MESSAGES);
        this.mWriterCloseTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_WRITER_CLOSE_TIMEOUT_MS);
        this.mWriterFlushTimeoutMs = conf.getMs(PropertyKey.USER_NETWORK_WRITER_FLUSH_TIMEOUT);

        WriteRequestCommand.Builder commandBuilder = WriteRequestCommand.newBuilder()
                .setId(id)
                .setTier(options.getWriteTier())
                .setType(type)
                .setMediumType(options.getMediumType());
        if (type == RequestType.UFS_FILE) {
            Protocol.CreateUfsFileOptions ufsFileOptions = Protocol.CreateUfsFileOptions.newBuilder()
                    .setUfsPath(options.getUfsPath())
                    .setOwner(options.getOwner())
                    .setGroup(options.getGroup())
                    .setMode(options.getMode().toShort())
                    .setMountId(options.getMountId())
                    .setAcl(ProtoUtils.toProto(options.getAcl()))
                    .build();
            commandBuilder.setCreateUfsFileOptions(ufsFileOptions);
        }

        // The command is sent as UFS_FALLBACK_BLOCK either when the caller asked for it directly,
        // or when an ALLUXIO_BLOCK is written ASYNC_THROUGH with the UFS tier enabled. Only the
        // former sets the "fallback" flag in the UFS block options.
        boolean isFallbackRequest = type == RequestType.UFS_FALLBACK_BLOCK;
        boolean isAsyncUfsTierWrite = type == RequestType.ALLUXIO_BLOCK
                && options.getWriteType() == WriteType.ASYNC_THROUGH
                && conf.getBoolean(PropertyKey.USER_FILE_UFS_TIER_ENABLED);
        if (isFallbackRequest || isAsyncUfsTierWrite) {
            commandBuilder.setType(RequestType.UFS_FALLBACK_BLOCK);
            commandBuilder.setCreateUfsBlockOptions(Protocol.CreateUfsBlockOptions.newBuilder()
                    .setMountId(options.getMountId())
                    .setFallback(isFallbackRequest)
                    .build());
        }
        commandBuilder.setPinOnCreate(options.getWriteType() == WriteType.ASYNC_THROUGH);
        this.mPartialRequest = commandBuilder.buildPartial();

        this.mChunkSize = chunkSize;
        this.mClient = client;
        this.mMarshaller = new WriteRequestMarshaller();

        boolean zeroCopyEnabled = conf.getBoolean(PropertyKey.USER_NETWORK_ZEROCOPY_ENABLED);
        BlockWorkerClient workerClient = this.mClient.get();
        String description = MoreObjects.toStringHelper(this)
                .add("request", this.mPartialRequest)
                .add("address", address)
                .toString();
        if (zeroCopyEnabled) {
            // NOTE(review): type arguments restored with the diamond, assuming
            // GrpcDataMessageBlockingStream<ReqT, ResT> mirrors GrpcBlockingStream's parameters.
            // The last argument (left null here) is passed as in the original code.
            this.mStream = new GrpcDataMessageBlockingStream<>(workerClient::writeBlock, this.mWriterBufferSizeMessages, description, this.mMarshaller, null);
        } else {
            this.mStream = new GrpcBlockingStream<>(workerClient::writeBlock, this.mWriterBufferSizeMessages, description);
        }

        // Announce the write to the worker with a command-only request before any data is sent.
        WriteRequest initialRequest = WriteRequest.newBuilder()
                .setCommand(this.mPartialRequest.toBuilder())
                .build();
        this.mStream.send(initialRequest, this.mDataTimeoutMs);
    }

    /**
     * @return the number of bytes handed to this writer so far
     */
    @Override
    public long pos() {
        return this.mPosToQueue;
    }

    /**
     * Sends the readable bytes of the given buffer to the worker as one chunk request.
     *
     * <p>The position is advanced by the buffer's readable byte count before the send is
     * attempted. When the underlying stream is a {@link GrpcDataMessageBlockingStream}, the
     * request is sent together with the buffer as a {@link DataMessage}; otherwise the plain
     * request is sent. The buffer is released in all cases, including when the send fails.
     *
     * @param byteBuf the data to write; released by this method
     * @throws IOException if the request cannot be sent
     */
    @Override
    public void writeChunk(ByteBuf byteBuf) throws IOException {
        this.mPosToQueue += byteBuf.readableBytes();
        try {
            Chunk chunk = Chunk.newBuilder()
                    .setData(UnsafeByteOperations.unsafeWrap(byteBuf.nioBuffer()))
                    .build();
            WriteRequest request = WriteRequest.newBuilder()
                    .setCommand(this.mPartialRequest)
                    .setChunk(chunk)
                    .build();
            if (this.mStream instanceof GrpcDataMessageBlockingStream) {
                GrpcDataMessageBlockingStream<WriteRequest, WriteResponse> dataStream =
                        (GrpcDataMessageBlockingStream<WriteRequest, WriteResponse>) this.mStream;
                dataStream.sendDataMessage(new DataMessage<>(request, new NettyDataBuffer(byteBuf)), this.mDataTimeoutMs);
            } else {
                this.mStream.send(request, this.mDataTimeoutMs);
            }
        } finally {
            byteBuf.release();
        }
    }

    /**
     * Sends a command request carrying offset 0 and the given byte count in the UFS block
     * options, and sets this writer's position to that byte count.
     *
     * @param j the value stored as "bytes in block store" in the request and used as the new
     *        position of this writer
     * @throws IllegalStateException if this writer's command type is not
     *         {@link RequestType#UFS_FALLBACK_BLOCK}
     * @throws IOException if the request cannot be sent
     */
    public void writeFallbackInitRequest(long j) throws IOException {
        Preconditions.checkState(this.mPartialRequest.getType() == RequestType.UFS_FALLBACK_BLOCK);
        Protocol.CreateUfsBlockOptions ufsBlockOptions = this.mPartialRequest.getCreateUfsBlockOptions().toBuilder()
                .setBytesInBlockStore(j)
                .build();
        WriteRequest request = WriteRequest.newBuilder()
                .setCommand(this.mPartialRequest.toBuilder()
                        .setOffset(0L)
                        .setCreateUfsBlockOptions(ufsBlockOptions))
                .build();
        this.mPosToQueue = j;
        this.mStream.send(request, this.mDataTimeoutMs);
    }

    /**
     * Cancels the stream, unless the worker client reports that it is already shut down.
     */
    @Override
    public void cancel() {
        if (this.mClient.get().isShutdown()) {
            return;
        }
        this.mStream.cancel();
    }

    /**
     * Sends a flush command for the current position and blocks until a response with the same
     * offset is received.
     *
     * <p>Does nothing when the stream is closed or canceled, or when the position is still 0.
     * Responses whose offset differs from the current position are skipped.
     *
     * @throws UnavailableException if {@code receive} returns {@code null} before a matching
     *         response arrives
     * @throws IOException if sending or receiving fails
     */
    @Override
    public void flush() throws IOException {
        if (this.mStream.isClosed() || this.mStream.isCanceled() || this.mPosToQueue == 0) {
            return;
        }
        WriteRequest flushRequest = WriteRequest.newBuilder()
                .setCommand(this.mPartialRequest.toBuilder().setOffset(this.mPosToQueue).setFlush(true))
                .build();
        this.mStream.send(flushRequest, this.mDataTimeoutMs);
        long ackedOffset;
        do {
            WriteResponse response = this.mStream.receive(this.mWriterFlushTimeoutMs);
            if (response == null) {
                throw new UnavailableException(String.format("Flush request %s to worker %s is not acked before complete.", flushRequest, this.mAddress));
            }
            ackedOffset = response.getOffset();
        } while (this.mPosToQueue != ackedOffset);
    }

    /**
     * Closes the stream and waits for it to complete, then releases the worker client.
     *
     * <p>If the worker client is already shut down, the stream is left untouched. The worker
     * client resource is closed in every case.
     *
     * @throws IOException if closing the stream or waiting for completion fails
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.mClient.get().isShutdown()) {
                return;
            }
            this.mStream.close();
            this.mStream.waitForComplete(this.mWriterCloseTimeoutMs);
        } finally {
            this.mClient.close();
        }
    }

    /**
     * @return the configured chunk size, narrowed from {@code long} with a plain cast (a value
     *         outside the {@code int} range would be truncated)
     */
    @Override
    public int chunkSize() {
        return (int) this.mChunkSize;
    }
}
