package alluxio.worker.grpc;

import alluxio.annotation.SuppressFBWarnings;
import alluxio.conf.Configuration;
import alluxio.exception.runtime.ResourceExhaustedRuntimeException;
import alluxio.grpc.WriteRequestCommand;
import alluxio.grpc.WriteResponse;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.proto.dataserver.Protocol;
import alluxio.resource.CloseableResource;
import alluxio.security.authentication.AuthenticatedUserInfo;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.CreateOptions;
import alluxio.worker.BlockUtils;
import alluxio.worker.block.CreateBlockOptions;
import alluxio.worker.block.DefaultBlockWorker;
import alluxio.worker.block.meta.TempBlockMeta;
import com.google.common.base.Preconditions;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;
import javax.annotation.concurrent.NotThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Write handler for block writes that may end up in the UFS instead of the local block store.
 *
 * <p>A request starts out writing to the local worker through a delegate {@link BlockWriteHandler}
 * unless the request's create-UFS-block options already ask for fallback. When the local write
 * fails with {@link ResourceExhaustedRuntimeException}, the bytes already stored in the temp block
 * are copied to a newly created UFS file and all following data goes to that file.
 */
@SuppressFBWarnings(value = {"BC_UNCONFIRMED_CAST_OF_RETURN_VALUE"}, justification = "false positive with superclass generics, see more description in https://sourceforge.net/p/findbugs/bugs/1242/")
@NotThreadSafe
/* loaded from: input_file:alluxio/worker/grpc/UfsFallbackBlockWriteHandler.class */
public final class UfsFallbackBlockWriteHandler extends AbstractWriteHandler<BlockWriteRequestContext> {
    private static final Logger LOG = LoggerFactory.getLogger(UfsFallbackBlockWriteHandler.class);

    /** Block worker used to create local blocks, commit UFS blocks and look up temp blocks. */
    private final DefaultBlockWorker mWorker;
    /** Source of the UFS client for the mount id carried in the request. */
    private final UfsManager mUfsManager;
    /** Delegate that handles every operation while the request is still writing locally. */
    private final BlockWriteHandler mBlockWriteHandler;
    /** Selects the domain-socket metrics instead of the remote-write metrics. */
    private final boolean mDomainSocketEnabled;

