package org.apache.hadoop.hbase.replication.regionserver;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationGroupOffset;
import org.apache.hadoop.hbase.replication.ReplicationOffsetUtil;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueData;
import org.apache.hadoop.hbase.replication.ReplicationQueueId;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.shaded.com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.hbase.shaded.org.apache.commons.crypto.jna.OpenSslNativeJna;
import org.apache.hadoop.hbase.shaded.org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.AbstractWALProvider;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.common.collect.UnmodifiableIterator;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.class */
public class ReplicationSourceManager {
    private static final Logger LOG;
    private final ReplicationQueueStorage queueStorage;
    private final ReplicationPeers replicationPeers;
    private final UUID clusterId;
    private final Server server;
    private final SyncReplicationPeerMappingManager syncReplicationPeerMappingManager;
    private final Configuration conf;
    private final FileSystem fs;
    private final Map<String, Path> latestPaths;
    private final Path logDir;
    private final Path oldLogDir;
    private final WALFactory walFactory;
    private final long sleepBeforeFailover;
    private final ThreadPoolExecutor executor;
    private final long sleepForRetries;
    private final int maxRetriesMultiplier;
    private final long totalBufferLimit;
    private final MetricsReplicationGlobalSourceSource globalMetrics;
    static final /* synthetic */ boolean $assertionsDisabled;
    private AtomicLong totalBufferUsed = new AtomicLong();
    private final ConcurrentMap<String, ReplicationSourceInterface> sources = new ConcurrentHashMap();
    private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<String>>> walsById = new ConcurrentHashMap();
    private final ConcurrentMap<ReplicationQueueId, Map<String, NavigableSet<Path>>> walsByIdRecoveredQueues = new ConcurrentHashMap();
    private final List<ReplicationSourceInterface> oldsources = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    @FunctionalInterface
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager$ReplicationQueueOperation.class */
    public interface ReplicationQueueOperation {
        void exec() throws ReplicationException;
    }

