/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.confignode.manager.node;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.write.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.BaseNodeCache;
import org.apache.iotdb.confignode.manager.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.persistence.metric.NodeInfoMetrics;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
import org.apache.iotdb.db.service.metrics.MetricService;
import org.apache.iotdb.metrics.metricsets.IMetricSet;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NodeManager {
    // Class-wide logger.
    private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
    // ConfigNode configuration, read once from ConfigNodeDescriptor when the class is initialized.
    private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
    // Delay between heartbeat rounds; handed to the scheduler in startHeartbeatService as milliseconds.
    public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatInterval();
    // Internal endpoint of this process, built from the configured internal address and port.
    // Compared against registered ConfigNode endpoints to recognize the local node.
    public static final TEndPoint CURRENT_NODE = new TEndPoint(CONF.getInternalAddress(), CONF.getInternalPort());
    // Sentinel node id (-1). NOTE(review): presumably the id reported when a registration fails — confirm.
    private static final int ERROR_STATUS_NODE_ID = -1;
    // Entry point to the sibling managers (consensus, partition, procedure, cluster schema, ...).
    private final IManager configManager;
    // Registry queried for registered ConfigNodes/DataNodes and for freshly generated node ids.
    private final NodeInfo nodeInfo;
    // Lock taken around the pre-removal checks in checkConfigNodeBeforeRemove.
    private final ReentrantLock removeConfigNodeLock;
    // Monitor guarding start/stop of the heartbeat task (currentHeartbeatFuture).
    private final Object scheduleMonitor = new Object();
    // Heartbeat cache per node id; populated with both DataNode and ConfigNode caches.
    private final Map<Integer, BaseNodeCache> nodeCacheMap;
    // Round counter cycling through 0..9; decides which optional flags a heartbeat request carries.
    private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
    // Handle of the scheduled heartbeat task, or null while the heartbeat service is stopped.
    private Future<?> currentHeartbeatFuture;
    // Single-thread scheduled executor that runs the heartbeat loop.
    // NOTE(review): the pool is named after LoadManager, not NodeManager — confirm this is intended.
    private final ScheduledExecutorService heartBeatExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor((String)LoadManager.class.getSimpleName());

    /**
     * Creates a NodeManager backed by the given manager facade and node registry.
     *
     * <p>The heartbeat cache starts empty and the ConfigNode-removal lock starts unlocked.
     *
     * @param configManager facade used to reach the other managers
     * @param nodeInfo registry of registered ConfigNodes and DataNodes
     */
    public NodeManager(IManager configManager, NodeInfo nodeInfo) {
        this.nodeInfo = nodeInfo;
        this.configManager = configManager;
        this.nodeCacheMap = new ConcurrentHashMap<>();
        this.removeConfigNodeLock = new ReentrantLock();
    }

    /**
     * Copies the cluster-wide settings a DataNode needs into the registration response.
     *
     * @param dataSet registration response that receives the assembled {@link TGlobalConfig}
     */
    private void setGlobalConfig(DataNodeRegisterResp dataSet) {
        final ConfigNodeConfig nodeConf = ConfigNodeDescriptor.getInstance().getConf();
        final CommonConfig sharedConf = CommonDescriptor.getInstance().getConfig();
        final TGlobalConfig global = new TGlobalConfig();

        // Consensus protocol selection.
        global.setDataRegionConsensusProtocolClass(nodeConf.getDataRegionConsensusProtocolClass());
        global.setSchemaRegionConsensusProtocolClass(nodeConf.getSchemaRegionConsensusProtocolClass());

        // Partitioning parameters.
        global.setSeriesPartitionSlotNum(nodeConf.getSeriesPartitionSlotNum());
        global.setSeriesPartitionExecutorClass(nodeConf.getSeriesPartitionExecutorClass());
        global.setTimePartitionInterval(nodeConf.getTimePartitionInterval());

        // Remaining settings.
        global.setReadConsistencyLevel(nodeConf.getReadConsistencyLevel());
        global.setDiskSpaceWarningThreshold(sharedConf.getDiskSpaceWarningThreshold());

        dataSet.setGlobalConfig(global);
    }

    /**
     * Copies the Ratis tuning parameters for data and schema regions into the registration response.
     *
     * @param dataSet registration response that receives the assembled {@link TRatisConfig}
     */
    private void setRatisConfig(DataNodeRegisterResp dataSet) {
        final ConfigNodeConfig nodeConf = ConfigNodeDescriptor.getInstance().getConf();
        final TRatisConfig ratis = new TRatisConfig();

        // Data-region settings.
        ratis.setDataAppenderBufferSize(nodeConf.getDataRegionRatisConsensusLogAppenderBufferSize());
        ratis.setDataSnapshotTriggerThreshold(nodeConf.getDataRegionRatisSnapshotTriggerThreshold());
        ratis.setDataLogUnsafeFlushEnable(nodeConf.isDataRegionRatisLogUnsafeFlushEnable());
        ratis.setDataLogSegmentSizeMax(nodeConf.getDataRegionRatisLogSegmentSizeMax());
        ratis.setDataGrpcFlowControlWindow(nodeConf.getDataRegionRatisGrpcFlowControlWindow());
        ratis.setDataLeaderElectionTimeoutMin(nodeConf.getDataRegionRatisRpcLeaderElectionTimeoutMinMs());
        ratis.setDataLeaderElectionTimeoutMax(nodeConf.getDataRegionRatisRpcLeaderElectionTimeoutMaxMs());

        // Schema-region settings.
        ratis.setSchemaAppenderBufferSize(nodeConf.getSchemaRegionRatisConsensusLogAppenderBufferSize());
        ratis.setSchemaSnapshotTriggerThreshold(nodeConf.getSchemaRegionRatisSnapshotTriggerThreshold());
        ratis.setSchemaLogUnsafeFlushEnable(nodeConf.isSchemaRegionRatisLogUnsafeFlushEnable());
        ratis.setSchemaLogSegmentSizeMax(nodeConf.getSchemaRegionRatisLogSegmentSizeMax());
        ratis.setSchemaGrpcFlowControlWindow(nodeConf.getSchemaRegionRatisGrpcFlowControlWindow());
        ratis.setSchemaLeaderElectionTimeoutMin(nodeConf.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs());
        ratis.setSchemaLeaderElectionTimeoutMax(nodeConf.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs());

        dataSet.setRatisConfig(ratis);
    }

    /**
     * Handles a DataNode registration request.
     *
     * <p>An already registered location is answered with {@code DATANODE_ALREADY_REGISTERED}. An
     * unregistered location whose id is negative gets a freshly generated id, is written through
     * consensus, triggers a region-group-count adjustment and is answered with
     * {@code SUCCESS_STATUS}. In every case the response carries the location's DataNode id, the
     * registered ConfigNode list, and the global and Ratis configuration.
     *
     * <p>NOTE(review): for an unregistered location with a non-negative id neither branch runs, so
     * the status is returned without code or message; the result of the consensus write is also not
     * inspected. Confirm both are intended.
     *
     * @param registerDataNodePlan plan carrying the registering DataNode's configuration
     * @return a {@link DataNodeRegisterResp}
     */
    public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
        final TDataNodeLocation location = registerDataNodePlan.getInfo().getLocation();
        final TSStatus status = new TSStatus();

        if (nodeInfo.isRegisteredDataNode(location)) {
            status.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
            status.setMessage("DataNode already registered.");
        } else if (location.getDataNodeId() < 0) {
            // A negative id marks a DataNode that has never been assigned one.
            location.setDataNodeId(nodeInfo.generateNextNodeId());
            getConsensusManager().write(registerDataNodePlan);
            getClusterSchemaManager().adjustMaxRegionGroupCount();
            status.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            status.setMessage("registerDataNode success.");
        }

        final DataNodeRegisterResp response = new DataNodeRegisterResp();
        response.setStatus(status);
        response.setDataNodeId(location.getDataNodeId());
        response.setConfigNodeList(getRegisteredConfigNodes());
        setGlobalConfig(response);
        setRatisConfig(response);
        return response;
    }

    /**
     * Validates a DataNode removal request and, if valid, submits it to the procedure manager.
     *
     * @param removeDataNodePlan plan naming the DataNodes to remove
     * @return the pre-check response when validation fails; otherwise a response whose status is
     *     {@code SUCCESS_STATUS} if the procedure manager accepted the request and
     *     {@code NODE_DELETE_FAILED_ERROR} if it did not
     */
    public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
        LOGGER.info("NodeManager start to remove DataNode {}", (Object) removeDataNodePlan);

        DataNodeRemoveHandler removeHandler = new DataNodeRemoveHandler((ConfigManager) configManager);
        DataNodeToStatusResp checkResult = removeHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
        if (checkResult.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            LOGGER.error("the remove Data Node request check failed.  req: {}, check result: {}", (Object) removeDataNodePlan, (Object) checkResult.getStatus());
            return checkResult;
        }

        DataNodeToStatusResp response = new DataNodeToStatusResp();
        boolean accepted = configManager.getProcedureManager().removeDataNode(removeDataNodePlan);
        TSStatus outcome;
        if (!accepted) {
            outcome = new TSStatus(TSStatusCode.NODE_DELETE_FAILED_ERROR.getStatusCode());
            outcome.setMessage("Server reject the request, maybe request is too much");
        } else {
            outcome = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            outcome.setMessage("Server accept the request");
        }
        response.setStatus(outcome);

        LOGGER.info("NodeManager finished to remove DataNode {}", (Object) removeDataNodePlan);
        return response;
    }

    /**
     * Handles a ConfigNode registration request.
     *
     * <p>The request is rejected with node id -1 when leadership cannot be confirmed or when the
     * global-config check reports an error. Otherwise a new node id is generated, stored in the
     * request's location, and the request is passed to the procedure manager.
     *
     * @param req registration request of the joining ConfigNode
     * @return response carrying the resulting status and the assigned node id (or -1 on failure)
     */
    public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq req) {
        TSStatus leaderStatus = configManager.getConsensusManager().confirmLeader();
        if (leaderStatus.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            return new TConfigNodeRegisterResp().setStatus(leaderStatus).setConfigNodeId(ERROR_STATUS_NODE_ID);
        }

        TSStatus configCheckError = configManager.checkConfigNodeGlobalConfig(req);
        if (configCheckError != null) {
            return new TConfigNodeRegisterResp().setStatus(configCheckError).setConfigNodeId(ERROR_STATUS_NODE_ID);
        }

        int assignedId = generateNodeId();
        req.getConfigNodeLocation().setConfigNodeId(assignedId);
        configManager.getProcedureManager().addConfigNode(req);
        return new TConfigNodeRegisterResp().setStatus(StatusUtils.OK).setConfigNodeId(assignedId);
    }

    /**
     * Reads DataNode configuration through the consensus layer.
     *
     * @param req read plan describing which DataNode configuration is requested
     * @return the dataset of the consensus read, cast to {@link DataNodeConfigurationResp}
     */
    public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan req) {
        final DataSet readResult = getConsensusManager().read(req).getDataset();
        return (DataNodeConfigurationResp) readResult;
    }

    /** @return the registered DataNode count as reported by the node registry */
    public int getRegisteredDataNodeCount() {
        final int registeredCount = nodeInfo.getRegisteredDataNodeCount();
        return registeredCount;
    }

    /** @return the total CPU core count as reported by the node registry */
    public int getTotalCpuCoreCount() {
        final int totalCores = nodeInfo.getTotalCpuCoreCount();
        return totalCores;
    }

    /** @return the configurations of all registered DataNodes, as held by the node registry */
    public List<TDataNodeConfiguration> getRegisteredDataNodes() {
        final List<TDataNodeConfiguration> registered = nodeInfo.getRegisteredDataNodes();
        return registered;
    }

    /**
     * Builds a fresh map from DataNode id to location for every registered DataNode.
     *
     * @return a new {@link ConcurrentHashMap}; changes to it do not affect the registry
     */
    public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
        Map<Integer, TDataNodeLocation> locationsById = new ConcurrentHashMap<>();
        for (TDataNodeConfiguration configuration : nodeInfo.getRegisteredDataNodes()) {
            locationsById.put(configuration.getLocation().getDataNodeId(), configuration.getLocation());
        }
        return locationsById;
    }

    /**
     * Builds one {@link TDataNodeInfo} per registered DataNode, sorted by DataNode id.
     *
     * <p>Each entry carries the node id, its heartbeat-derived status string, the client RPC
     * address and port, and the number of schema-region and data-region replicas placed on it.
     * Replica sets whose region type is not {@code SchemaRegion} are counted as data regions.
     *
     * @return a new, mutable list; empty when the registry returns no DataNodes
     */
    public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
        // Count the region replicas hosted by each DataNode, split by region type.
        // Typed maps replace the raw HashMaps, whose values could not be used without casts.
        Map<Integer, Integer> schemaRegionNumMap = new HashMap<>();
        Map<Integer, Integer> dataRegionNumMap = new HashMap<>();
        List<TRegionReplicaSet> regionReplicaSets = getPartitionManager().getAllReplicaSets();
        regionReplicaSets.forEach(regionReplicaSet -> regionReplicaSet.getDataNodeLocations().forEach(dataNodeLocation -> {
            switch (regionReplicaSet.getRegionId().getType()) {
                case SchemaRegion: {
                    schemaRegionNumMap.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum);
                    break;
                }
                default: {
                    dataRegionNumMap.merge(dataNodeLocation.getDataNodeId(), 1, Integer::sum);
                }
            }
        }));

        List<TDataNodeInfo> dataNodeInfoList = new ArrayList<>();
        List<TDataNodeConfiguration> registeredDataNodes = getRegisteredDataNodes();
        if (registeredDataNodes != null) {
            for (TDataNodeConfiguration dataNodeConfiguration : registeredDataNodes) {
                int dataNodeId = dataNodeConfiguration.getLocation().getDataNodeId();
                TDataNodeInfo info = new TDataNodeInfo();
                info.setDataNodeId(dataNodeId);
                info.setStatus(getNodeStatusWithReason(dataNodeId));
                info.setRpcAddresss(dataNodeConfiguration.getLocation().getClientRpcEndPoint().getIp());
                info.setRpcPort(dataNodeConfiguration.getLocation().getClientRpcEndPoint().getPort());
                // DataNodes that host no replica of a given type report 0 for it.
                info.setSchemaRegionNum(schemaRegionNumMap.getOrDefault(dataNodeId, 0));
                info.setDataRegionNum(dataRegionNumMap.getOrDefault(dataNodeId, 0));
                dataNodeInfoList.add(info);
            }
        }

        dataNodeInfoList.sort(Comparator.comparingInt(TDataNodeInfo::getDataNodeId));
        return dataNodeInfoList;
    }

    /**
     * Builds one {@link TConfigNodeInfo} per registered ConfigNode, sorted by ConfigNode id.
     *
     * <p>The role is reported as Leader for the node whose internal endpoint equals
     * {@link #CURRENT_NODE} and as Follower for every other node.
     *
     * @return a new, mutable list; empty when the registry returns no ConfigNodes
     */
    public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
        List<TConfigNodeInfo> infos = new ArrayList<>();
        List<TConfigNodeLocation> locations = getRegisteredConfigNodes();
        if (locations != null) {
            for (TConfigNodeLocation location : locations) {
                int nodeId = location.getConfigNodeId();
                String role;
                if (location.getInternalEndPoint().equals(CURRENT_NODE)) {
                    role = RegionRoleType.Leader.name();
                } else {
                    role = RegionRoleType.Follower.name();
                }

                TConfigNodeInfo entry = new TConfigNodeInfo();
                entry.setConfigNodeId(nodeId);
                entry.setStatus(getNodeStatusWithReason(nodeId));
                entry.setInternalAddress(location.getInternalEndPoint().getIp());
                entry.setInternalPort(location.getInternalEndPoint().getPort());
                entry.setRoleType(role);
                infos.add(entry);
            }
        }
        infos.sort(Comparator.comparingInt(TConfigNodeInfo::getConfigNodeId));
        return infos;
    }

    /**
     * Writes an {@link ApplyConfigNodePlan} for the given location through the consensus layer.
     *
     * @param configNodeLocation location of the ConfigNode to apply
     */
    public void applyConfigNode(TConfigNodeLocation configNodeLocation) {
        getConsensusManager().write(new ApplyConfigNodePlan(configNodeLocation));
    }

    /** Registers a {@link NodeInfoMetrics} set, built over the node registry, with the metric service. */
    public void addMetrics() {
        final IMetricSet nodeInfoMetrics = new NodeInfoMetrics(nodeInfo);
        MetricService.getInstance().addMetricSet(nodeInfoMetrics);
    }

    /**
     * Runs the checks that must pass before a ConfigNode may be removed.
     *
     * <p>Removal is refused with {@code REMOVE_CONFIGNODE_FAILED} when at most one ConfigNode is
     * Running, when the target is not a registered ConfigNode, or when no leader is currently
     * known. When the target is the leader, leadership is transferred first and the result of
     * that transfer is returned instead.
     *
     * <p>The checks run under {@code removeConfigNodeLock}. The lock is acquired with
     * {@code lock()} rather than {@code tryLock()}: ignoring a failed {@code tryLock()} would run
     * the checks unguarded and make the {@code unlock()} in {@code finally} throw
     * {@link IllegalMonitorStateException}.
     *
     * @param removeConfigNodePlan plan naming the ConfigNode to remove
     * @return {@code SUCCESS_STATUS} when removal may proceed, otherwise the failure or
     *     leader-transfer status described above
     */
    public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
        this.removeConfigNodeLock.lock();
        try {
            // Never remove the last running ConfigNode.
            if (this.filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
                return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()).setMessage("Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
            }
            if (!this.getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
                return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()).setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
            }
            TConfigNodeLocation leader = this.getConsensusManager().getLeader();
            if (leader == null) {
                return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()).setMessage("Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
            }
            // The leader cannot be removed directly; hand leadership over first.
            if (leader.getInternalEndPoint().equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
                return this.transferLeader(removeConfigNodePlan, this.getConsensusManager().getConsensusGroupId());
            }
            return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("Success remove confignode.");
        } finally {
            this.removeConfigNodeLock.unlock();
        }
    }

    /**
     * Transfers ConfigNode leadership away from the node that is about to be removed.
     *
     * <p>Any Running ConfigNode other than the one being removed is picked as the new leader.
     *
     * @param removeConfigNodePlan plan naming the ConfigNode to remove (the current leader)
     * @param groupId consensus group whose leadership is transferred
     * @return {@code NEED_REDIRECTION} pointing at the new leader when the transfer succeeds;
     *     {@code REMOVE_CONFIGNODE_FAILED} when no candidate is available or the transfer fails
     */
    private TSStatus transferLeader(RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId groupId) {
        Optional<TConfigNodeLocation> candidate = this.filterConfigNodeThroughStatus(NodeStatus.Running).stream().filter(e -> !e.equals(removeConfigNodePlan.getConfigNodeLocation())).findAny();
        if (!candidate.isPresent()) {
            // Node states can change between the caller's check and this lookup; report a
            // failure instead of letting Optional.get() throw NoSuchElementException.
            return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()).setMessage("Remove ConfigNode failed because there is no other running ConfigNode to transfer the leader to.");
        }
        TConfigNodeLocation newLeader = candidate.get();
        ConsensusGenericResponse resp = this.getConsensusManager().getConsensusImpl().transferLeader(groupId, new Peer(groupId, newLeader.getConfigNodeId(), newLeader.getConsensusEndPoint()));
        if (!resp.isSuccess()) {
            return new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_FAILED.getStatusCode()).setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.");
        }
        return new TSStatus(TSStatusCode.NEED_REDIRECTION.getStatusCode()).setRedirectNode(newLeader.getInternalEndPoint()).setMessage("The ConfigNode to be removed is leader, already transfer Leader to " + newLeader + ".");
    }

    /**
     * Sends a MERGE request to every registered DataNode.
     *
     * @return the response list collected by the request handler
     */
    public List<TSStatus> merge() {
        Map<Integer, TDataNodeLocation> targets = configManager.getNodeManager().getRegisteredDataNodeLocations();
        AsyncClientHandler mergeHandler = new AsyncClientHandler(DataNodeRequestType.MERGE, targets);
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(mergeHandler);
        return mergeHandler.getResponseList();
    }

    /**
     * Sends a FLUSH request carrying {@code req} to every registered DataNode.
     *
     * @param req flush request forwarded to the DataNodes
     * @return the response list collected by the request handler
     */
    public List<TSStatus> flush(TFlushReq req) {
        Map<Integer, TDataNodeLocation> targets = configManager.getNodeManager().getRegisteredDataNodeLocations();
        AsyncClientHandler flushHandler = new AsyncClientHandler(DataNodeRequestType.FLUSH, req, targets);
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(flushHandler);
        return flushHandler.getResponseList();
    }

    /**
     * Sends a CLEAR_CACHE request to every registered DataNode.
     *
     * @return the response list collected by the request handler
     */
    public List<TSStatus> clearCache() {
        Map<Integer, TDataNodeLocation> targets = configManager.getNodeManager().getRegisteredDataNodeLocations();
        AsyncClientHandler clearCacheHandler = new AsyncClientHandler(DataNodeRequestType.CLEAR_CACHE, targets);
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(clearCacheHandler);
        return clearCacheHandler.getResponseList();
    }

    /**
     * Sends a LOAD_CONFIGURATION request to every registered DataNode.
     *
     * @return the response list collected by the request handler
     */
    public List<TSStatus> loadConfiguration() {
        Map<Integer, TDataNodeLocation> targets = configManager.getNodeManager().getRegisteredDataNodeLocations();
        AsyncClientHandler loadConfigHandler = new AsyncClientHandler(DataNodeRequestType.LOAD_CONFIGURATION, targets);
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(loadConfigHandler);
        return loadConfigHandler.getResponseList();
    }

    /**
     * Sends a SET_SYSTEM_STATUS request carrying {@code status} to every registered DataNode.
     *
     * @param status status string forwarded to the DataNodes
     * @return the response list collected by the request handler
     */
    public List<TSStatus> setSystemStatus(String status) {
        Map<Integer, TDataNodeLocation> targets = configManager.getNodeManager().getRegisteredDataNodeLocations();
        AsyncClientHandler statusHandler = new AsyncClientHandler(DataNodeRequestType.SET_SYSTEM_STATUS, status, targets);
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(statusHandler);
        return statusHandler.getResponseList();
    }

    /**
     * Starts the periodic heartbeat task unless one is already scheduled.
     *
     * <p>The task runs {@code heartbeatLoopBody} on the heartbeat executor with no initial delay
     * and a fixed delay of {@link #HEARTBEAT_INTERVAL} milliseconds between runs. Start and stop
     * are serialized on {@code scheduleMonitor}.
     */
    public void startHeartbeatService() {
        synchronized (scheduleMonitor) {
            if (currentHeartbeatFuture != null) {
                // Already running; nothing to do.
                return;
            }
            currentHeartbeatFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(heartBeatExecutor, this::heartbeatLoopBody, 0L, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
            LOGGER.info("Heartbeat service is started successfully.");
        }
    }

    /**
     * One heartbeat round: when a consensus manager exists and this node is the leader, builds a
     * heartbeat request and sends it to every registered DataNode and ConfigNode.
     *
     * <p>The consensus manager is fetched once and that same reference is both null-checked and
     * queried, so the leadership check cannot dereference a reference obtained by a second,
     * unchecked fetch.
     */
    private void heartbeatLoopBody() {
        ConsensusManager consensusManager = this.getConsensusManager();
        if (consensusManager == null || !consensusManager.isLeader()) {
            return;
        }
        THeartbeatReq heartbeatReq = this.genHeartbeatReq();
        this.pingRegisteredDataNodes(heartbeatReq, this.getRegisteredDataNodes());
        this.pingRegisteredConfigNodes(heartbeatReq, this.getRegisteredConfigNodes());
    }

    /**
     * Builds the heartbeat request for the current round and advances the round counter.
     *
     * <p>The counter cycles through 0..9. Leader judgement is requested on rounds 0 and 5, load
     * sampling on round 0. The counter is read and advanced in a single atomic step so both flags
     * are always derived from the same round value.
     *
     * @return a request stamped with the current wall-clock time in milliseconds
     */
    private THeartbeatReq genHeartbeatReq() {
        final int round = this.heartbeatCounter.getAndUpdate(x -> (x + 1) % 10);
        THeartbeatReq heartbeatReq = new THeartbeatReq();
        heartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
        heartbeatReq.setNeedJudgeLeader(round % 5 == 0);
        heartbeatReq.setNeedSamplingLoad(round % 10 == 0);
        return heartbeatReq;
    }

    /**
     * Sends the heartbeat request to every given DataNode.
     *
     * <p>A {@link DataNodeHeartbeatCache} is created in the node cache map for DataNodes that do
     * not have one yet; the response handler receives that cache together with the partition
     * manager's region-group cache map.
     *
     * @param heartbeatReq request to send
     * @param registeredDataNodes DataNodes to ping
     */
    private void pingRegisteredDataNodes(THeartbeatReq heartbeatReq, List<TDataNodeConfiguration> registeredDataNodes) {
        registeredDataNodes.forEach(target -> {
            DataNodeHeartbeatHandler responseHandler =
                new DataNodeHeartbeatHandler(
                    target.getLocation(),
                    (DataNodeHeartbeatCache) nodeCacheMap.computeIfAbsent(target.getLocation().getDataNodeId(), absent -> new DataNodeHeartbeatCache()),
                    getPartitionManager().getRegionGroupCacheMap());
            AsyncDataNodeHeartbeatClientPool.getInstance().getDataNodeHeartBeat(target.getLocation().getInternalEndPoint(), heartbeatReq, responseHandler);
        });
    }

    /**
     * Sends the heartbeat timestamp to every given ConfigNode except the local one.
     *
     * <p>The local node (internal endpoint equal to {@link #CURRENT_NODE}) is not pinged; it only
     * gets a {@link ConfigNodeHeartbeatCache} entry if none exists. Remote nodes get a cache entry
     * on first contact, which is handed to the response handler.
     *
     * @param heartbeatReq request whose timestamp is sent
     * @param registeredConfigNodes ConfigNodes to ping
     */
    private void pingRegisteredConfigNodes(THeartbeatReq heartbeatReq, List<TConfigNodeLocation> registeredConfigNodes) {
        for (TConfigNodeLocation peer : registeredConfigNodes) {
            if (!peer.getInternalEndPoint().equals(CURRENT_NODE)) {
                ConfigNodeHeartbeatCache peerCache = (ConfigNodeHeartbeatCache) nodeCacheMap.computeIfAbsent(peer.getConfigNodeId(), absent -> new ConfigNodeHeartbeatCache(peer));
                ConfigNodeHeartbeatHandler responseHandler = new ConfigNodeHeartbeatHandler(peerCache);
                AsyncConfigNodeHeartbeatClientPool.getInstance().getConfigNodeHeartBeat(peer.getInternalEndPoint(), heartbeatReq.getHeartbeatTimestamp(), responseHandler);
            } else {
                // Local node: no network round-trip, just make sure a cache entry exists.
                nodeCacheMap.putIfAbsent(peer.getConfigNodeId(), new ConfigNodeHeartbeatCache(peer));
            }
        }
    }

    /**
     * Stops the periodic heartbeat task if one is scheduled.
     *
     * <p>The task is cancelled without interrupting a run in progress, the task handle is reset to
     * {@code null}, and all cached heartbeat state is discarded. Start and stop are serialized on
     * {@code scheduleMonitor}.
     */
    public void stopHeartbeatService() {
        synchronized (scheduleMonitor) {
            if (currentHeartbeatFuture == null) {
                // Not running; nothing to stop.
                return;
            }
            currentHeartbeatFuture.cancel(false);
            currentHeartbeatFuture = null;
            nodeCacheMap.clear();
            LOGGER.info("Heartbeat service is stopped successfully.");
        }
    }

    /** @return the live heartbeat cache map itself (not a copy), keyed by node id */
    public Map<Integer, BaseNodeCache> getNodeCacheMap() {
        return nodeCacheMap;
    }

    /**
     * Drops the heartbeat cache entry of the given node, if present.
     *
     * @param nodeId id of the node whose cache entry is removed
     */
    public void removeNodeCache(int nodeId) {
        nodeCacheMap.remove(nodeId);
    }

    /**
     * Returns the status string of a node, including the reason reported by its heartbeat cache.
     *
     * @param nodeId id of the node to look up
     * @return the cache's status-with-reason, or the Unknown status followed by
     *     {@code "(NoHeartbeat)"} when the node has no cache entry
     */
    private String getNodeStatusWithReason(int nodeId) {
        BaseNodeCache cached = nodeCacheMap.get(nodeId);
        if (cached != null) {
            return cached.getNodeStatusWithReason();
        }
        return NodeStatus.Unknown.getStatus() + "(NoHeartbeat)";
    }

    /**
     * Returns the registered ConfigNodes whose cached status matches any of the given statuses.
     *
     * <p>ConfigNodes without a heartbeat cache entry are excluded. The cache entry is fetched
     * with a single lookup: the cache map can be cleared or pruned concurrently, so a separate
     * {@code containsKey} followed by {@code get} could dereference {@code null}.
     *
     * @param status statuses to accept
     * @return a new list of matching ConfigNode locations
     */
    public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus ... status) {
        return this.getRegisteredConfigNodes().stream().filter(registeredConfigNode -> {
            BaseNodeCache nodeCache = this.nodeCacheMap.get(registeredConfigNode.getConfigNodeId());
            return nodeCache != null && Arrays.stream(status).anyMatch(s -> s.equals((Object)nodeCache.getNodeStatus()));
        }).collect(Collectors.toList());
    }

    /**
     * Returns the registered DataNodes whose cached status matches any of the given statuses.
     *
     * <p>DataNodes without a heartbeat cache entry are excluded. The cache entry is fetched with
     * a single lookup: the cache map can be cleared or pruned concurrently, so a separate
     * {@code containsKey} followed by {@code get} could dereference {@code null}.
     *
     * @param status statuses to accept
     * @return a new list of matching DataNode configurations
     */
    public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus ... status) {
        return this.getRegisteredDataNodes().stream().filter(registeredDataNode -> {
            BaseNodeCache nodeCache = this.nodeCacheMap.get(registeredDataNode.getLocation().getDataNodeId());
            return nodeCache != null && Arrays.stream(status).anyMatch(s -> s.equals((Object)nodeCache.getNodeStatus()));
        }).collect(Collectors.toList());
    }

    /**
     * Snapshots the load score of every node that currently has a heartbeat cache entry.
     *
     * @return a new map from node id to load score
     */
    public Map<Integer, Long> getAllLoadScores() {
        Map<Integer, Long> scoresByNodeId = new ConcurrentHashMap<>();
        for (Map.Entry<Integer, BaseNodeCache> cached : nodeCacheMap.entrySet()) {
            scoresByNodeId.put(cached.getKey(), cached.getValue().getLoadScore());
        }
        return scoresByNodeId;
    }

    /**
     * Picks the DataNode with the lowest cached load score.
     *
     * <p>Only {@link DataNodeHeartbeatCache} entries are considered. The cache map also holds
     * ConfigNode caches, and selecting one of those would yield an id that is not a registered
     * DataNode and therefore a {@code null} result.
     *
     * @return the location of the lowest-load DataNode; {@code null} when the selected id is not
     *     a registered DataNode (for example when no DataNode has a score below
     *     {@code Long.MAX_VALUE}, in which case id 0 is looked up)
     */
    public TDataNodeLocation getLowestLoadDataNode() {
        int lowestLoadNodeId = 0;
        long lowestLoadScore = Long.MAX_VALUE;
        for (Map.Entry<Integer, BaseNodeCache> entry : this.nodeCacheMap.entrySet()) {
            BaseNodeCache heartbeatCache = entry.getValue();
            if (!(heartbeatCache instanceof DataNodeHeartbeatCache)) {
                // Skip ConfigNode caches: they share this map but are not DataNodes.
                continue;
            }
            long score = heartbeatCache.getLoadScore();
            if (score < lowestLoadScore) {
                lowestLoadNodeId = entry.getKey();
                lowestLoadScore = score;
            }
        }
        LOGGER.info("get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", (Object)lowestLoadNodeId, (Object)lowestLoadScore);
        return this.configManager.getNodeManager().getRegisteredDataNodeLocations().get(lowestLoadNodeId);
    }

    /**
     * Tells whether the given DataNode's cached status is Removing.
     *
     * @param dataNodeId id of the DataNode to check
     * @return {@code true} only when a cache entry exists and its status is Removing
     */
    public boolean isNodeRemoving(int dataNodeId) {
        DataNodeHeartbeatCache cached = (DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(dataNodeId);
        return cached != null && NodeStatus.Removing.equals((Object) cached.getNodeStatus());
    }

    /**
     * Marks a DataNode as Removing, both in the local heartbeat cache and on the DataNode itself.
     *
     * <p>The cache is updated only when an entry exists; the SET_SYSTEM_STATUS request is sent to
     * the DataNode's internal endpoint in either case.
     *
     * @param dataNodeLocation location of the DataNode being removed
     */
    public void setNodeRemovingStatus(TDataNodeLocation dataNodeLocation) {
        final int targetId = dataNodeLocation.getDataNodeId();
        DataNodeHeartbeatCache cached = (DataNodeHeartbeatCache) configManager.getNodeManager().getNodeCacheMap().get(targetId);
        if (cached != null) {
            cached.setRemoving();
        }
        SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(
            dataNodeLocation.getInternalEndPoint(),
            NodeStatus.Removing.getStatus(),
            DataNodeRequestType.SET_SYSTEM_STATUS);
    }

    /** @return the locations of all registered ConfigNodes, as held by the node registry */
    public List<TConfigNodeLocation> getRegisteredConfigNodes() {
        final List<TConfigNodeLocation> registered = nodeInfo.getRegisteredConfigNodes();
        return registered;
    }

    /** @return the consensus manager obtained from the manager facade */
    private ConsensusManager getConsensusManager() {
        return configManager.getConsensusManager();
    }

    /** @return the cluster schema manager obtained from the manager facade */
    private ClusterSchemaManager getClusterSchemaManager() {
        return configManager.getClusterSchemaManager();
    }

    /** @return the partition manager obtained from the manager facade */
    private PartitionManager getPartitionManager() {
        return configManager.getPartitionManager();
    }

    /** @return the next node id handed out by the node registry */
    public int generateNodeId() {
        final int nextId = nodeInfo.generateNextNodeId();
        return nextId;
    }
}

