package alluxio.worker.block;

import alluxio.ClientContext;
import alluxio.ProcessUtils;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.heartbeat.HeartbeatThread;
import alluxio.master.MasterClientContext;
import alluxio.security.user.ServerUserState;
import alluxio.util.CommonUtils;
import alluxio.util.ConfigurationUtils;
import alluxio.util.WaitForOptions;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/worker/block/BlockSyncMasterGroup.class */
public class BlockSyncMasterGroup implements Closeable {
    private volatile boolean mStarted = false;
    private final boolean mTestMode = Configuration.getBoolean(PropertyKey.TEST_MODE);
    private final Map<InetSocketAddress, SpecificMasterBlockSync> mMasterSyncOperators = new HashMap();
    private static final Logger LOG = LoggerFactory.getLogger(SpecificMasterBlockSync.class);
    private static BlockMasterClientFactory sBlockMasterClientFactory = new BlockMasterClientFactory();
    private static final long WORKER_MASTER_CONNECT_RETRY_TIMEOUT = Configuration.getMs(PropertyKey.WORKER_MASTER_CONNECT_RETRY_TIMEOUT);

    @VisibleForTesting
    /* loaded from: input_file:alluxio/worker/block/BlockSyncMasterGroup$BlockMasterClientFactory.class */
    static class BlockMasterClientFactory {
        BlockMasterClientFactory() {
        }

        BlockMasterClient create(InetSocketAddress inetSocketAddress) {
            return new BlockMasterClient(MasterClientContext.newBuilder(ClientContext.create(Configuration.global())).build(), inetSocketAddress);
        }
    }

    /* loaded from: input_file:alluxio/worker/block/BlockSyncMasterGroup$Factory.class */
    public static class Factory {
        public static BlockSyncMasterGroup createAllMasterSync(BlockWorker blockWorker) {
            try {
                return new BlockSyncMasterGroup(ConfigurationUtils.getMasterRpcAddresses(Configuration.global()), blockWorker);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public BlockSyncMasterGroup(List<InetSocketAddress> list, BlockWorker blockWorker) throws IOException {
        for (InetSocketAddress inetSocketAddress : list) {
            BlockMasterClient create = sBlockMasterClientFactory.create(inetSocketAddress);
            BlockHeartbeatReporter blockHeartbeatReporter = new BlockHeartbeatReporter();
            blockWorker.getBlockStore().registerBlockStoreEventListener(blockHeartbeatReporter);
            this.mMasterSyncOperators.put(inetSocketAddress, this.mTestMode ? new TestSpecificMasterBlockSync(blockWorker, create, blockHeartbeatReporter) : new SpecificMasterBlockSync(blockWorker, create, blockHeartbeatReporter));
            LOG.info("Kick off BlockMasterSync with master {}", inetSocketAddress);
        }
    }

    public synchronized void start(ExecutorService executorService) {
        if (!this.mStarted) {
            this.mStarted = true;
        }
        this.mMasterSyncOperators.values().forEach(specificMasterBlockSync -> {
            executorService.submit((Runnable) new HeartbeatThread("Worker Block Sync", specificMasterBlockSync, () -> {
                return Long.valueOf(Configuration.getMs(PropertyKey.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS));
            }, Configuration.global(), ServerUserState.global()));
        });
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this.mMasterSyncOperators.values().forEach((v0) -> {
            v0.close();
        });
    }

    static void setBlockMasterClientFactory(BlockMasterClientFactory blockMasterClientFactory) {
        sBlockMasterClientFactory = blockMasterClientFactory;
    }

    public void waitForPrimaryMasterRegistrationComplete(InetSocketAddress inetSocketAddress) {
        SpecificMasterBlockSync specificMasterBlockSync = this.mMasterSyncOperators.get(inetSocketAddress);
        Preconditions.checkNotNull(specificMasterBlockSync, "Primary master block sync should not be null");
        try {
            specificMasterBlockSync.getClass();
            CommonUtils.waitFor(this + " to start", specificMasterBlockSync::isRegistered, WaitForOptions.defaults().setTimeoutMs(WORKER_MASTER_CONNECT_RETRY_TIMEOUT));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Exit the worker on interruption", e);
            throw new RuntimeException(e);
        } catch (TimeoutException e2) {
            ProcessUtils.fatalError(LOG, e2, "Failed to register with primary master", new Object[0]);
        }
        LOG.info("The worker has registered with primary master, address {}", inetSocketAddress);
    }

    public boolean isRegisteredToAllMasters() {
        return this.mMasterSyncOperators.values().stream().allMatch((v0) -> {
            return v0.isRegistered();
        });
    }

    public Map<InetSocketAddress, SpecificMasterBlockSync> getMasterSyncOperators() {
        return this.mMasterSyncOperators;
    }
}