    public ReplicationSourceManager(ReplicationQueueStorage replicationQueueStorage, ReplicationPeers replicationPeers, Configuration configuration, Server server, FileSystem fileSystem, Path path, Path path2, UUID uuid, WALFactory wALFactory, SyncReplicationPeerMappingManager syncReplicationPeerMappingManager, MetricsReplicationGlobalSourceSource metricsReplicationGlobalSourceSource) throws IOException {
        this.queueStorage = replicationQueueStorage;
        this.replicationPeers = replicationPeers;
        this.server = server;
        this.conf = configuration;
        this.fs = fileSystem;
        this.logDir = path;
        this.oldLogDir = path2;
        this.sleepBeforeFailover = configuration.getLong("replication.sleep.before.failover", 30000L);
        this.clusterId = uuid;
        this.walFactory = wALFactory;
        this.syncReplicationPeerMappingManager = syncReplicationPeerMappingManager;
        int i = configuration.getInt("replication.executor.workers", 1);
        this.executor = new ThreadPoolExecutor(i, i, 100L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        ThreadFactoryBuilder threadFactoryBuilder = new ThreadFactoryBuilder();
        threadFactoryBuilder.setNameFormat("ReplicationExecutor-%d");
        threadFactoryBuilder.setDaemon(true);
        this.executor.setThreadFactory(threadFactoryBuilder.build());
        this.latestPaths = new HashMap();
        this.sleepForRetries = this.conf.getLong("replication.source.sync.sleepforretries", 1000L);
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.sync.maxretriesmultiplier", 60);
        this.totalBufferLimit = configuration.getLong(HConstants.REPLICATION_SOURCE_TOTAL_BUFFER_KEY, OpenSslNativeJna.VERSION_1_0_X);
        this.globalMetrics = metricsReplicationGlobalSourceSource;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void init() throws IOException {
        Iterator<String> it = this.replicationPeers.getAllPeerIds().iterator();
        while (it.hasNext()) {
            addSource(it.next(), true);
        }
    }

    public void addPeer(String str) throws IOException {
        try {
            if (this.replicationPeers.addPeer(str)) {
                addSource(str, false);
            }
        } catch (ReplicationException e) {
            throw new IOException(e);
        }
    }

    public void removePeer(String str) {
        ReplicationPeerImpl removePeer = this.replicationPeers.removePeer(str);
        ArrayList<ReplicationSourceInterface> arrayList = new ArrayList();
        synchronized (this.oldsources) {
            for (ReplicationSourceInterface replicationSourceInterface : this.oldsources) {
                if (str.equals(replicationSourceInterface.getPeerId())) {
                    arrayList.add(replicationSourceInterface);
                }
            }
            for (ReplicationSourceInterface replicationSourceInterface2 : arrayList) {
                replicationSourceInterface2.terminate("Replication stream was removed by a user");
                removeRecoveredSource(replicationSourceInterface2);
            }
        }
        LOG.info("Number of deleted recovered sources for {}: {}", str, Integer.valueOf(arrayList.size()));
        ReplicationSourceInterface replicationSourceInterface3 = this.sources.get(str);
        if (replicationSourceInterface3 != null) {
            replicationSourceInterface3.terminate("Replication stream was removed by a user");
            removeSource(replicationSourceInterface3);
        }
        ReplicationPeerConfig peerConfig = removePeer.getPeerConfig();
        if (peerConfig.isSyncReplication()) {
            this.syncReplicationPeerMappingManager.remove(str, peerConfig);
        }
    }

    private ReplicationSourceInterface createSource(ReplicationQueueData replicationQueueData, ReplicationPeer replicationPeer) throws IOException {
        ReplicationSourceInterface create = ReplicationSourceFactory.create(this.conf, replicationQueueData.getId());
        create.init(this.conf, this.fs, this, this.queueStorage, replicationPeer, this.server, replicationQueueData, this.clusterId, this.walFactory.getWALProvider() != null ? this.walFactory.getWALProvider().getWALFileLengthProvider() : path -> {
            return OptionalLong.empty();
        }, new MetricsSource(replicationQueueData.getId().toString()));
        return create;
    }

    void addSource(String str, boolean z) throws IOException {
        ReplicationPeerImpl peer = this.replicationPeers.getPeer(str);
        if (ReplicationUtils.LEGACY_REGION_REPLICATION_ENDPOINT_NAME.equals(peer.getPeerConfig().getReplicationEndpointImpl())) {
            LOG.info("Legacy region replication peer found, skip adding: {}", peer.getPeerConfig());
            return;
        }
        ReplicationQueueId replicationQueueId = new ReplicationQueueId(this.server.getServerName(), str);
        ReplicationSourceInterface createSource = createSource(new ReplicationQueueData(replicationQueueId, ImmutableMap.of()), peer);
        synchronized (this.latestPaths) {
            this.sources.put(str, createSource);
            HashMap hashMap = new HashMap();
            this.walsById.put(replicationQueueId, hashMap);
            if (!this.latestPaths.isEmpty()) {
                for (Map.Entry<String, Path> entry : this.latestPaths.entrySet()) {
                    Path value = entry.getValue();
                    TreeSet treeSet = new TreeSet();
                    treeSet.add(value.getName());
                    hashMap.put(entry.getKey(), treeSet);
                    if (!z) {
                        abortAndThrowIOExceptionWhenFail(() -> {
                            this.queueStorage.setOffset(replicationQueueId, (String) entry.getKey(), new ReplicationGroupOffset(value.getName(), 0L), Collections.emptyMap());
                        });
                    }
                    createSource.enqueueLog(value);
                    LOG.trace("Enqueued {} to source {} during source creation.", value, createSource.getQueueId());
                }
            }
        }
        ReplicationPeerConfig peerConfig = peer.getPeerConfig();
        if (peerConfig.isSyncReplication()) {
            this.syncReplicationPeerMappingManager.add(peer.getId(), peerConfig);
        }
        createSource.startup();
    }

    public void drainSources(String str) throws IOException, ReplicationException {
        ReplicationQueueData replicationQueueData;
        ReplicationSourceInterface createSource;
        String str2 = "Sync replication peer " + str + " is transiting to STANDBY. Will close the previous replication source and open a new one";
        ReplicationPeerImpl peer = this.replicationPeers.getPeer(str);
        if (!$assertionsDisabled && !peer.getPeerConfig().isSyncReplication()) {
            throw new AssertionError();
        }
        ReplicationQueueId replicationQueueId = new ReplicationQueueId(this.server.getServerName(), str);
        createSource(new ReplicationQueueData(replicationQueueId, ImmutableMap.of()), peer);
        synchronized (this.latestPaths) {
            ImmutableMap.Builder builder = ImmutableMap.builder();
            synchronized (this.walsById) {
                this.walsById.get(replicationQueueId).forEach((str3, navigableSet) -> {
                    if (navigableSet.isEmpty()) {
                        return;
                    }
                    builder.put(str3, new ReplicationGroupOffset((String) navigableSet.last(), -1L));
                });
            }
            replicationQueueData = new ReplicationQueueData(replicationQueueId, builder.build());
            createSource = createSource(replicationQueueData, peer);
            ReplicationSourceInterface put = this.sources.put(str, createSource);
            if (put != null) {
                LOG.info("Terminate replication source for " + put.getPeerId());
                put.terminate(str2);
                put.getSourceMetrics().clear();
            }
        }
        UnmodifiableIterator<Map.Entry<String, ReplicationGroupOffset>> it = replicationQueueData.getOffsets().entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ReplicationGroupOffset> next = it.next();
            this.queueStorage.setOffset(replicationQueueId, next.getKey(), next.getValue(), Collections.emptyMap());
        }
        LOG.info("Startup replication source for " + createSource.getPeerId());
        createSource.startup();
        synchronized (this.walsById) {
            Map<String, NavigableSet<String>> map = this.walsById.get(replicationQueueId);
            replicationQueueData.getOffsets().forEach((str4, replicationGroupOffset) -> {
                NavigableSet navigableSet2 = (NavigableSet) map.get(str4);
                if (navigableSet2 != null) {
                    navigableSet2.headSet(replicationGroupOffset.getWal(), true).clear();
                }
            });
        }
        synchronized (this.oldsources) {
            Iterator<ReplicationSourceInterface> it2 = this.oldsources.iterator();
            while (it2.hasNext()) {
                ReplicationSourceInterface next2 = it2.next();
                if (next2.getPeerId().equals(str)) {
                    ReplicationQueueId queueId = next2.getQueueId();
                    next2.terminate(str2);
                    next2.getSourceMetrics().clear();
                    this.queueStorage.removeQueue(queueId);
                    this.walsByIdRecoveredQueues.remove(queueId);
                    it2.remove();
                }
            }
        }
    }

    private ReplicationSourceInterface createRefreshedSource(ReplicationQueueId replicationQueueId, ReplicationPeer replicationPeer) throws IOException, ReplicationException {
        return createSource(new ReplicationQueueData(replicationQueueId, ImmutableMap.copyOf((Map) this.queueStorage.getOffsets(replicationQueueId))), replicationPeer);
    }

    public void refreshSources(String str) throws ReplicationException, IOException {
        ReplicationSourceInterface createRefreshedSource;
        String str2 = "Peer " + str + " state or config changed. Will close the previous replication source and open a new one";
        ReplicationPeerImpl peer = this.replicationPeers.getPeer(str);
        ReplicationQueueId replicationQueueId = new ReplicationQueueId(this.server.getServerName(), str);
        synchronized (this.latestPaths) {
            ReplicationSourceInterface remove = this.sources.remove(str);
            if (remove != null) {
                LOG.info("Terminate replication source for " + remove.getPeerId());
                remove.terminate(str2, null, false);
            }
            createRefreshedSource = createRefreshedSource(replicationQueueId, peer);
            this.sources.put(str, createRefreshedSource);
            Iterator<NavigableSet<String>> it = this.walsById.get(replicationQueueId).values().iterator();
            while (it.hasNext()) {
                it.next().forEach(str3 -> {
                    createRefreshedSource.enqueueLog(new Path(this.logDir, str3));
                });
            }
        }
        LOG.info("Startup replication source for " + createRefreshedSource.getPeerId());
        createRefreshedSource.startup();
        ArrayList arrayList = new ArrayList();
        synchronized (this.oldsources) {
            ArrayList<ReplicationQueueId> arrayList2 = new ArrayList();
            Iterator<ReplicationSourceInterface> it2 = this.oldsources.iterator();
            while (it2.hasNext()) {
                ReplicationSourceInterface next = it2.next();
                if (next.getPeerId().equals(str)) {
                    arrayList2.add(next.getQueueId());
                    next.terminate(str2);
                    it2.remove();
                }
            }
            for (ReplicationQueueId replicationQueueId2 : arrayList2) {
                ReplicationSourceInterface createRefreshedSource2 = createRefreshedSource(replicationQueueId2, peer);
                this.oldsources.add(createRefreshedSource2);
                Iterator<NavigableSet<Path>> it3 = this.walsByIdRecoveredQueues.get(replicationQueueId2).values().iterator();
                while (it3.hasNext()) {
                    it3.next().forEach(path -> {
                        createRefreshedSource2.enqueueLog(path);
                    });
                }
                arrayList.add(createRefreshedSource2);
            }
        }
        Iterator it4 = arrayList.iterator();
        while (it4.hasNext()) {
            ((ReplicationSourceInterface) it4.next()).startup();
        }
    }

    private boolean removeRecoveredSource(ReplicationSourceInterface replicationSourceInterface) {
        if (!this.oldsources.remove(replicationSourceInterface)) {
            return false;
        }
        LOG.info("Done with the recovered queue {}", replicationSourceInterface.getQueueId());
        deleteQueue(replicationSourceInterface.getQueueId());
        this.walsByIdRecoveredQueues.remove(replicationSourceInterface.getQueueId());
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void finishRecoveredSource(ReplicationSourceInterface replicationSourceInterface) {
        synchronized (this.oldsources) {
            if (removeRecoveredSource(replicationSourceInterface)) {
                LOG.info("Finished recovering queue {} with the following stats: {}", replicationSourceInterface.getQueueId(), replicationSourceInterface.getStats());
            }
        }
    }

    void removeSource(ReplicationSourceInterface replicationSourceInterface) {
        LOG.info("Done with the queue " + replicationSourceInterface.getQueueId());
        this.sources.remove(replicationSourceInterface.getPeerId());
        deleteQueue(replicationSourceInterface.getQueueId());
        this.walsById.remove(replicationSourceInterface.getQueueId());
    }

    private void deleteQueue(ReplicationQueueId replicationQueueId) {
        abortWhenFail(() -> {
            this.queueStorage.removeQueue(replicationQueueId);
        });
    }

    private void interruptOrAbortWhenFail(ReplicationQueueOperation replicationQueueOperation) {
        try {
            replicationQueueOperation.exec();
        } catch (ReplicationException e) {
            if (e.getCause() != null && (e.getCause() instanceof KeeperException.SystemErrorException) && e.getCause().getCause() != null && (e.getCause().getCause() instanceof InterruptedException)) {
                throw new ReplicationRuntimeException("Thread is interrupted, the replication source may be terminated", e.getCause().getCause());
            }
            this.server.abort("Failed to operate on replication queue", e);
        }
    }

    private void abortWhenFail(ReplicationQueueOperation replicationQueueOperation) {
        try {
            replicationQueueOperation.exec();
        } catch (ReplicationException e) {
            this.server.abort("Failed to operate on replication queue", e);
        }
    }

    private void throwIOExceptionWhenFail(ReplicationQueueOperation replicationQueueOperation) throws IOException {
        try {
            replicationQueueOperation.exec();
        } catch (ReplicationException e) {
            throw new IOException(e);
        }
    }

    private void abortAndThrowIOExceptionWhenFail(ReplicationQueueOperation replicationQueueOperation) throws IOException {
        try {
            replicationQueueOperation.exec();
        } catch (ReplicationException e) {
            this.server.abort("Failed to operate on replication queue", e);
            throw new IOException(e);
        }
    }

    public void logPositionAndCleanOldLogs(ReplicationSourceInterface replicationSourceInterface, WALEntryBatch wALEntryBatch) {
        String name = wALEntryBatch.getLastWalPath().getName();
        String wALPrefixFromWALName = AbstractFSWALProvider.getWALPrefixFromWALName(name);
        ReplicationGroupOffset replicationGroupOffset = new ReplicationGroupOffset(name, wALEntryBatch.isEndOfFile() ? -1L : wALEntryBatch.getLastWalPosition());
        interruptOrAbortWhenFail(() -> {
            this.queueStorage.setOffset(replicationSourceInterface.getQueueId(), wALPrefixFromWALName, replicationGroupOffset, wALEntryBatch.getLastSeqIds());
        });
        cleanOldLogs(name, wALEntryBatch.isEndOfFile(), replicationSourceInterface);
    }

    void cleanOldLogs(String str, boolean z, ReplicationSourceInterface replicationSourceInterface) {
        String wALPrefixFromWALName = AbstractFSWALProvider.getWALPrefixFromWALName(str);
        if (replicationSourceInterface.isRecovered()) {
            NavigableSet<Path> navigableSet = this.walsByIdRecoveredQueues.get(replicationSourceInterface.getQueueId()).get(wALPrefixFromWALName);
            if (navigableSet != null) {
                NavigableSet<String> navigableSet2 = (NavigableSet) navigableSet.headSet(new Path(this.oldLogDir, str), z).stream().map((v0) -> {
                    return v0.getName();
                }).collect(Collectors.toCollection(TreeSet::new));
                if (navigableSet2.isEmpty()) {
                    return;
                }
                cleanOldLogs(navigableSet2, replicationSourceInterface);
                navigableSet2.clear();
                return;
            }
            return;
        }
        synchronized (this.walsById) {
            NavigableSet<String> navigableSet3 = this.walsById.get(replicationSourceInterface.getQueueId()).get(wALPrefixFromWALName);
            if (navigableSet3 == null) {
                return;
            }
            NavigableSet<String> headSet = navigableSet3.headSet(str, z);
            if (headSet.isEmpty()) {
                return;
            }
            TreeSet treeSet = new TreeSet((SortedSet) headSet);
            cleanOldLogs(treeSet, replicationSourceInterface);
            synchronized (this.walsById) {
                navigableSet3.removeAll(treeSet);
            }
        }
    }

    private void removeRemoteWALs(String str, String str2, Collection<String> collection) throws IOException {
        Path peerRemoteWALDir = ReplicationUtils.getPeerRemoteWALDir(str2, str);
        FileSystem remoteWALFileSystem = ReplicationUtils.getRemoteWALFileSystem(this.conf, str2);
        Iterator<String> it = collection.iterator();
        while (it.hasNext()) {
            Path path = new Path(peerRemoteWALDir, it.next());
            try {
            } catch (FileNotFoundException e) {
                LOG.debug("The remote wal {} has already been deleted?", path, e);
            }
            if (!remoteWALFileSystem.delete(path, false) && remoteWALFileSystem.exists(path)) {
                throw new IOException("Can not delete " + path);
                break;
            }
        }
    }

    private void cleanOldLogs(NavigableSet<String> navigableSet, ReplicationSourceInterface replicationSourceInterface) {
        LOG.debug("Removing {} logs in the list: {}", Integer.valueOf(navigableSet.size()), navigableSet);
        if (!replicationSourceInterface.isSyncReplication()) {
            return;
        }
        String peerId = replicationSourceInterface.getPeerId();
        String remoteWALDir = replicationSourceInterface.getPeer().getPeerConfig().getRemoteWALDir();
        List list = (List) navigableSet.stream().filter(str -> {
            Optional<String> syncReplicationPeerIdFromWALName = AbstractWALProvider.getSyncReplicationPeerIdFromWALName(str);
            peerId.getClass();
            return ((Boolean) syncReplicationPeerIdFromWALName.map((v1) -> {
                return r1.equals(v1);
            }).orElse(false)).booleanValue();
        }).collect(Collectors.toList());
        LOG.debug("Removing {} logs from remote dir {} in the list: {}", new Object[]{Integer.valueOf(list.size()), remoteWALDir, list});
        if (list.isEmpty()) {
            return;
        }
        int i = 0;
        while (true) {
            try {
                removeRemoteWALs(peerId, remoteWALDir, list);
                return;
            } catch (IOException e) {
                LOG.warn("Failed to delete remote wals from remote dir {} for peer {}", remoteWALDir, peerId);
                if (!replicationSourceInterface.isSourceActive()) {
                    return;
                }
                if (ReplicationUtils.sleepForRetries("Failed to delete remote wals", this.sleepForRetries, i, this.maxRetriesMultiplier)) {
                    i++;
                }
            }
        }
    }

    public void postLogRoll(Path path) throws IOException {
        String name = path.getName();
        String wALPrefixFromWALName = AbstractFSWALProvider.getWALPrefixFromWALName(name);
        synchronized (this.latestPaths) {
            synchronized (this.walsById) {
                for (Map.Entry<ReplicationQueueId, Map<String, NavigableSet<String>>> entry : this.walsById.entrySet()) {
                    String peerId = entry.getKey().getPeerId();
                    Map<String, NavigableSet<String>> value = entry.getValue();
                    boolean z = false;
                    for (Map.Entry<String, NavigableSet<String>> entry2 : value.entrySet()) {
                        NavigableSet<String> value2 = entry2.getValue();
                        if (this.sources.isEmpty()) {
                            value2.clear();
                        }
                        if (wALPrefixFromWALName.equals(entry2.getKey())) {
                            value2.add(name);
                            z = true;
                        }
                    }
                    if (!z) {
                        LOG.debug("Start tracking logs for wal group {} for peer {}", wALPrefixFromWALName, peerId);
                        TreeSet treeSet = new TreeSet();
                        treeSet.add(name);
                        value.put(wALPrefixFromWALName, treeSet);
                    }
                }
            }
            this.latestPaths.put(wALPrefixFromWALName, path);
        }
        for (ReplicationSourceInterface replicationSourceInterface : this.sources.values()) {
            replicationSourceInterface.enqueueLog(path);
            LOG.trace("Enqueued {} to source {} while performing postLogRoll operation.", path, replicationSourceInterface.getQueueId());
        }
    }

    private boolean shouldReplicate(ReplicationGroupOffset replicationGroupOffset, String str) {
        if (AbstractFSWALProvider.isMetaFile(str)) {
            return false;
        }
        return ReplicationOffsetUtil.shouldReplicate(replicationGroupOffset, str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void claimQueue(ReplicationQueueId replicationQueueId) {
        claimQueue(replicationQueueId, false);
    }

    private PriorityQueue<Path> getWALFilesToReplicate(ServerName serverName, boolean z, Map<String, ReplicationGroupOffset> map) throws IOException {
        List<Path> archivedWALFiles = AbstractFSWALProvider.getArchivedWALFiles(this.conf, serverName, URLEncoder.encode(serverName.toString(), StandardCharsets.UTF_8.name()));
        if (z) {
            archivedWALFiles.addAll(AbstractFSWALProvider.getWALFiles(this.conf, serverName));
        }
        PriorityQueue<Path> priorityQueue = new PriorityQueue<>((Comparator<? super Path>) AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
        for (Path path : archivedWALFiles) {
            ReplicationGroupOffset replicationGroupOffset = map.get(AbstractFSWALProvider.getWALPrefixFromWALName(path.getName()));
            if (shouldReplicate(replicationGroupOffset, path.getName())) {
                priorityQueue.add(path);
            } else {
                LOG.debug("Skip enqueuing log {} because it is before the start offset {}", path.getName(), replicationGroupOffset);
            }
        }
        return priorityQueue;
    }

    private void addRecoveredSource(ReplicationSourceInterface replicationSourceInterface, ReplicationPeerImpl replicationPeerImpl, ReplicationQueueId replicationQueueId, PriorityQueue<Path> priorityQueue) {
        ReplicationPeerImpl peer = this.replicationPeers.getPeer(replicationSourceInterface.getPeerId());
        if (peer == null || peer != replicationPeerImpl) {
            replicationSourceInterface.terminate("Recovered queue doesn't belong to any current peer");
            deleteQueue(replicationQueueId);
            return;
        }
        if (peer.getPeerConfig().isSyncReplication()) {
            Pair<SyncReplicationState, SyncReplicationState> syncReplicationStateAndNewState = peer.getSyncReplicationStateAndNewState();
            if ((syncReplicationStateAndNewState.getFirst().equals(SyncReplicationState.STANDBY) && syncReplicationStateAndNewState.getSecond().equals(SyncReplicationState.NONE)) || syncReplicationStateAndNewState.getSecond().equals(SyncReplicationState.STANDBY)) {
                replicationSourceInterface.terminate("Sync replication peer is in STANDBY state");
                deleteQueue(replicationQueueId);
                return;
            }
        }
        HashMap hashMap = new HashMap();
        this.walsByIdRecoveredQueues.put(replicationQueueId, hashMap);
        Iterator<Path> it = priorityQueue.iterator();
        while (it.hasNext()) {
            Path next = it.next();
            String wALPrefixFromWALName = AbstractFSWALProvider.getWALPrefixFromWALName(next.getName());
            NavigableSet navigableSet = (NavigableSet) hashMap.get(wALPrefixFromWALName);
            if (navigableSet == null) {
                navigableSet = new TreeSet(AbstractFSWALProvider.TIMESTAMP_COMPARATOR);
                hashMap.put(wALPrefixFromWALName, navigableSet);
            }
            navigableSet.add(next);
        }
        this.oldsources.add(replicationSourceInterface);
        LOG.info("Added source for recovered queue {}, number of wals to replicate: {}", replicationQueueId, Integer.valueOf(priorityQueue.size()));
        Iterator<Path> it2 = priorityQueue.iterator();
        while (it2.hasNext()) {
            Path next2 = it2.next();
            LOG.debug("Enqueueing log {} from recovered queue for source: {}", next2, replicationQueueId);
            replicationSourceInterface.enqueueLog(next2);
        }
        replicationSourceInterface.startup();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void claimQueue(ReplicationQueueId replicationQueueId, boolean z) {
        try {
            Thread.sleep(this.sleepBeforeFailover + (ThreadLocalRandom.current().nextFloat() * ((float) this.sleepBeforeFailover)));
        } catch (InterruptedException e) {
            LOG.warn("Interrupted while waiting before transferring a queue.");
            Thread.currentThread().interrupt();
        }
        if (this.server.isStopped()) {
            LOG.info("Not transferring queue since we are shutting down");
            return;
        }
        String peerId = replicationQueueId.getPeerId();
        ReplicationPeerImpl peer = this.replicationPeers.getPeer(peerId);
        if (peer == null) {
            LOG.info("Not transferring queue since the replication peer {} for queue {} does not exist", peerId, replicationQueueId);
            return;
        }
        try {
            Map<String, ReplicationGroupOffset> claimQueue = this.queueStorage.claimQueue(replicationQueueId, this.server.getServerName());
            if (claimQueue.isEmpty()) {
                return;
            }
            ServerName serverWALsBelongTo = replicationQueueId.getServerWALsBelongTo();
            ReplicationQueueId claim = replicationQueueId.claim(this.server.getServerName());
            ReplicationPeerImpl peer2 = this.replicationPeers.getPeer(peerId);
            if (peer2 == null || peer2 != peer) {
                LOG.warn("Skipping failover for peer {} of node {}, peer is null", peerId, serverWALsBelongTo);
                deleteQueue(claim);
                return;
            }
            try {
                ReplicationSourceInterface createSource = createSource(new ReplicationQueueData(claim, ImmutableMap.copyOf((Map) claimQueue)), peer2);
                try {
                    PriorityQueue<Path> wALFilesToReplicate = getWALFilesToReplicate(serverWALsBelongTo, z, claimQueue);
                    synchronized (this.oldsources) {
                        addRecoveredSource(createSource, peer, claim, wALFilesToReplicate);
                    }
                } catch (IOException e2) {
                    LOG.error("Can not list wal files for peer {} and queue {}", new Object[]{peerId, replicationQueueId, e2});
                    this.server.abort("Can not list wal files after claiming queue.", e2);
                }
            } catch (IOException e3) {
                LOG.error("Can not create replication source for peer {} and queue {}", new Object[]{peerId, claim, e3});
                this.server.abort("Failed to create replication source after claiming queue.", e3);
            }
        } catch (ReplicationException e4) {
            LOG.error("ReplicationException: cannot claim dead region ({})'s replication queue", replicationQueueId.getServerName(), e4);
            this.server.abort("Failed to claim queue from dead regionserver.", e4);
        }
    }

    public void join() {
        this.executor.shutdown();
        Iterator<ReplicationSourceInterface> it = this.sources.values().iterator();
        while (it.hasNext()) {
            it.next().terminate("Region server is closing");
        }
        synchronized (this.oldsources) {
            Iterator<ReplicationSourceInterface> it2 = this.oldsources.iterator();
            while (it2.hasNext()) {
                it2.next().terminate("Region server is closing");
            }
        }
    }

    @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*/src/test/.*")
    public Map<ReplicationQueueId, Map<String, NavigableSet<String>>> getWALs() {
        return Collections.unmodifiableMap(this.walsById);
    }

    public List<ReplicationSourceInterface> getSources() {
        return new ArrayList(this.sources.values());
    }

    public List<ReplicationSourceInterface> getOldSources() {
        return this.oldsources;
    }

    public ReplicationSourceInterface getSource(String str) {
        return this.sources.get(str);
    }

    int getSizeOfLatestPath() {
        int size;
        synchronized (this.latestPaths) {
            size = this.latestPaths.size();
        }
        return size;
    }

    Set<Path> getLastestPath() {
        HashSet newHashSet;
        synchronized (this.latestPaths) {
            newHashSet = Sets.newHashSet(this.latestPaths.values());
        }
        return newHashSet;
    }

    public long getTotalBufferUsed() {
        return this.totalBufferUsed.get();
    }

    public long getTotalBufferLimit() {
        return this.totalBufferLimit;
    }

    public Path getOldLogDir() {
        return this.oldLogDir;
    }

    public Path getLogDir() {
        return this.logDir;
    }

    public FileSystem getFs() {
        return this.fs;
    }

    public ReplicationPeers getReplicationPeers() {
        return this.replicationPeers;
    }

    public String getStats() {
        StringBuilder sb = new StringBuilder();
        sb.append("Global stats: ");
        sb.append("WAL Edits Buffer Used=").append(getTotalBufferUsed()).append("B, Limit=").append(getTotalBufferLimit()).append("B\n");
        for (ReplicationSourceInterface replicationSourceInterface : this.sources.values()) {
            sb.append("Normal source for cluster " + replicationSourceInterface.getPeerId() + ": ");
            sb.append(replicationSourceInterface.getStats() + "\n");
        }
        for (ReplicationSourceInterface replicationSourceInterface2 : this.oldsources) {
            sb.append("Recovered source for cluster/machine(s) " + replicationSourceInterface2.getPeerId() + ": ");
            sb.append(replicationSourceInterface2.getStats() + "\n");
        }
        return sb.toString();
    }

    public void addHFileRefs(TableName tableName, byte[] bArr, List<Pair<Path, Path>> list) throws IOException {
        for (ReplicationSourceInterface replicationSourceInterface : this.sources.values()) {
            throwIOExceptionWhenFail(() -> {
                replicationSourceInterface.addHFileRefs(tableName, bArr, list);
            });
        }
    }

    public void cleanUpHFileRefs(String str, List<String> list) {
        interruptOrAbortWhenFail(() -> {
            this.queueStorage.removeHFileRefs(str, list);
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public int activeFailoverTaskCount() {
        return this.executor.getActiveCount();
    }

    MetricsReplicationGlobalSourceSource getGlobalMetrics() {
        return this.globalMetrics;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplicationQueueStorage getQueueStorage() {
        return this.queueStorage;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean acquireWALEntryBufferQuota(WALEntryBatch wALEntryBatch, WAL.Entry entry) {
        return acquireBufferQuota(wALEntryBatch.incrementUsedBufferSize(entry));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public long releaseWALEntryBatchBufferQuota(WALEntryBatch wALEntryBatch) {
        long usedBufferSize = wALEntryBatch.getUsedBufferSize();
        if (usedBufferSize > 0) {
            releaseBufferQuota(usedBufferSize);
        }
        return usedBufferSize;
    }

    boolean acquireBufferQuota(long j) {
        if (j < 0) {
            throw new IllegalArgumentException("size should not less than 0");
        }
        return addTotalBufferUsed(j) >= this.totalBufferLimit;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void releaseBufferQuota(long j) {
        if (j < 0) {
            throw new IllegalArgumentException("size should not less than 0");
        }
        addTotalBufferUsed(-j);
    }

    private long addTotalBufferUsed(long j) {
        if (j == 0) {
            return this.totalBufferUsed.get();
        }
        long addAndGet = this.totalBufferUsed.addAndGet(j);
        this.globalMetrics.setWALReaderEditsBufferBytes(addAndGet);
        return addAndGet;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean checkBufferQuota(String str) {
        if (this.totalBufferUsed.get() <= this.totalBufferLimit) {
            return true;
        }
        LOG.warn("peer={}, can't read more edits from WAL as buffer usage {}B exceeds limit {}B", new Object[]{str, Long.valueOf(this.totalBufferUsed.get()), Long.valueOf(this.totalBufferLimit)});
        return false;
    }

    static {
        $assertionsDisabled = !ReplicationSourceManager.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(ReplicationSourceManager.class);
    }
}
