package alluxio.worker.grpc;

import alluxio.RpcSensitiveConfigMask;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.grpc.WriteRequestCommand;
import alluxio.grpc.WriteResponse;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.NioDataBuffer;
import alluxio.security.authentication.AuthenticatedUserInfo;
import alluxio.util.LogUtils;
import alluxio.util.logging.SamplingLogger;
import alluxio.worker.grpc.WriteRequestContext;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.SerializingExecutor;
import io.grpc.stub.StreamObserver;
import java.util.concurrent.Semaphore;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:alluxio/worker/grpc/AbstractWriteHandler.class */
abstract class AbstractWriteHandler<T extends WriteRequestContext<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractWriteHandler.class);
    private static final Logger SLOW_WRITE_LOG = new SamplingLogger(LOG, 300000);
    private static final long SLOW_WRITE_MS = Configuration.getMs(PropertyKey.WORKER_REMOTE_IO_SLOW_THRESHOLD);
    public static final long FILE_BUFFER_SIZE = 1048576;
    private final StreamObserver<WriteResponse> mResponseObserver;
    private volatile T mContext;
    protected AuthenticatedUserInfo mUserInfo;
    private final Semaphore mSemaphore = new Semaphore(Configuration.getInt(PropertyKey.WORKER_NETWORK_WRITER_BUFFER_SIZE_MESSAGES), true);
    private final SerializingExecutor mSerializingExecutor = new SerializingExecutor(GrpcExecutors.BLOCK_WRITER_EXECUTOR);

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractWriteHandler(StreamObserver<WriteResponse> streamObserver, AuthenticatedUserInfo authenticatedUserInfo) {
        this.mResponseObserver = streamObserver;
        this.mUserInfo = authenticatedUserInfo;
    }

    public void write(alluxio.grpc.WriteRequest writeRequest) {
        if (tryAcquireSemaphore()) {
            this.mSerializingExecutor.execute(() -> {
                try {
                    if (this.mContext == null) {
                        LOG.debug("Received write request {}.", RpcSensitiveConfigMask.CREDENTIAL_FIELD_MASKER.maskObjects(LOG, new Object[]{writeRequest}));
                        try {
                            this.mContext = createRequestContext(writeRequest);
                        } catch (Exception e) {
                            replyError(new Error(AlluxioStatusException.fromThrowable(e), true));
                            throw e;
                        }
                    } else {
                        Preconditions.checkState(!this.mContext.isDoneUnsafe(), "invalid request after write request is completed.");
                    }
                    if (this.mContext.isDoneUnsafe() || this.mContext.getError() != null) {
                        return;
                    }
                    validateWriteRequest(writeRequest);
                    if (writeRequest.hasCommand()) {
                        WriteRequestCommand command = writeRequest.getCommand();
                        if (command.getFlush()) {
                            flush();
                        } else {
                            handleCommand(command, this.mContext);
                        }
                    } else {
                        Preconditions.checkState(writeRequest.hasChunk(), "write request is missing data chunk in non-command message");
                        ByteString data = writeRequest.getChunk().getData();
                        Preconditions.checkState(data != null && data.size() > 0, "invalid data size from write request message");
                        writeData(new NioDataBuffer(data.asReadOnlyByteBuffer(), data.size()));
                    }
                } catch (Exception e2) {
                    LogUtils.warnWithException(LOG, "Exception occurred while processing write request {}.", new Object[]{writeRequest, e2});
                    abort(new Error(AlluxioStatusException.fromThrowable(e2), true));
                } finally {
                    this.mSemaphore.release();
                }
            });
        }
    }

    public void writeDataMessage(alluxio.grpc.WriteRequest writeRequest, DataBuffer dataBuffer) {
        if (dataBuffer == null) {
            write(writeRequest);
            return;
        }
        boolean z = true;
        try {
            Preconditions.checkState(!writeRequest.hasCommand(), "write request command should not come with data buffer");
            Preconditions.checkState(dataBuffer.readableBytes() > 0, "invalid data size from write request message");
            if (!tryAcquireSemaphore()) {
                if (1 != 0) {
                    dataBuffer.release();
                }
            } else {
                z = false;
                this.mSerializingExecutor.execute(() -> {
                    try {
                        writeData(dataBuffer);
                    } finally {
                        this.mSemaphore.release();
                    }
                });
                if (0 != 0) {
                    dataBuffer.release();
                }
            }
        } catch (Throwable th) {
            if (z) {
                dataBuffer.release();
            }
            throw th;
        }
    }

    public void onCompleted() {
        this.mSerializingExecutor.execute(() -> {
            Preconditions.checkState(this.mContext != null);
            try {
                completeRequest(this.mContext);
                replySuccess();
            } catch (Exception e) {
                LogUtils.warnWithException(LOG, "Exception occurred while completing write request {}.", new Object[]{this.mContext.getRequest(), e});
                abort(new Error(AlluxioStatusException.fromThrowable(e), true));
            }
        });
    }

    public void onCancel() {
        this.mSerializingExecutor.execute(() -> {
            try {
                cancelRequest(this.mContext);
                replyCancel();
            } catch (Exception e) {
                LogUtils.warnWithException(LOG, "Exception occurred while cancelling write request {}.", new Object[]{this.mContext.getRequest(), e});
                abort(new Error(AlluxioStatusException.fromThrowable(e), true));
            }
        });
    }

    public void onError(Throwable th) {
        if ((th instanceof StatusRuntimeException) && ((StatusRuntimeException) th).getStatus().getCode() == Status.Code.CANCELLED) {
            return;
        }
        this.mSerializingExecutor.execute(() -> {
            Logger logger = LOG;
            Object[] objArr = new Object[2];
            objArr[0] = this.mContext == null ? "unknown" : this.mContext.getRequest();
            objArr[1] = th;
            LogUtils.warnWithException(logger, "Exception thrown while handling write request {}", objArr);
            abort(new Error(AlluxioStatusException.fromThrowable(th), false));
        });
    }

    private boolean tryAcquireSemaphore() {
        try {
            this.mSemaphore.acquire();
            return true;
        } catch (InterruptedException e) {
            LOG.warn("write data request {} is interrupted: {}", this.mContext == null ? "unknown" : this.mContext.getRequest(), e.getMessage());
            abort(new Error(AlluxioStatusException.fromThrowable(e), true));
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @GuardedBy("mLock")
    private void validateWriteRequest(alluxio.grpc.WriteRequest writeRequest) throws InvalidArgumentException {
        if (writeRequest.hasCommand() && writeRequest.getCommand().hasOffset() && writeRequest.getCommand().getOffset() != this.mContext.getPos()) {
            throw new InvalidArgumentException(String.format("Offsets do not match [received: %d, expected: %d].", Long.valueOf(writeRequest.getCommand().getOffset()), Long.valueOf(this.mContext.getPos())));
        }
    }

    private void writeData(DataBuffer dataBuffer) {
        try {
            try {
                if (this.mContext.isDoneUnsafe() || this.mContext.getError() != null) {
                    return;
                }
                int readableBytes = dataBuffer.readableBytes();
                this.mContext.setPos(this.mContext.getPos() + readableBytes);
                long currentTimeMillis = System.currentTimeMillis();
                writeBuf(this.mContext, this.mResponseObserver, dataBuffer, this.mContext.getPos());
                long currentTimeMillis2 = System.currentTimeMillis() - currentTimeMillis;
                if (currentTimeMillis2 >= SLOW_WRITE_MS) {
                    SLOW_WRITE_LOG.warn(String.format("Writing buffer for remote write took longer than %s ms. handler: %s", Long.valueOf(SLOW_WRITE_MS), getClass().getName()) + " id: {} location: {} bytes: {} durationMs: {}", new Object[]{Long.valueOf(this.mContext.getRequest().getId()), getLocation(), Integer.valueOf(readableBytes), Long.valueOf(currentTimeMillis2)});
                }
                incrementMetrics(readableBytes);
                dataBuffer.release();
            } catch (Exception e) {
                LOG.error("Failed to write data for request {}", this.mContext.getRequest(), e);
                abort(new Error(AlluxioStatusException.fromThrowable(e), true));
                dataBuffer.release();
            }
        } finally {
            dataBuffer.release();
        }
    }

    private void flush() {
        try {
            flushRequest(this.mContext);
            replyFlush();
        } catch (Exception e) {
            LOG.error("Failed to flush for write request {}", this.mContext.getRequest(), e);
            abort(new Error(AlluxioStatusException.fromThrowable(e), true));
        }
    }

    private void abort(Error error) {
        try {
            if (this.mContext == null || this.mContext.getError() != null || this.mContext.isDoneUnsafe()) {
                return;
            }
            this.mContext.setError(error);
            cleanupRequest(this.mContext);
        } catch (Exception e) {
            LOG.warn("Failed to cleanup states with error {}.", e.toString());
        } finally {
            replyError();
        }
    }

    protected abstract T createRequestContext(alluxio.grpc.WriteRequest writeRequest) throws Exception;

    protected abstract void completeRequest(T t) throws Exception;

    protected abstract void cancelRequest(T t) throws Exception;

    protected abstract void cleanupRequest(T t) throws Exception;

    protected abstract void flushRequest(T t) throws Exception;

    protected abstract void writeBuf(T t, StreamObserver<WriteResponse> streamObserver, DataBuffer dataBuffer, long j) throws Exception;

    protected abstract String getLocationInternal(T t);

    public String getLocation() {
        return this.mContext == null ? "null" : getLocationInternal(this.mContext);
    }

    protected void handleCommand(WriteRequestCommand writeRequestCommand, T t) throws Exception {
    }

    private void replySuccess() {
        this.mContext.setDoneUnsafe(true);
        this.mContext.getContentHash().ifPresent(str -> {
            this.mResponseObserver.onNext(WriteResponse.newBuilder().setContentHash(str).setOffset(this.mContext.getPos()).build());
        });
        this.mResponseObserver.onCompleted();
    }

    private void replyCancel() {
        this.mContext.setDoneUnsafe(true);
        this.mResponseObserver.onCompleted();
    }

    private void replyError() {
        replyError((Error) Preconditions.checkNotNull(this.mContext.getError()));
    }

    private void replyError(Error error) {
        if (error.isNotifyClient()) {
            this.mResponseObserver.onError(error.getCause().toGrpcStatusException());
        }
    }

    private void replyFlush() {
        this.mResponseObserver.onNext(WriteResponse.newBuilder().setOffset(this.mContext.getPos()).build());
    }

    private void incrementMetrics(long j) {
        Counter counter = this.mContext.getCounter();
        Meter meter = this.mContext.getMeter();
        Preconditions.checkState(counter != null, "counter");
        Preconditions.checkState(meter != null, "meter");
        counter.inc(j);
        meter.mark(j);
    }
}
