package alluxio.worker.block;

import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.util.ThreadFactoryUtils;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Removes blocks from a {@link BlockWorker} asynchronously.
 *
 * <p>Block ids handed to {@link #addBlocksToDelete(List)} are placed on a blocking queue and
 * consumed by a fixed pool of remover threads, each of which calls
 * {@code BlockWorker.removeBlock} for the ids it takes. A separate set tracks ids that are
 * queued or currently being processed so that the same id is not enqueued twice.
 */
@ThreadSafe
public class AsyncBlockRemover {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncBlockRemover.class);
    private static final int DEFAULT_BLOCK_REMOVER_POOL_SIZE = 10;
    // Session id passed to BlockWorker.removeBlock for every asynchronous removal.
    // NOTE(review): presumably this is the session id reserved for master-issued commands;
    // confirm against the project's session id constants.
    private static final long REMOVE_BLOCK_SESSION_ID = -4L;

    private final BlockWorker mBlockWorker;
    /** Ids waiting to be picked up by a remover thread. */
    private final BlockingQueue<Long> mBlocksToRemove;
    /** Ids that are either queued or currently being removed. */
    private final Set<Long> mRemovingBlocks;
    private final ExecutorService mRemoverPool;
    private final Counter mTryRemoveCount;
    private final Counter mRemovedCount;
    /** Set by {@link #shutDown()} so that remover threads do not warn about the expected interrupt. */
    private volatile boolean mShutdown;

    /**
     * Worker loop run by each pool thread: takes ids off the queue and removes the
     * corresponding blocks until the thread is interrupted.
     */
    private class BlockRemover implements Runnable {
        private BlockRemover() {
        }

        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            while (true) {
                // Stays null until take() returns, so the handlers below can tell whether
                // an id was actually claimed by this iteration.
                Long blockId = null;
                try {
                    blockId = mBlocksToRemove.take();
                    mTryRemoveCount.inc();
                    mBlockWorker.removeBlock(REMOVE_BLOCK_SESSION_ID, blockId.longValue());
                    mRemovedCount.inc();
                    LOG.debug("Block {} is removed in thread {}.", blockId, threadName);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop this worker.
                    Thread.currentThread().interrupt();
                    if (!mShutdown) {
                        LOG.warn("{} got interrupted while it was cleaning block {}.", threadName, blockId);
                    }
                    return;
                } catch (Exception e) {
                    // Best-effort: log and keep serving the queue.
                    LOG.warn("Failed to remove block {} instructed by master. This is best-effort and will be tried later. threadName {}, error {}", blockId, threadName, e.getMessage());
                } finally {
                    // Whatever the outcome, the id is no longer in flight and may be requested again.
                    if (blockId != null) {
                        mRemovingBlocks.remove(blockId);
                    }
                }
            }
        }
    }

    /**
     * Creates a remover with the default pool size, an unbounded queue and a concurrent set.
     *
     * @param blockWorker the worker on which blocks are removed
     */
    public AsyncBlockRemover(BlockWorker blockWorker) {
        this(blockWorker, DEFAULT_BLOCK_REMOVER_POOL_SIZE, new LinkedBlockingQueue<>(),
                Collections.newSetFromMap(new ConcurrentHashMap<>()));
    }

    /**
     * Creates a remover with caller-supplied collaborators, registers its metrics and starts
     * the remover threads.
     *
     * @param blockWorker the worker on which blocks are removed
     * @param threads number of remover threads to start
     * @param blocksToRemove queue holding ids that await removal; must not be null
     * @param removingBlocks set tracking ids that are queued or being removed; must not be null
     */
    @VisibleForTesting
    public AsyncBlockRemover(BlockWorker blockWorker, int threads, BlockingQueue<Long> blocksToRemove,
            Set<Long> removingBlocks) {
        this.mShutdown = false;
        this.mBlockWorker = blockWorker;
        this.mBlocksToRemove = blocksToRemove;
        this.mRemovingBlocks = removingBlocks;
        this.mTryRemoveCount = MetricsSystem.counter(MetricKey.WORKER_BLOCK_REMOVER_TRY_REMOVE_COUNT.getName());
        this.mRemovedCount = MetricsSystem.counter(MetricKey.WORKER_BLOCK_REMOVER_REMOVED_COUNT.getName());
        // Bound method references null-check their receiver, so a null queue or set still
        // fails here with a NullPointerException.
        MetricsSystem.registerGaugeIfAbsent(
                MetricKey.WORKER_BLOCK_REMOVER_TRY_REMOVE_BLOCKS_SIZE.getName(), this.mBlocksToRemove::size);
        MetricsSystem.registerGaugeIfAbsent(
                MetricKey.WORKER_BLOCK_REMOVER_REMOVING_BLOCKS_SIZE.getName(), this.mRemovingBlocks::size);
        this.mRemoverPool = Executors.newFixedThreadPool(threads,
                ThreadFactoryUtils.build("block-removal-service-%d", true));
        for (int n = 0; n < threads; n++) {
            this.mRemoverPool.execute(new BlockRemover());
        }
    }

    /**
     * Queues the given blocks for asynchronous removal.
     *
     * <p>Ids that are already queued or being removed are skipped. If the calling thread is
     * interrupted while enqueuing an id, the interrupt flag is restored, a warning is logged
     * and that id is not queued.
     *
     * @param list ids of the blocks to remove
     */
    public void addBlocksToDelete(List<Long> list) {
        for (long rawId : list) {
            Long blockId = Long.valueOf(rawId);
            // Claim the id BEFORE publishing it on the queue. Set.add is a single
            // check-and-insert, so concurrent callers cannot both enqueue the same id, and a
            // remover thread can never finish (and un-track) the id before it was tracked.
            if (!this.mRemovingBlocks.add(blockId)) {
                LOG.debug("{} is being removed. Current queue size is {}.", blockId, this.mBlocksToRemove.size());
                continue;
            }
            try {
                this.mBlocksToRemove.put(blockId);
            } catch (InterruptedException e) {
                // The id never reached the queue; release the claim so it can be retried later.
                this.mRemovingBlocks.remove(blockId);
                Thread.currentThread().interrupt();
                LOG.warn("AsyncBlockRemover got interrupted while it was putting block {}.", blockId);
            }
        }
    }

    /**
     * Marks this remover as shut down and interrupts the remover threads via
     * {@link ExecutorService#shutdownNow()}.
     */
    public void shutDown() {
        this.mShutdown = true;
        this.mRemoverPool.shutdownNow();
    }
}
