/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.container.replication;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.DeleteContainerCommandWatcher;
import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationCommandWatcher;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationQueue;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicationManager
implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationManager.class);
    private ReplicationQueue replicationQueue;
    private ContainerPlacementPolicy containerPlacement;
    private EventPublisher eventPublisher;
    private ReplicationCommandWatcher replicationCommandWatcher;
    private DeleteContainerCommandWatcher deleteContainerCommandWatcher;
    private boolean running = true;
    private ContainerManager containerManager;

    public ReplicationManager(ContainerPlacementPolicy containerPlacement, ContainerManager containerManager, EventQueue eventQueue, LeaseManager<Long> commandWatcherLeaseManager) {
        this.containerPlacement = containerPlacement;
        this.containerManager = containerManager;
        this.eventPublisher = eventQueue;
        this.replicationCommandWatcher = new ReplicationCommandWatcher((Event<ReplicationRequestToRepeat>)SCMEvents.TRACK_REPLICATE_COMMAND, (Event<ReplicationCompleted>)SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
        this.deleteContainerCommandWatcher = new DeleteContainerCommandWatcher((Event<DeletionRequestToRepeat>)SCMEvents.TRACK_DELETE_CONTAINER_COMMAND, (Event<DeleteContainerCommandCompleted>)SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE, commandWatcherLeaseManager);
        this.replicationQueue = new ReplicationQueue();
        eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER, (replicationRequest, publisher) -> this.replicationQueue.add((ReplicationRequest)replicationRequest));
        this.replicationCommandWatcher.start(eventQueue);
    }

    public void start() {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Replication Manager").build();
        threadFactory.newThread(this).start();
    }

    @Override
    public void run() {
        while (this.running) {
            ReplicationRequest request = null;
            try {
                request = this.replicationQueue.take();
                ContainerID containerID = new ContainerID(request.getContainerId());
                ContainerInfo container = this.containerManager.getContainer(containerID);
                HddsProtos.LifeCycleState state = container.getState();
                if (state != HddsProtos.LifeCycleState.CLOSED && state != HddsProtos.LifeCycleState.QUASI_CLOSED) {
                    LOG.warn("Cannot replicate the container {} when in {} state.", (Object)containerID, (Object)state);
                    continue;
                }
                ArrayList<ContainerReplica> containerReplicas = new ArrayList<ContainerReplica>(this.getCurrentReplicas(request));
                if (containerReplicas.size() == 0) {
                    LOG.warn("Container {} should be replicated but can't find any existing replicas", (Object)containerID);
                    return;
                }
                ReplicationRequest finalRequest = request;
                int inFlightReplications = this.replicationCommandWatcher.getTimeoutEvents(e -> e.getRequest().getContainerId() == finalRequest.getContainerId()).size();
                int inFlightDelete = this.deleteContainerCommandWatcher.getTimeoutEvents(e -> e.getRequest().getContainerId() == finalRequest.getContainerId()).size();
                int deficit = request.getExpecReplicationCount() - containerReplicas.size() - (inFlightReplications - inFlightDelete);
                if (deficit > 0) {
                    List<DatanodeDetails> datanodes = containerReplicas.stream().sorted((r1, r2) -> r2.getSequenceId().compareTo(r1.getSequenceId())).map(ContainerReplica::getDatanodeDetails).collect(Collectors.toList());
                    List<DatanodeDetails> selectedDatanodes = this.containerPlacement.chooseDatanodes(datanodes, deficit, container.getUsedBytes());
                    for (DatanodeDetails datanode : selectedDatanodes) {
                        LOG.info("Container {} is under replicated. Expected replica count is {}, but found {}. Re-replicating it on {}.", new Object[]{container.containerID(), request.getExpecReplicationCount(), containerReplicas.size(), datanode});
                        ReplicateContainerCommand replicateCommand = new ReplicateContainerCommand(containerID.getId(), datanodes);
                        this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)new CommandForDatanode(datanode.getUuid(), (SCMCommand)replicateCommand));
                        ReplicationRequestToRepeat timeoutEvent = new ReplicationRequestToRepeat(replicateCommand.getId(), request);
                        this.eventPublisher.fireEvent(SCMEvents.TRACK_REPLICATE_COMMAND, (Object)timeoutEvent);
                    }
                    continue;
                }
                if (deficit >= 0) continue;
                int numberOfReplicasToDelete = Math.abs(deficit);
                LinkedHashMap originIdToDnMap = new LinkedHashMap();
                containerReplicas.stream().sorted(Comparator.comparing(ContainerReplica::getSequenceId)).forEach(replica -> {
                    originIdToDnMap.computeIfAbsent(replica.getOriginDatanodeId(), key -> new ArrayList());
                    ((List)originIdToDnMap.get(replica.getOriginDatanodeId())).add(replica.getDatanodeDetails());
                });
                for (List listOfReplica : originIdToDnMap.values()) {
                    if (listOfReplica.size() > 1) {
                        int toDelete = Math.min(listOfReplica.size() - 1, numberOfReplicasToDelete);
                        DeleteContainerCommand deleteContainer = new DeleteContainerCommand(containerID.getId(), true);
                        for (int i = 0; i < toDelete; ++i) {
                            LOG.info("Container {} is over replicated. Expected replica count is {}, but found {}. Deleting the replica on {}.", new Object[]{container.containerID(), request.getExpecReplicationCount(), containerReplicas.size(), listOfReplica.get(i)});
                            this.eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)new CommandForDatanode(((DatanodeDetails)listOfReplica.get(i)).getUuid(), (SCMCommand)deleteContainer));
                            DeletionRequestToRepeat timeoutEvent = new DeletionRequestToRepeat(deleteContainer.getId(), request);
                            this.eventPublisher.fireEvent(SCMEvents.TRACK_DELETE_CONTAINER_COMMAND, (Object)timeoutEvent);
                        }
                        numberOfReplicasToDelete -= toDelete;
                    }
                    if (numberOfReplicasToDelete != 0) continue;
                    break;
                }
                if (numberOfReplicasToDelete == 0) continue;
                int expectedReplicaCount = container.getReplicationFactor().getNumber();
                LOG.warn("Not able to delete the container replica of Container {} even though it is over replicated. Expected replica count is {}, current replica count is {}.", new Object[]{containerID, expectedReplicaCount, expectedReplicaCount + numberOfReplicasToDelete});
            }
            catch (Exception e2) {
                LOG.error("Can't replicate container {}", (Object)request, (Object)e2);
            }
        }
    }

    @VisibleForTesting
    protected Set<ContainerReplica> getCurrentReplicas(ReplicationRequest request) throws IOException {
        return this.containerManager.getContainerReplicas(new ContainerID(request.getContainerId()));
    }

    @VisibleForTesting
    public ReplicationQueue getReplicationQueue() {
        return this.replicationQueue;
    }

    public void stop() {
        this.running = false;
    }

    public static class DeleteContainerCommandCompleted
    implements IdentifiableEventPayload {
        private final long uuid;

        public DeleteContainerCommandCompleted(long uuid) {
            this.uuid = uuid;
        }

        public long getId() {
            return this.uuid;
        }
    }

    public static class ReplicationCompleted
    implements IdentifiableEventPayload {
        private final long uuid;

        public ReplicationCompleted(long uuid) {
            this.uuid = uuid;
        }

        public long getId() {
            return this.uuid;
        }
    }

    public static class ContainerRequestToRepeat
    implements IdentifiableEventPayload {
        private final long commandId;
        private final ReplicationRequest request;

        ContainerRequestToRepeat(long commandId, ReplicationRequest request) {
            this.commandId = commandId;
            this.request = request;
        }

        public ReplicationRequest getRequest() {
            return this.request;
        }

        public long getId() {
            return this.commandId;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            ContainerRequestToRepeat that = (ContainerRequestToRepeat)o;
            return Objects.equals(this.request, that.request);
        }

        public int hashCode() {
            return Objects.hash(this.request);
        }
    }

    public static class DeletionRequestToRepeat
    extends ContainerRequestToRepeat {
        public DeletionRequestToRepeat(long commandId, ReplicationRequest request) {
            super(commandId, request);
        }
    }

    public static class ReplicationRequestToRepeat
    extends ContainerRequestToRepeat {
        public ReplicationRequestToRepeat(long commandId, ReplicationRequest request) {
            super(commandId, request);
        }
    }
}