    /**
     * Creates a handler together with its local-write delegate.
     *
     * @param blockWorker the block worker
     * @param ufsManager the UFS manager used to resolve mount ids
     * @param responseObserver the stream observer the responses are sent to
     * @param userInfo the authenticated user info
     * @param domainSocketEnabled whether domain-socket metrics should be reported
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public UfsFallbackBlockWriteHandler(DefaultBlockWorker blockWorker, UfsManager ufsManager, StreamObserver<WriteResponse> responseObserver, AuthenticatedUserInfo userInfo, boolean domainSocketEnabled) {
        super(responseObserver, userInfo);
        this.mWorker = blockWorker;
        this.mUfsManager = ufsManager;
        this.mBlockWriteHandler = new BlockWriteHandler(blockWorker, responseObserver, userInfo, domainSocketEnabled);
        this.mDomainSocketEnabled = domainSocketEnabled;
    }

    /**
     * Builds the request context, wires the byte metrics, and creates the local temp block unless
     * the request asks to go straight to the UFS.
     *
     * @param writeRequest the initial write request; must carry create-UFS-block options
     * @return the new request context
     * @throws Exception if the local block cannot be created
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public BlockWriteRequestContext createRequestContext(alluxio.grpc.WriteRequest writeRequest) throws Exception {
        BlockWriteRequestContext context = new BlockWriteRequestContext(writeRequest, AbstractWriteHandler.FILE_BUFFER_SIZE);
        if (this.mDomainSocketEnabled) {
            context.setCounter(MetricsSystem.counter(MetricKey.WORKER_BYTES_WRITTEN_DOMAIN.getName()));
            context.setMeter(MetricsSystem.meter(MetricKey.WORKER_BYTES_WRITTEN_DOMAIN_THROUGHPUT.getName()));
        } else {
            context.setCounter(MetricsSystem.counter(MetricKey.WORKER_BYTES_WRITTEN_REMOTE.getName()));
            context.setMeter(MetricsSystem.meter(MetricKey.WORKER_BYTES_WRITTEN_REMOTE_THROUGHPUT.getName()));
        }
        BlockWriteRequest request = context.getRequest();
        Preconditions.checkState(request.hasCreateUfsBlockOptions());
        // A request flagged as "fallback" skips the local store entirely.
        context.setWritingToLocal(!request.getCreateUfsBlockOptions().getFallback());
        if (context.isWritingToLocal()) {
            this.mWorker.createBlock(request.getSessionId(), request.getId(), request.getTier(), new CreateBlockOptions((String) null, request.getMediumType(), AbstractWriteHandler.FILE_BUFFER_SIZE));
        }
        return context;
    }

    /**
     * Completes the request: delegates to the local handler, or commits the block in the UFS and
     * closes the UFS stream. The UFS resource, if any, is released afterwards.
     *
     * <p>Nothing is released when a step fails: the context is left intact so that the cancel path
     * can still close the stream and delete the partially written UFS file.
     *
     * @param context the request context
     * @throws Exception if committing or closing fails
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public void completeRequest(BlockWriteRequestContext context) throws Exception {
        if (context.isWritingToLocal()) {
            this.mBlockWriteHandler.completeRequest(context);
        } else {
            this.mWorker.commitBlockInUfs(context.getRequest().getId(), context.getPos());
            closeUfsOutputStream(context);
        }
        releaseUfsResource(context);
    }

    /**
     * Cancels the request: delegates to the local handler, or closes the UFS stream and deletes the
     * UFS file. The UFS resource is released even when one of those steps throws.
     *
     * @param context the request context
     * @throws Exception if cancelling the local write, closing the stream or deleting the file fails
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public void cancelRequest(BlockWriteRequestContext context) throws Exception {
        try {
            if (context.isWritingToLocal()) {
                this.mBlockWriteHandler.cancelRequest(context);
            } else {
                closeUfsOutputStream(context);
                CloseableResource<UnderFileSystem> ufsResource = context.getUfsResource();
                if (ufsResource != null) {
                    ufsResource.get().deleteExistingFile(context.getUfsPath());
                }
            }
        } finally {
            // Always give the UFS resource back; a failed close/delete must not leak it.
            releaseUfsResource(context);
        }
    }

    /**
     * Cleans up after a failed request. A local write is cleaned up by the delegate; a UFS write is
     * cleaned up the same way it is cancelled.
     *
     * @param context the request context
     * @throws Exception if the cleanup fails
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public void cleanupRequest(BlockWriteRequestContext context) throws Exception {
        if (context.isWritingToLocal()) {
            this.mBlockWriteHandler.cleanupRequest(context);
        } else {
            cancelRequest(context);
        }
    }

    /**
     * Flushes buffered data: through the delegate for a local write, otherwise on the UFS stream
     * when one has been opened.
     *
     * @param context the request context
     * @throws Exception if the flush fails
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public void flushRequest(BlockWriteRequestContext context) throws Exception {
        if (context.isWritingToLocal()) {
            this.mBlockWriteHandler.flushRequest(context);
            return;
        }
        OutputStream ufsOutputStream = context.getOutputStream();
        if (ufsOutputStream != null) {
            ufsOutputStream.flush();
        }
    }

    /**
     * Writes one buffer. The local store is tried first; if it reports
     * {@link ResourceExhaustedRuntimeException} the request is switched to the UFS and the buffer is
     * written there instead.
     *
     * @param context the request context
     * @param responseObserver the response observer, passed through to the local delegate
     * @param buf the data to write
     * @param pos the position passed to the local delegate; {@code pos - buf.readableBytes()} is
     *        treated as the number of bytes written before this buffer
     * @throws Exception if the write or the fallback fails
     */
    /* renamed from: writeBuf, reason: avoid collision after fix types in other method */
    protected void writeBuf2(BlockWriteRequestContext context, StreamObserver<WriteResponse> responseObserver, DataBuffer buf, long pos) throws Exception {
        if (context.isWritingToLocal()) {
            long posBeforeWrite = pos - buf.readableBytes();
            try {
                this.mBlockWriteHandler.writeBuf2(context, responseObserver, buf, pos);
                return;
            } catch (ResourceExhaustedRuntimeException e) {
                LOG.warn("Not enough space to write block {} to local worker, fallback to UFS.  {} bytes have been written.", context.getRequest().getId(), posBeforeWrite);
                fallBackToUfs(context, posBeforeWrite);
            }
        }
        if (context.getOutputStream() == null) {
            createUfsBlock(context);
        }
        buf.readBytes(context.getOutputStream(), buf.readableBytes());
    }

