package alluxio.worker.block;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.runtime.BlockDoesNotExistRuntimeException;
import alluxio.exception.runtime.DeadlineExceededRuntimeException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.exception.status.NotFoundException;
import alluxio.exception.status.UnavailableException;
import alluxio.grpc.Block;
import alluxio.grpc.BlockStatus;
import alluxio.grpc.UfsReadOptions;
import alluxio.network.protocol.databuffer.NioDirectBufferPool;
import alluxio.proto.dataserver.Protocol;
import alluxio.retry.ExponentialBackoffRetry;
import alluxio.retry.RetryUtils;
import alluxio.underfs.UfsManager;
import alluxio.util.ThreadFactoryUtils;
import alluxio.worker.block.DefaultBlockWorker;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import alluxio.worker.block.io.DelegatingBlockReader;
import alluxio.worker.block.meta.BlockMeta;
import alluxio.worker.block.meta.TempBlockMeta;
import alluxio.worker.grpc.GrpcExecutors;
import com.google.common.base.Strings;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/worker/block/MonoBlockStore.class */
public class MonoBlockStore implements BlockStore {
    private static final Logger LOG = LoggerFactory.getLogger(MonoBlockStore.class);
    private static final long LOAD_TIMEOUT = Configuration.getMs(PropertyKey.USER_NETWORK_RPC_KEEPALIVE_TIMEOUT);
    private final LocalBlockStore mLocalBlockStore;
    private final UnderFileSystemBlockStore mUnderFileSystemBlockStore;
    private final BlockMasterClientPool mBlockMasterClientPool;
    private final AtomicReference<Long> mWorkerId;
    private final List<BlockStoreEventListener> mBlockStoreEventListeners = new CopyOnWriteArrayList();
    private final ScheduledExecutorService mDelayer = new ScheduledThreadPoolExecutor(1, ThreadFactoryUtils.build("LoadTimeOut", true));

    public MonoBlockStore(LocalBlockStore localBlockStore, BlockMasterClientPool blockMasterClientPool, UfsManager ufsManager, AtomicReference<Long> atomicReference) {
        this.mLocalBlockStore = (LocalBlockStore) Objects.requireNonNull(localBlockStore);
        this.mBlockMasterClientPool = (BlockMasterClientPool) Objects.requireNonNull(blockMasterClientPool);
        this.mUnderFileSystemBlockStore = new UnderFileSystemBlockStore(localBlockStore, (UfsManager) Objects.requireNonNull(ufsManager));
        this.mWorkerId = atomicReference;
    }

    public void abortBlock(long j, long j2) {
        this.mLocalBlockStore.abortBlock(j, j2);
    }

    public void accessBlock(long j, long j2) {
        this.mLocalBlockStore.accessBlock(j, j2);
    }

    public void cleanupSession(long j) {
        this.mLocalBlockStore.cleanupSession(j);
    }

