/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.container;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.GeneratedMessage;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigTag;
import org.apache.hadoop.hdds.conf.ConfigType;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.ozone.lock.LockManager;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.Time;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicationManager {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationManager.class);
    /** Source of container IDs, container metadata and the replicas currently known for each container. */
    private final ContainerManager containerManager;
    /** Policy asked to choose target datanodes when a container needs additional replicas. */
    private final ContainerPlacementPolicy containerPlacement;
    /** Publisher used to fire {@code SCMEvents.DATANODE_COMMAND} events carrying commands for datanodes. */
    private final EventPublisher eventPublisher;
    /** Per-container lock held for the duration of processing a single container. */
    private final LockManager<ContainerID> lockManager;
    /** Replicate commands that have been sent and not yet confirmed or timed out, keyed by container. */
    private final Map<ContainerID, List<InflightAction>> inflightReplication;
    /** Delete commands that have been sent and not yet confirmed or timed out, keyed by container. */
    private final Map<ContainerID, List<InflightAction>> inflightDeletion;
    /** Supplies the monitor interval and the inflight command timeout. */
    private final ReplicationManagerConfiguration conf;
    /** Thread executing the monitor loop; {@code null} until {@code start()} has been called. */
    private Thread replicationMonitor;
    /** Set by {@code start()}, cleared by {@code stop()}, and checked by the monitor loop on every pass. */
    private volatile boolean running;

    /**
     * Creates a replication manager in the stopped state; the monitor thread
     * is only created once {@link #start()} is called.
     *
     * @param conf               supplies the monitor interval and the inflight command timeout
     * @param containerManager   source of container IDs, container info and replicas
     * @param containerPlacement policy used to pick targets for new replicas
     * @param eventPublisher     publisher used to send commands to datanodes
     * @param lockManager        per-container lock manager
     */
    public ReplicationManager(ReplicationManagerConfiguration conf, ContainerManager containerManager, ContainerPlacementPolicy containerPlacement, EventPublisher eventPublisher, LockManager<ContainerID> lockManager) {
        // Collaborators handed in by the caller.
        this.conf = conf;
        this.containerManager = containerManager;
        this.containerPlacement = containerPlacement;
        this.eventPublisher = eventPublisher;
        this.lockManager = lockManager;

        // Internal state: nothing in flight, monitor not running.
        this.inflightReplication = new HashMap<>();
        this.inflightDeletion = new HashMap<>();
        this.running = false;
    }

    /**
     * Starts the replication monitor as a daemon thread named
     * {@code ReplicationMonitor}. Does nothing except log when
     * {@link #isRunning()} already reports {@code true}.
     */
    public synchronized void start() {
        if (this.isRunning()) {
            LOG.info("Replication Monitor Thread is already running.");
            return;
        }
        LOG.info("Starting Replication Monitor Thread.");
        this.running = true;
        final Thread monitor = new Thread(this::run);
        monitor.setName("ReplicationMonitor");
        monitor.setDaemon(true);
        this.replicationMonitor = monitor;
        monitor.start();
    }

    /**
     * Reports whether the replication monitor is active.
     *
     * @return {@code true} when the running flag is set; otherwise
     *         {@code true} only if a monitor thread exists and is still alive
     */
    public boolean isRunning() {
        if (this.running) {
            return true;
        }
        // Flag is cleared: fall back to the thread's liveness, read under the
        // same monitor that start() and stop() synchronize on.
        synchronized (this) {
            final Thread monitor = this.replicationMonitor;
            return monitor != null && monitor.isAlive();
        }
    }

    /**
     * Notifies this object's monitor so that a monitor thread waiting between
     * passes in {@code run()} wakes up immediately instead of waiting for the
     * configured interval to elapse.
     */
    @VisibleForTesting
    @SuppressFBWarnings(value={"NN_NAKED_NOTIFY"}, justification="Used only for testing")
    synchronized void processContainersNow() {
        notify();
    }

    /**
     * Stops the replication monitor: forgets all inflight replication and
     * deletion actions, clears the running flag and wakes the monitor thread
     * so it can observe the flag. Only logs when the flag is already cleared.
     */
    public synchronized void stop() {
        if (!this.running) {
            LOG.info("Replication Monitor Thread is not running.");
            return;
        }
        LOG.info("Stopping Replication Monitor Thread.");
        this.inflightReplication.clear();
        this.inflightDeletion.clear();
        this.running = false;
        // Wake the monitor thread if it is waiting between passes.
        this.notify();
    }

    /**
     * Body of the monitor thread. While the running flag is set it processes
     * every container ID returned by the container manager, logs how long the
     * pass took, and then waits on this object for the configured interval
     * (or until notified). Any {@link Throwable} escaping the loop is logged
     * and terminates the process with exit status 1.
     */
    private synchronized void run() {
        try {
            while (this.running) {
                final long passStarted = Time.monotonicNow();
                final Set<ContainerID> ids = this.containerManager.getContainerIDs();
                ids.forEach(this::processContainer);
                final long elapsedMillis = Time.monotonicNow() - passStarted;
                LOG.info("Replication Monitor Thread took {} milliseconds for processing {} containers.", elapsedMillis, ids.size());
                // wait() releases the monitor, which lets stop() and
                // processContainersNow() acquire it and notify.
                this.wait(this.conf.getInterval());
            }
        } catch (Throwable t) {
            LOG.error("Exception in Replication Monitor Thread.", t);
            ExitUtil.terminate(1, t);
        }
    }

    /**
     * Examines one container, under its per-container lock, and takes the
     * action its current state calls for:
     * <ul>
     *   <li>OPEN: nothing;</li>
     *   <li>CLOSING: a non-forced close command is sent to every replica;</li>
     *   <li>QUASI_CLOSED that can be force closed: force close;</li>
     *   <li>otherwise: inflight bookkeeping is refreshed and the container is
     *       checked for being healthy, under replicated, over replicated or
     *       unstable, in that order.</li>
     * </ul>
     * A container unknown to the container manager is logged and skipped.
     *
     * @param id the container to process
     */
    private void processContainer(ContainerID id) {
        this.lockManager.lock(id);
        try {
            final ContainerInfo info = this.containerManager.getContainer(id);
            final Set<ContainerReplica> replicaSet = this.containerManager.getContainerReplicas(info.containerID());
            final HddsProtos.LifeCycleState lifeCycle = info.getState();

            if (lifeCycle == HddsProtos.LifeCycleState.OPEN) {
                return;
            }
            if (lifeCycle == HddsProtos.LifeCycleState.CLOSING) {
                replicaSet.forEach(r -> this.sendCloseCommand(info, r.getDatanodeDetails(), false));
                return;
            }
            if (lifeCycle == HddsProtos.LifeCycleState.QUASI_CLOSED && this.canForceCloseContainer(info, replicaSet)) {
                this.forceCloseContainer(info, replicaSet);
                return;
            }

            // A replication is complete once the target datanode reports a replica.
            final Predicate<InflightAction> replicaPresent = action -> replicaSet.stream()
                    .map(ContainerReplica::getDatanodeDetails)
                    .anyMatch(dn -> dn.equals(action.datanode));
            // A deletion is complete once the target datanode no longer reports one.
            final Predicate<InflightAction> replicaAbsent = action -> replicaSet.stream()
                    .map(ContainerReplica::getDatanodeDetails)
                    .noneMatch(dn -> dn.equals(action.datanode));
            this.updateInflightAction(info, this.inflightReplication, replicaPresent);
            this.updateInflightAction(info, this.inflightDeletion, replicaAbsent);

            if (this.isContainerHealthy(info, replicaSet)) {
                return;
            }
            if (this.isContainerUnderReplicated(info, replicaSet)) {
                this.handleUnderReplicatedContainer(info, replicaSet);
            } else if (this.isContainerOverReplicated(info, replicaSet)) {
                this.handleOverReplicatedContainer(info, replicaSet);
            } else {
                this.handleUnstableContainer(info, replicaSet);
            }
        } catch (ContainerNotFoundException ex) {
            LOG.warn("Missing container {}.", id);
        } finally {
            this.lockManager.unlock(id);
        }
    }

    /**
     * Prunes the inflight actions recorded for a container. An action is
     * dropped when it is older than the configured event timeout or when
     * {@code filter} accepts it; the container's entry is removed from the
     * map once no actions remain.
     *
     * @param container       the container whose actions are pruned
     * @param inflightActions the inflight map to update (replication or deletion)
     * @param filter          matches actions that should be dropped
     */
    private void updateInflightAction(ContainerInfo container, Map<ContainerID, List<InflightAction>> inflightActions, Predicate<InflightAction> filter) {
        final ContainerID id = container.containerID();
        final long deadline = Time.monotonicNow() - this.conf.getEventTimeout();
        if (!inflightActions.containsKey(id)) {
            return;
        }
        final List<InflightAction> pending = inflightActions.get(id);
        final Predicate<InflightAction> timedOut = action -> action.time < deadline;
        // Short-circuits, so filter is consulted only for actions that have not timed out.
        pending.removeIf(timedOut.or(filter));
        if (pending.isEmpty()) {
            inflightActions.remove(id);
        }
    }

    /**
     * A container is healthy when the number of replicas equals its
     * replication factor and every replica's state matches the container's
     * state according to {@code compareState}.
     */
    private boolean isContainerHealthy(ContainerInfo container, Set<ContainerReplica> replicas) {
        final int expectedCount = container.getReplicationFactor().getNumber();
        if (expectedCount != replicas.size()) {
            return false;
        }
        return replicas.stream()
                .map(ContainerReplica::getState)
                .allMatch(replicaState -> ReplicationManager.compareState(container.getState(), replicaState));
    }

    /**
     * @return {@code true} when the replication factor exceeds the effective
     *         replica count (see {@code getReplicaCount})
     */
    private boolean isContainerUnderReplicated(ContainerInfo container, Set<ContainerReplica> replicas) {
        final int required = container.getReplicationFactor().getNumber();
        final int effective = this.getReplicaCount(container.containerID(), replicas);
        return effective < required;
    }

    /**
     * @return {@code true} when the effective replica count (see
     *         {@code getReplicaCount}) exceeds the replication factor
     */
    private boolean isContainerOverReplicated(ContainerInfo container, Set<ContainerReplica> replicas) {
        final int required = container.getReplicationFactor().getNumber();
        final int effective = this.getReplicaCount(container.containerID(), replicas);
        return effective > required;
    }

    /**
     * Computes the effective replica count of a container: the replicas
     * currently known, plus replications still in flight, minus deletions
     * still in flight.
     *
     * @param id       the container
     * @param replicas the replicas currently known for the container
     * @return the effective replica count
     */
    private int getReplicaCount(ContainerID id, Set<ContainerReplica> replicas) {
        final int existing = replicas.size();
        final int beingAdded = this.inflightReplication.getOrDefault(id, Collections.emptyList()).size();
        final int beingRemoved = this.inflightDeletion.getOrDefault(id, Collections.emptyList()).size();
        return existing + beingAdded - beingRemoved;
    }

    /**
     * Decides whether a QUASI_CLOSED container may be force closed: the
     * QUASI_CLOSED replicas must come from more than half (integer division)
     * of the replication factor's worth of distinct origin datanodes.
     *
     * @param container a container in QUASI_CLOSED state (asserted)
     * @param replicas  the replicas currently known for the container
     * @return {@code true} if the distinct-origin QUASI_CLOSED replica count
     *         is greater than {@code replicationFactor / 2}
     */
    private boolean canForceCloseContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        Preconditions.assertTrue(container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED);
        final int majorityThreshold = container.getReplicationFactor().getNumber() / 2;
        final long distinctOrigins = replicas.stream()
                .filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED)
                .map(ContainerReplica::getOriginDatanodeId)
                .distinct()
                .count();
        return distinctOrigins > majorityThreshold;
    }

    /**
     * Force closes a QUASI_CLOSED container by sending a forced close command
     * to every QUASI_CLOSED replica that carries the highest sequence id found
     * among the QUASI_CLOSED replicas.
     *
     * @param container a container in QUASI_CLOSED state (asserted)
     * @param replicas  the replicas currently known for the container
     */
    private void forceCloseContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        Preconditions.assertTrue(container.getState() == HddsProtos.LifeCycleState.QUASI_CLOSED);
        // Typed list: a raw List would erase the element type and break the stream chain below.
        final List<ContainerReplica> quasiClosedReplicas = replicas.stream()
                .filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED)
                .collect(Collectors.toList());
        // -1 is the sentinel for "no QUASI_CLOSED replica found".
        final Long sequenceId = quasiClosedReplicas.stream()
                .map(ContainerReplica::getSequenceId)
                .max(Long::compare)
                .orElse(-1L);
        LOG.info("Force closing container {} with BCSID {}, which is in QUASI_CLOSED state.", container.containerID(), sequenceId);
        if (sequenceId == -1L) {
            // The check does not depend on the replica, so decide once up front.
            return;
        }
        quasiClosedReplicas.stream()
                .filter(replica -> replica.getSequenceId().equals(sequenceId))
                .forEach(replica -> this.sendCloseCommand(container, replica.getDatanodeDetails(), true));
    }

    /**
     * Schedules additional replicas for an under replicated container.
     * <p>
     * Source candidates are the QUASI_CLOSED or CLOSED replicas that have no
     * deletion in flight, ordered by descending sequence id. If at least one
     * source exists, the placement policy is asked for as many new targets as
     * are missing, excluding datanodes that already hold a replica or already
     * have a replication in flight, and a replicate command is sent to each
     * selected target. An {@link IOException} is logged and the container is
     * left for a later pass.
     *
     * @param container the under replicated container
     * @param replicas  the replicas currently known for the container
     */
    private void handleUnderReplicatedContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        try {
            final ContainerID id = container.containerID();
            final List<DatanodeDetails> deletionInFlight = this.inflightDeletion.getOrDefault(id, Collections.emptyList()).stream()
                    .map(action -> action.datanode)
                    .collect(Collectors.toList());
            final List<DatanodeDetails> source = replicas.stream()
                    .filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED
                            || r.getState() == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED)
                    .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails()))
                    // Highest sequence id first.
                    .sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId()))
                    .map(ContainerReplica::getDatanodeDetails)
                    .collect(Collectors.toList());
            if (source.isEmpty()) {
                LOG.warn("Cannot replicate container {}, no healthy replica found.", container.containerID());
                return;
            }

            final int replicationFactor = container.getReplicationFactor().getNumber();
            final int delta = replicationFactor - this.getReplicaCount(id, replicas);
            // The list is appended to below, so collect into an explicitly
            // mutable ArrayList; Collectors.toList() does not guarantee mutability.
            final List<DatanodeDetails> excludeList = replicas.stream()
                    .map(ContainerReplica::getDatanodeDetails)
                    .collect(Collectors.toCollection(ArrayList::new));
            final List<InflightAction> actionList = this.inflightReplication.get(id);
            if (actionList != null) {
                actionList.stream().map(action -> action.datanode).forEach(excludeList::add);
            }
            final List<DatanodeDetails> selectedDatanodes = this.containerPlacement.chooseDatanodes(excludeList, null, delta, container.getUsedBytes());
            LOG.info("Container {} is under replicated. Expected replica count is {}, but found {}.", id, replicationFactor, replicationFactor - delta);
            for (DatanodeDetails datanode : selectedDatanodes) {
                this.sendReplicateCommand(container, datanode, source);
            }
        } catch (IOException ex) {
            LOG.warn("Exception while replicating container {}.", container.getContainerID(), ex);
        }
    }

    /**
     * Removes surplus replicas of an over replicated container.
     * <p>
     * The surplus is the number of known replicas minus the replication
     * factor minus the deletions already in flight. One state-matching replica
     * per origin datanode is always retained. Among the remaining replicas,
     * those that already have a delete in flight are skipped, and replicas
     * whose state does not match the container are deleted first. At most as
     * many delete commands are sent as there are eligible replicas.
     *
     * @param container the over replicated container
     * @param replicas  the replicas currently known for the container
     */
    private void handleOverReplicatedContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        final ContainerID id = container.containerID();
        final int replicationFactor = container.getReplicationFactor().getNumber();
        final List<DatanodeDetails> deletionInFlight = this.inflightDeletion.getOrDefault(id, Collections.emptyList()).stream()
                .map(action -> action.datanode)
                .collect(Collectors.toList());
        final int excess = replicas.size() - replicationFactor - deletionInFlight.size();
        if (excess <= 0) {
            return;
        }
        LOG.info("Container {} is over replicated. Expected replica count is {}, but found {}.", id, replicationFactor, replicationFactor + excess);

        // Keyed by origin datanode id; insertion order keeps the first matching replica per origin.
        final Map<Object, ContainerReplica> uniqueReplicas = new LinkedHashMap<>();
        replicas.stream()
                .filter(r -> ReplicationManager.compareState(container.getState(), r.getState()))
                .forEach(r -> uniqueReplicas.putIfAbsent(r.getOriginDatanodeId(), r));

        final List<ContainerReplica> eligibleReplicas = new ArrayList<>(replicas);
        // Retain one state-matching replica per origin datanode.
        eligibleReplicas.removeAll(uniqueReplicas.values());
        // Do not send a second delete to a datanode that already has one in flight.
        eligibleReplicas.removeIf(r -> deletionInFlight.contains(r.getDatanodeDetails()));

        // Move replicas whose state does not match to the front so they are deleted first.
        final List<ContainerReplica> unhealthyReplicas = eligibleReplicas.stream()
                .filter(r -> !ReplicationManager.compareState(container.getState(), r.getState()))
                .collect(Collectors.toList());
        eligibleReplicas.removeAll(unhealthyReplicas);
        eligibleReplicas.addAll(0, unhealthyReplicas);

        // Never index past the eligible list: an IndexOutOfBoundsException here
        // would escape to the monitor loop, which terminates the process.
        final int toDelete = Math.min(excess, eligibleReplicas.size());
        if (toDelete < excess) {
            LOG.warn("Container {} has {} excess replicas but only {} are eligible for deletion.", id, excess, toDelete);
        }
        for (int i = 0; i < toDelete; ++i) {
            this.sendDeleteCommand(container, eligibleReplicas.get(i).getDatanodeDetails(), true);
        }
    }

    /**
     * Handles a container that has the expected number of replicas but at
     * least one replica whose state does not match the container's state.
     * <ul>
     *   <li>OPEN or CLOSING replicas receive a non-forced close command;</li>
     *   <li>QUASI_CLOSED replicas whose sequence id equals the container's
     *       receive a forced close command;</li>
     *   <li>of the mismatching replicas left after that, the first one
     *       receives a non-forced delete command.</li>
     * </ul>
     *
     * @param container the container being processed
     * @param replicas  the replicas currently known for the container
     */
    private void handleUnstableContainer(ContainerInfo container, Set<ContainerReplica> replicas) {
        // Elements are removed through the iterator below, so collect into an
        // explicitly mutable ArrayList; Collectors.toList() does not guarantee mutability.
        final List<ContainerReplica> unhealthyReplicas = replicas.stream()
                .filter(r -> !ReplicationManager.compareState(container.getState(), r.getState()))
                .collect(Collectors.toCollection(ArrayList::new));

        final Iterator<ContainerReplica> iterator = unhealthyReplicas.iterator();
        while (iterator.hasNext()) {
            final ContainerReplica replica = iterator.next();
            final StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State state = replica.getState();
            if (state == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN
                    || state == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSING) {
                this.sendCloseCommand(container, replica.getDatanodeDetails(), false);
                iterator.remove();
            } else if (state == StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED
                    && container.getSequenceId() == replica.getSequenceId().longValue()) {
                // Sequence id matches the container's, so the replica can be force closed.
                this.sendCloseCommand(container, replica.getDatanodeDetails(), true);
                iterator.remove();
            }
        }

        // Whatever is left cannot be closed; delete one such replica per pass.
        unhealthyReplicas.stream()
                .findFirst()
                .ifPresent(leftover -> this.sendDeleteCommand(container, leftover.getDatanodeDetails(), false));
    }

    /**
     * Fires a close container command for the given container at a datanode.
     * Close commands are not tracked as inflight actions.
     *
     * @param container the container to close
     * @param datanode  the datanode that should close its replica
     * @param force     passed through to the {@link CloseContainerCommand}
     */
    private void sendCloseCommand(ContainerInfo container, DatanodeDetails datanode, boolean force) {
        LOG.info("Sending close container command for container {} to datanode {}.", container.containerID(), datanode);
        final CloseContainerCommand command = new CloseContainerCommand(container.getContainerID(), container.getPipelineID(), force);
        this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, new CommandForDatanode<>(datanode.getUuid(), command));
    }

    /**
     * Sends a replicate container command to a target datanode and records it
     * as an inflight replication for the container.
     *
     * @param container the container to replicate
     * @param datanode  the target that should receive the new replica
     * @param sources   the datanodes the target may copy the container from
     */
    private void sendReplicateCommand(ContainerInfo container, DatanodeDetails datanode, List<DatanodeDetails> sources) {
        LOG.info("Sending replicate container command for container {} to datanode {}", container.containerID(), datanode);
        final ContainerID id = container.containerID();
        final ReplicateContainerCommand command = new ReplicateContainerCommand(id.getId(), sources);
        // Make sure the container has an inflight list before the command is sent.
        final List<InflightAction> tracked = this.inflightReplication.computeIfAbsent(id, key -> new ArrayList<>());
        this.sendAndTrackDatanodeCommand(datanode, command, tracked::add);
    }

    /**
     * Sends a delete container command to a datanode and records it as an
     * inflight deletion for the container.
     *
     * @param container the container whose replica should be deleted
     * @param datanode  the datanode holding the replica
     * @param force     passed through to the {@link DeleteContainerCommand}
     */
    private void sendDeleteCommand(ContainerInfo container, DatanodeDetails datanode, boolean force) {
        LOG.info("Sending delete container command for container {} to datanode {}", container.containerID(), datanode);
        final ContainerID id = container.containerID();
        final DeleteContainerCommand command = new DeleteContainerCommand(id.getId(), force);
        // Make sure the container has an inflight list before the command is sent.
        final List<InflightAction> tracked = this.inflightDeletion.computeIfAbsent(id, key -> new ArrayList<>());
        this.sendAndTrackDatanodeCommand(datanode, command, tracked::add);
    }

    /**
     * Fires a datanode command event and then hands a new inflight action,
     * timestamped after the event was fired, to the tracker.
     *
     * @param datanode the datanode the command is addressed to
     * @param command  the command to send
     * @param tracker  receives the inflight action describing this command
     * @param <T>      the protobuf message type carried by the command
     */
    private <T extends GeneratedMessage> void sendAndTrackDatanodeCommand(DatanodeDetails datanode, SCMCommand<T> command, Consumer<InflightAction> tracker) {
        final CommandForDatanode<T> addressedCommand = new CommandForDatanode<>(datanode.getUuid(), command);
        this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, addressedCommand);
        final InflightAction sent = new InflightAction(datanode, Time.monotonicNow());
        tracker.accept(sent);
    }

    /**
     * Tells whether a replica's state matches its container's lifecycle state.
     * OPEN, CLOSING, QUASI_CLOSED and CLOSED containers match only a replica
     * in the state of the same name; every other container state matches
     * nothing.
     *
     * @param containerState lifecycle state of the container
     * @param replicaState   state reported for the replica
     * @return {@code true} if the two states correspond
     */
    private static boolean compareState(HddsProtos.LifeCycleState containerState, StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State replicaState) {
        final StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State expected;
        switch (containerState) {
            case OPEN:
                expected = StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.OPEN;
                break;
            case CLOSING:
                expected = StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSING;
                break;
            case QUASI_CLOSED:
                expected = StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.QUASI_CLOSED;
                break;
            case CLOSED:
                expected = StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED;
                break;
            default:
                // DELETING, DELETED and anything else never match a replica state.
                return false;
        }
        return replicaState == expected;
    }

    /**
     * Configuration for {@code ReplicationManager}, bound to keys under the
     * {@code hdds.scm.replication} prefix.
     */
    @ConfigGroup(prefix="hdds.scm.replication")
    public static class ReplicationManagerConfiguration {
        /** Wait between two passes of the replication monitor thread; defaults to 300000 (matching the "300s" config default). */
        private long interval = 300000L;
        /** Age after which an inflight replicate/delete command is dropped from tracking; defaults to 600000 (matching the "10m" config default). */
        private long eventTimeout = 600000L;

        /**
         * Sets the wait between two passes of the replication monitor thread.
         *
         * @param interval the interval, in the unit passed to {@code Object.wait(long)} by the monitor loop (milliseconds)
         */
        @Config(key="thread.interval", type=ConfigType.TIME, defaultValue="300s", tags={ConfigTag.SCM, ConfigTag.OZONE}, description="There is a replication monitor thread running inside SCM which takes care of replicating the containers in the cluster. This property is used to configure the interval in which that thread runs.")
        public void setInterval(long interval) {
            this.interval = interval;
        }

        /**
         * Sets the timeout after which an inflight command is no longer tracked.
         *
         * @param eventTimeout the timeout, compared against {@code Time.monotonicNow()} differences
         */
        @Config(key="event.timeout", type=ConfigType.TIME, defaultValue="10m", tags={ConfigTag.SCM, ConfigTag.OZONE}, description="Timeout for the container replication/deletion commands sent  to datanodes. After this timeout the command will be retried.")
        public void setEventTimeout(long eventTimeout) {
            this.eventTimeout = eventTimeout;
        }

        /** @return the wait between two passes of the replication monitor thread */
        public long getInterval() {
            return this.interval;
        }

        /** @return the timeout after which an inflight command is no longer tracked */
        public long getEventTimeout() {
            return this.eventTimeout;
        }
    }

    /**
     * Immutable record of a command that has been sent to a datanode and is
     * still being tracked: the target datanode and the monotonic time at which
     * the record was created.
     */
    private static final class InflightAction {
        /** Datanode the command was sent to. */
        private final DatanodeDetails datanode;
        /** Value of {@code Time.monotonicNow()} captured when the action was recorded. */
        private final long time;

        private InflightAction(DatanodeDetails target, long recordedAt) {
            this.datanode = target;
            this.time = recordedAt;
        }
    }
}