    /**
     * Handles a command request. A command at offset 0 that carries create-UFS-block options with a
     * bytes-in-block-store value advances the position by that value and starts the UFS fallback.
     *
     * @param command the command
     * @param context the request context
     * @throws Exception if initializing the UFS fallback fails
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public void handleCommand(WriteRequestCommand command, BlockWriteRequestContext context) throws Exception {
        if (command.hasCreateUfsBlockOptions() && command.getOffset() == 0 && command.getCreateUfsBlockOptions().hasBytesInBlockStore()) {
            context.setPos(context.getPos() + command.getCreateUfsBlockOptions().getBytesInBlockStore());
            initUfsFallback(context);
        }
    }

    /**
     * Describes where the block is being written.
     *
     * @param context the request context
     * @return the UFS block path, or {@code "blockId-<id>"} when the path cannot be resolved
     */
    /* JADX INFO: Access modifiers changed from: protected */
    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public String getLocationInternal(BlockWriteRequestContext context) {
        long blockId = context.getRequest().getId();
        Protocol.CreateUfsBlockOptions createUfsBlockOptions = context.getRequest().getCreateUfsBlockOptions();
        if (createUfsBlockOptions == null) {
            return String.format("blockId-%d", blockId);
        }
        try {
            return BlockUtils.getUfsBlockPath(this.mUfsManager.get(createUfsBlockOptions.getMountId()), blockId);
        } catch (Throwable ignored) {
            // Best effort: fall back to the block id when the UFS path cannot be resolved.
            return String.format("blockId-%d", blockId);
        }
    }

    /**
     * Opens the UFS block if necessary and copies the bytes already held in the local temp block
     * (the current position of the context) into it.
     */
    private void initUfsFallback(BlockWriteRequestContext context) throws Exception {
        Preconditions.checkState(!context.isWritingToLocal());
        if (context.getOutputStream() == null) {
            createUfsBlock(context);
        }
        transferToUfsBlock(context, context.getPos());
    }

    /**
     * Switches the request from the local store to the UFS: closes the block writer, creates the
     * UFS block, copies over the bytes already stored locally, then cancels the local write.
     */
    private void fallBackToUfs(BlockWriteRequestContext context, long bytesInLocalBlock) throws Exception {
        context.setWritingToLocal(false);
        if (context.getBlockWriter() != null) {
            context.getBlockWriter().close();
        }
        createUfsBlock(context);
        if (bytesInLocalBlock > 0) {
            transferToUfsBlock(context, bytesInLocalBlock);
        }
        this.mBlockWriteHandler.cancelRequest(context);
    }

