package alluxio.worker.block.management;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.util.ThreadFactoryUtils;
import alluxio.worker.block.BlockMetadataEvictorView;
import alluxio.worker.block.BlockMetadataManager;
import alluxio.worker.block.BlockStoreLocation;
import alluxio.worker.block.LocalBlockStore;
import alluxio.worker.block.management.tier.TierManagementTaskProvider;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/worker/block/management/ManagementTaskCoordinator.class */
/**
 * Coordinates block management tasks for a block store.
 *
 * <p>A single daemon runner thread repeatedly asks the registered
 * {@link ManagementTaskProvider}s for the next {@link BlockManagementTask} and runs it on the
 * runner thread itself. The runner sleeps for the configured cool-down time when load is detected
 * (only under {@link BackoffStrategy#ANY}), when no task is pending, or when a task reports that
 * it made no progress.
 *
 * <p>Call {@link #start()} to begin coordination and {@link #close()} to stop it.
 */
public class ManagementTaskCoordinator implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(ManagementTaskCoordinator.class);
    /** How long (in ms) the runner sleeps between attempts when it has to back off. */
    private static final long LOAD_DETECTION_COOL_DOWN_TIME = Configuration.getMs(PropertyKey.WORKER_MANAGEMENT_LOAD_DETECTION_COOL_DOWN_TIME);
    /** Configured back-off strategy; the runner itself only reacts to {@code ANY}. */
    private static final BackoffStrategy BACKOFF_STRATEGY = Configuration.getEnum(PropertyKey.WORKER_MANAGEMENT_BACKOFF_STRATEGY, BackoffStrategy.class);
    /** Thread that executes {@link #runManagement()}. */
    private final Thread mRunnerThread;
    /** Fixed-size pool handed to task providers; sized from the worker configuration. */
    private final ExecutorService mTaskExecutor = Executors.newFixedThreadPool(Configuration.getInt(PropertyKey.WORKER_MANAGEMENT_TASK_THREAD_COUNT), ThreadFactoryUtils.build("block-management-task-%d", true));
    private final LocalBlockStore mBlockStore;
    private final BlockMetadataManager mMetadataManager;
    private final StoreLoadTracker mLoadTracker;
    private final Supplier<BlockMetadataEvictorView> mEvictionViewSupplier;
    /** Providers consulted, in order, for the next task to run. */
    private final List<ManagementTaskProvider> mTaskProviders;

    /**
     * Creates a coordinator. The runner thread is created but not started; call {@link #start()}.
     *
     * @param localBlockStore the block store that management tasks operate on
     * @param blockMetadataManager the metadata manager passed to task providers
     * @param storeLoadTracker tracker used to detect load on the store
     * @param supplier supplier of evictor views passed to task providers
     */
    public ManagementTaskCoordinator(LocalBlockStore localBlockStore, BlockMetadataManager blockMetadataManager, StoreLoadTracker storeLoadTracker, Supplier<BlockMetadataEvictorView> supplier) {
        this.mBlockStore = localBlockStore;
        this.mMetadataManager = blockMetadataManager;
        this.mLoadTracker = storeLoadTracker;
        this.mEvictionViewSupplier = supplier;
        // Must run after the fields above are assigned: the providers capture them.
        this.mTaskProviders = createTaskProviders();
        this.mRunnerThread = new Thread(this::runManagement, "block-management-runner");
        this.mRunnerThread.setDaemon(true);
    }

    /**
     * Starts the runner thread.
     */
    public void start() {
        this.mRunnerThread.start();
    }

    /**
     * Stops the task executor, interrupts the runner thread and waits for it to exit.
     *
     * @throws IOException if shutting down fails or the calling thread is interrupted while
     *         waiting for the runner thread; in the latter case the caller's interrupt status is
     *         restored before the exception is thrown
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        try {
            this.mTaskExecutor.shutdownNow();
            this.mRunnerThread.interrupt();
            this.mRunnerThread.join();
        } catch (InterruptedException e) {
            // Do not lose the caller's interrupt: restore it before reporting the failure.
            Thread.currentThread().interrupt();
            throw new IOException("Failed to close management task coordinator", e);
        } catch (RuntimeException e) {
            throw new IOException("Failed to close management task coordinator", e);
        }
    }

    /**
     * Builds the list of task providers.
     *
     * <p>When {@code WORKER_EVICTOR_CLASS} is set, no tier management provider is registered and
     * a warning is logged; the returned list is then empty.
     *
     * @return the providers to consult for management tasks
     */
    private List<ManagementTaskProvider> createTaskProviders() {
        List<ManagementTaskProvider> providers = new ArrayList<>(1);
        if (Configuration.isSet(PropertyKey.WORKER_EVICTOR_CLASS)) {
            LOG.warn("Tier management tasks will be disabled under eviction emulation mode.");
        } else {
            providers.add(new TierManagementTaskProvider(this.mBlockStore, this.mMetadataManager, this.mEvictionViewSupplier, this.mLoadTracker, this.mTaskExecutor));
        }
        return providers;
    }

    /**
     * Returns the first non-null task offered by the providers, in registration order.
     *
     * @return the next task to run, or {@code null} if no provider has a task
     */
    private BlockManagementTask getNextTask() {
        for (ManagementTaskProvider provider : this.mTaskProviders) {
            BlockManagementTask task = provider.getTask();
            if (task != null) {
                return task;
            }
        }
        return null;
    }

    /**
     * Main loop of the runner thread; returns once the thread has been interrupted.
     *
     * <p>NOTE(review): an unexpected {@link Throwable} outside the task itself (e.g. from a
     * provider) is logged and the loop retries immediately, without a cool-down.
     */
    private void runManagement() {
        while (!Thread.interrupted()) {
            try {
                // Back off from the store while it is under load, if configured to do so.
                if (BACKOFF_STRATEGY == BackoffStrategy.ANY && this.mLoadTracker.loadDetected(BlockStoreLocation.anyTier())) {
                    LOG.debug("Load detected. Sleeping {}ms.", LOAD_DETECTION_COOL_DOWN_TIME);
                    Thread.sleep(LOAD_DETECTION_COOL_DOWN_TIME);
                    continue;
                }

                BlockManagementTask nextTask = getNextTask();
                if (nextTask == null) {
                    LOG.debug("No management task pending. Sleeping {}ms.", LOAD_DETECTION_COOL_DOWN_TIME);
                    Thread.sleep(LOAD_DETECTION_COOL_DOWN_TIME);
                    continue;
                }

                // The task runs on this (the coordinator's) thread.
                LOG.debug("Running task of type:{}", nextTask.getClass().getSimpleName());
                try {
                    BlockManagementTaskResult result = nextTask.run();
                    LOG.info("{} finished with result: {}", nextTask.getClass().getSimpleName(), result);
                    if (result.noProgress()) {
                        LOG.debug("Task made no progress due to failures/back-offs. Sleeping {}ms", LOAD_DETECTION_COOL_DOWN_TIME);
                        Thread.sleep(LOAD_DETECTION_COOL_DOWN_TIME);
                    }
                } catch (InterruptedException e) {
                    // An interrupt is a shutdown request, not a task failure. Sleep has already
                    // cleared the interrupt flag, so swallowing it here would keep the loop
                    // running and leave close() blocked in join().
                    throw e;
                } catch (Exception e) {
                    LOG.error("Management task failed: {}. Error: ", nextTask.getClass().getSimpleName(), e);
                }
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition observes it and exits.
                Thread.currentThread().interrupt();
            } catch (Throwable t) {
                LOG.error("Unexpected error during block management: ", t);
            }
        }
        LOG.debug("Coordinator interrupted.");
        LOG.debug("Block management coordinator exited.");
    }
}