    public void commitBlock(long j, long j2, boolean z) {
        BlockMasterClient blockMasterClient = (BlockMasterClient) this.mBlockMasterClientPool.acquire();
        try {
            try {
                BlockLock commitBlockLocked = this.mLocalBlockStore.commitBlockLocked(j, j2, z);
                Throwable th = null;
                try {
                    try {
                        BlockMeta blockMeta = this.mLocalBlockStore.getVolatileBlockMeta(j2).get();
                        BlockStoreLocation blockLocation = blockMeta.getBlockLocation();
                        blockMasterClient.commitBlock(this.mWorkerId.get().longValue(), ((Long) this.mLocalBlockStore.getBlockStoreMeta().getUsedBytesOnTiers().get(blockLocation.tierAlias())).longValue(), blockLocation.tierAlias(), blockLocation.mediumType(), j2, blockMeta.getBlockSize());
                        for (BlockStoreEventListener blockStoreEventListener : this.mBlockStoreEventListeners) {
                            synchronized (blockStoreEventListener) {
                                blockStoreEventListener.onCommitBlockToMaster(j2, blockLocation);
                            }
                        }
                        if (commitBlockLocked != null) {
                            if (0 != 0) {
                                try {
                                    commitBlockLocked.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                commitBlockLocked.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (commitBlockLocked != null) {
                        if (th != null) {
                            try {
                                commitBlockLocked.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            commitBlockLocked.close();
                        }
                    }
                    throw th3;
                }
            } catch (AlluxioStatusException e) {
                throw AlluxioRuntimeException.from(e);
            }
        } finally {
            this.mBlockMasterClientPool.release(blockMasterClient);
            DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.dec();
        }
    }

    public String createBlock(long j, long j2, int i, CreateBlockOptions createBlockOptions) {
        TempBlockMeta createBlock = this.mLocalBlockStore.createBlock(j, j2, AllocateOptions.forCreate(createBlockOptions.getInitialBytes(), Strings.isNullOrEmpty(createBlockOptions.getMedium()) ? BlockStoreLocation.anyDirInTier(BlockMetadataManager.WORKER_STORAGE_TIER_ASSOC.getAlias(i)) : BlockStoreLocation.anyDirInAnyTierWithMedium(createBlockOptions.getMedium())));
        DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.inc();
        return createBlock.getPath();
    }

    public BlockReader createBlockReader(long j, long j2, long j3, boolean z, Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException {
        BlockReader createUfsBlockReader;
        if (this.mLocalBlockStore.getVolatileBlockMeta(j2).isPresent()) {
            createUfsBlockReader = this.mLocalBlockStore.createBlockReader(j, j2, j3);
            DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.inc();
        } else {
            if (!(openUfsBlockOptions != null && (openUfsBlockOptions.hasUfsPath() || openUfsBlockOptions.getBlockInUfsTier()))) {
                throw new BlockDoesNotExistRuntimeException(j2);
            }
            createUfsBlockReader = createUfsBlockReader(j, j2, j3, z, openUfsBlockOptions);
        }
        return createUfsBlockReader;
    }

    public BlockReader createUfsBlockReader(long j, long j2, long j3, boolean z, Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException {
        try {
            DelegatingBlockReader delegatingBlockReader = new DelegatingBlockReader(this.mUnderFileSystemBlockStore.createBlockReader(j, j2, j3, z, openUfsBlockOptions), () -> {
                closeUfsBlock(j, j2);
            });
            DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.inc();
            return delegatingBlockReader;
        } catch (Exception e) {
            try {
                closeUfsBlock(j, j2);
            } catch (Exception e2) {
                LOG.warn("Failed to close UFS block", e2);
            }
            String format = String.format("Failed to read from UFS, sessionId=%d, blockId=%d, offset=%d, positionShort=%s, options=%s: %s", Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j3), Boolean.valueOf(z), openUfsBlockOptions, e);
            if (e instanceof FileNotFoundException) {
                throw new NotFoundException(format, e);
            }
            throw new UnavailableException(format, e);
        }
    }

    private void closeUfsBlock(long j, long j2) throws IOException {
        try {
            this.mUnderFileSystemBlockStore.closeBlock(j, j2);
            Optional<TempBlockMeta> tempBlockMeta = this.mLocalBlockStore.getTempBlockMeta(j2);
            if (tempBlockMeta.isPresent() && tempBlockMeta.get().getSessionId() == j) {
                commitBlock(j, j2, false);
            } else if (this.mUnderFileSystemBlockStore.isNoCache(j, j2)) {
                DefaultBlockWorker.Metrics.WORKER_ACTIVE_CLIENTS.dec();
            }
        } finally {
            this.mUnderFileSystemBlockStore.releaseAccess(j, j2);
        }
    }

    public BlockWriter createBlockWriter(long j, long j2) throws IOException {
        return this.mLocalBlockStore.createBlockWriter(j, j2);
    }

    public BlockStoreMeta getBlockStoreMeta() {
        return this.mLocalBlockStore.getBlockStoreMeta();
    }

    public BlockStoreMeta getBlockStoreMetaFull() {
        return this.mLocalBlockStore.getBlockStoreMetaFull();
    }

    public Optional<TempBlockMeta> getTempBlockMeta(long j) {
        return this.mLocalBlockStore.getTempBlockMeta(j);
    }

    public boolean hasBlockMeta(long j) {
        return this.mLocalBlockStore.hasBlockMeta(j);
    }

    public boolean hasTempBlockMeta(long j) {
        return this.mLocalBlockStore.hasTempBlockMeta(j);
    }

    public Optional<BlockMeta> getVolatileBlockMeta(long j) {
        return this.mLocalBlockStore.getVolatileBlockMeta(j);
    }

    public void moveBlock(long j, long j2, AllocateOptions allocateOptions) throws IOException {
        this.mLocalBlockStore.moveBlock(j, j2, allocateOptions);
    }

    public Optional<BlockLock> pinBlock(long j, long j2) {
        return this.mLocalBlockStore.pinBlock(j, j2);
    }

    public void unpinBlock(BlockLock blockLock) {
        blockLock.close();
    }

    public void updatePinnedInodes(Set<Long> set) {
        this.mLocalBlockStore.updatePinnedInodes(set);
    }

    public void registerBlockStoreEventListener(BlockStoreEventListener blockStoreEventListener) {
        LOG.debug("registerBlockStoreEventListener: listener={}", blockStoreEventListener);
        this.mBlockStoreEventListeners.add(blockStoreEventListener);
        this.mLocalBlockStore.registerBlockStoreEventListener(blockStoreEventListener);
    }

    public void removeBlock(long j, long j2) throws IOException {
        this.mLocalBlockStore.removeBlock(j, j2);
    }

    public void removeInaccessibleStorage() {
        this.mLocalBlockStore.removeInaccessibleStorage();
    }

    public void requestSpace(long j, long j2, long j3) {
        this.mLocalBlockStore.requestSpace(j, j2, j3);
    }

    public CompletableFuture<List<BlockStatus>> load(List<Block> list, UfsReadOptions ufsReadOptions) {
        ArrayList arrayList = new ArrayList();
        List<BlockStatus> synchronizedList = Collections.synchronizedList(new ArrayList());
        long j = -9;
        for (Block block : list) {
            long blockId = block.getBlockId();
            long length = block.getLength();
            BlockStoreLocation anyDirInTier = BlockStoreLocation.anyDirInTier(BlockMetadataManager.WORKER_STORAGE_TIER_ASSOC.getAlias(0));
            try {
                UfsIOManager orAddUfsIOManager = this.mUnderFileSystemBlockStore.getOrAddUfsIOManager(block.getMountId());
                if (ufsReadOptions.hasBandwidth()) {
                    orAddUfsIOManager.setQuota(ufsReadOptions.getTag(), ufsReadOptions.getBandwidth());
                }
                this.mLocalBlockStore.createBlock(-9L, blockId, AllocateOptions.forCreate(length, anyDirInTier));
                BlockWriter createBlockWriter = this.mLocalBlockStore.createBlockWriter(-9L, blockId);
                ByteBuffer acquire = NioDirectBufferPool.acquire((int) length);
                arrayList.add(((CompletableFuture) RetryUtils.retryCallable("read from ufs", () -> {
                    return orAddUfsIOManager.read(acquire, block.getOffsetInFile(), length, blockId, block.getUfsPath(), ufsReadOptions);
                }, new ExponentialBackoffRetry(1000, 5000, 5))).applyToEither((CompletionStage) timeoutAfter(LOAD_TIMEOUT, TimeUnit.MILLISECONDS), num -> {
                    return num;
                }).thenRunAsync(() -> {
                    acquire.flip();
                    createBlockWriter.append(acquire);
                }, (Executor) GrpcExecutors.BLOCK_WRITER_EXECUTOR).thenRun(() -> {
                    try {
                        try {
                            createBlockWriter.close();
                            NioDirectBufferPool.release(acquire);
                        } catch (IOException e) {
                            throw AlluxioRuntimeException.from(e);
                        }
                    } catch (Throwable th) {
                        NioDirectBufferPool.release(acquire);
                        throw th;
                    }
                }).thenRun(() -> {
                    commitBlock(j, blockId, false);
                }).exceptionally(th -> {
                    handleException(th.getCause(), block, synchronizedList, j);
                    return null;
                }));
            } catch (Exception e) {
                handleException(e, block, synchronizedList, -9L);
            }
        }
        return CompletableFuture.allOf((CompletableFuture[]) arrayList.toArray(new CompletableFuture[0])).thenApply(r3 -> {
            return synchronizedList;
        });
    }

    private void handleException(Throwable th, Block block, List<BlockStatus> list, long j) {
        LOG.warn("Load block failure: {}", block, th);
        AlluxioRuntimeException from = AlluxioRuntimeException.from(th);
        BlockStatus.Builder retryable = BlockStatus.newBuilder().setBlock(block).setCode(from.getStatus().getCode().value()).setRetryable(from.isRetryable());
        if (from.getMessage() != null) {
            retryable.setMessage(from.getMessage());
        }
        list.add(retryable.build());
        if (hasTempBlockMeta(block.getBlockId())) {
            try {
                abortBlock(j, block.getBlockId());
            } catch (Exception e) {
                LOG.warn(String.format("fail to abort temp block %s after failing to load block", Long.valueOf(block.getBlockId())), e);
            }
        }
    }

    private <T> CompletableFuture<T> timeoutAfter(long j, TimeUnit timeUnit) {
        CompletableFuture<T> completableFuture = new CompletableFuture<>();
        this.mDelayer.schedule(() -> {
            return Boolean.valueOf(completableFuture.completeExceptionally(new DeadlineExceededRuntimeException(String.format("time out after waiting for %s %s", Long.valueOf(j), timeUnit))));
        }, j, timeUnit);
        return completableFuture;
    }

    public void close() throws IOException {
        this.mLocalBlockStore.close();
        this.mUnderFileSystemBlockStore.close();
    }
}