    /**
     * Acquires the UFS resource for the request's mount id, creates the UFS block file, and points
     * the context's stream, path and byte metrics at it.
     */
    private void createUfsBlock(BlockWriteRequestContext context) throws Exception {
        BlockWriteRequest request = context.getRequest();
        UfsManager.UfsClient ufsClient = this.mUfsManager.get(request.getCreateUfsBlockOptions().getMountId());
        CloseableResource<UnderFileSystem> ufsResource = ufsClient.acquireUfsResource();
        // Register the resource before creating the file so the cancel path can release it.
        context.setUfsResource(ufsResource);
        String ufsTag = MetricsSystem.escape(ufsClient.getUfsMountPointUri());
        String ufsBlockPath = BlockUtils.getUfsBlockPath(ufsClient, request.getId());
        context.setOutputStream(ufsResource.get().createNonexistingFile(ufsBlockPath, CreateOptions.defaults(Configuration.global()).setEnsureAtomic(true).setCreateParent(true)));
        context.setUfsPath(ufsBlockPath);
        MetricKey counterKey = MetricKey.WORKER_BYTES_WRITTEN_UFS;
        MetricKey meterKey = MetricKey.WORKER_BYTES_WRITTEN_UFS_THROUGHPUT;
        context.setCounter(MetricsSystem.counterWithTags(counterKey.getName(), counterKey.isClusterAggregated(), new String[]{"UFS", ufsTag}));
        context.setMeter(MetricsSystem.meterWithTags(meterKey.getName(), meterKey.isClusterAggregated(), new String[]{"UFS", ufsTag}));
    }

    /**
     * Copies the local temp block file into the UFS stream of the context.
     *
     * @param context the request context, with the UFS stream already open
     * @param expectedBytes the number of bytes the temp block is expected to contain
     * @throws IOException if the copy fails
     * @throws IllegalStateException if the temp block is missing or the copied size differs
     */
    private void transferToUfsBlock(BlockWriteRequestContext context, long expectedBytes) throws IOException {
        long blockId = context.getRequest().getId();
        Optional<TempBlockMeta> tempBlock = this.mWorker.getBlockStore().getTempBlockMeta(blockId);
        Preconditions.checkState(tempBlock.isPresent(), "Temp block %s not found in the local block store", blockId);
        long copiedBytes = Files.copy(Paths.get(tempBlock.get().getPath()), context.getOutputStream());
        Preconditions.checkState(copiedBytes == expectedBytes, "Copied %s bytes of temp block %s to UFS, expected %s", copiedBytes, blockId, expectedBytes);
    }

    /** Closes the UFS stream of the context, if open, and clears it once the close succeeded. */
    private static void closeUfsOutputStream(BlockWriteRequestContext context) throws IOException {
        OutputStream ufsOutputStream = context.getOutputStream();
        if (ufsOutputStream != null) {
            ufsOutputStream.close();
            context.setOutputStream(null);
        }
    }

    /**
     * Releases the UFS resource of the context, if any, and clears it so that a repeated
     * cancel/cleanup neither uses nor closes the released resource again.
     */
    private static void releaseUfsResource(BlockWriteRequestContext context) throws Exception {
        CloseableResource<UnderFileSystem> ufsResource = context.getUfsResource();
        if (ufsResource != null) {
            context.setUfsResource(null);
            ufsResource.close();
        }
    }

    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public /* bridge */ /* synthetic */ String getLocation() {
        return super.getLocation();
    }

    @Override // alluxio.worker.grpc.AbstractWriteHandler
    protected /* bridge */ /* synthetic */ void writeBuf(BlockWriteRequestContext blockWriteRequestContext, StreamObserver streamObserver, DataBuffer dataBuffer, long j) throws Exception {
        writeBuf2(blockWriteRequestContext, (StreamObserver<WriteResponse>) streamObserver, dataBuffer, j);
    }

    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public /* bridge */ /* synthetic */ void onError(Throwable th) {
        super.onError(th);
    }

    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public /* bridge */ /* synthetic */ void onCancel() {
        super.onCancel();
    }

    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public /* bridge */ /* synthetic */ void onCompleted() {
        super.onCompleted();
    }

    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public /* bridge */ /* synthetic */ void writeDataMessage(alluxio.grpc.WriteRequest writeRequest, DataBuffer dataBuffer) {
        super.writeDataMessage(writeRequest, dataBuffer);
    }

    @Override // alluxio.worker.grpc.AbstractWriteHandler
    public /* bridge */ /* synthetic */ void write(alluxio.grpc.WriteRequest writeRequest) {
        super.write(writeRequest);
    }
}
