package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.class */
public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoint {
    private static final Logger LOG = LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
    private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
    private ClusterConnection conn;
    private Configuration localConf;
    private Configuration conf;
    private long sleepForRetries;
    private int maxRetriesMultiplier;
    private int socketTimeoutMultiplier;
    private long maxTerminationWait;
    private int replicationRpcLimit;
    private MetricsSource metrics;
    private ReplicationSinkManager replicationSinkMgr;
    private boolean peersSelected = false;
    private String replicationClusterId = "";
    private ThreadPoolExecutor exec;
    private int maxThreads;
    private Path baseNamespaceDir;
    private Path hfileArchiveDir;
    private boolean replicationBulkLoadDataEnabled;
    private Abortable abortable;
    private boolean dropOnDeletedTables;

    /* JADX INFO: Access modifiers changed from: protected */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint$Replicator.class */
    public class Replicator implements Callable<Integer> {
        private List<WAL.Entry> entries;
        private int ordinal;

        public Replicator(List<WAL.Entry> list, int i) {
            this.entries = list;
            this.ordinal = i;
        }

        protected void replicateEntries(AdminProtos.AdminService.BlockingInterface blockingInterface, List<WAL.Entry> list, String str, Path path, Path path2) throws IOException {
            if (HBaseInterClusterReplicationEndpoint.LOG.isTraceEnabled()) {
                long j = 0;
                for (WAL.Entry entry : this.entries) {
                    j = j + entry.getKey().estimatedSerializedSizeOf() + entry.getEdit().estimatedSerializedSizeOf();
                }
                HBaseInterClusterReplicationEndpoint.LOG.trace("Replicating batch " + System.identityHashCode(this.entries) + " of " + this.entries.size() + " entries with total size " + j + " bytes to " + str);
            }
            try {
                ReplicationProtbufUtil.replicateWALEntry(blockingInterface, (WAL.Entry[]) list.toArray(new WAL.Entry[list.size()]), str, path, path2);
                if (HBaseInterClusterReplicationEndpoint.LOG.isTraceEnabled()) {
                    HBaseInterClusterReplicationEndpoint.LOG.trace("Completed replicating batch " + System.identityHashCode(this.entries));
                }
            } catch (IOException e) {
                if (HBaseInterClusterReplicationEndpoint.LOG.isTraceEnabled()) {
                    HBaseInterClusterReplicationEndpoint.LOG.trace("Failed replicating batch " + System.identityHashCode(this.entries), e);
                }
                throw e;
            }
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Integer call() throws IOException {
            ReplicationSinkManager.SinkPeer sinkPeer = null;
            try {
                sinkPeer = HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.getReplicationSink();
                replicateEntries(sinkPeer.getRegionServer(), this.entries, HBaseInterClusterReplicationEndpoint.this.replicationClusterId, HBaseInterClusterReplicationEndpoint.this.baseNamespaceDir, HBaseInterClusterReplicationEndpoint.this.hfileArchiveDir);
                HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.reportSinkSuccess(sinkPeer);
                return Integer.valueOf(this.ordinal);
            } catch (IOException e) {
                if (sinkPeer != null) {
                    HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.reportBadSink(sinkPeer);
                }
                throw e;
            }
        }
    }

    @Override // org.apache.hadoop.hbase.replication.BaseReplicationEndpoint, org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);
        this.conf = HBaseConfiguration.create(this.ctx.getConfiguration());
        this.localConf = HBaseConfiguration.create(this.ctx.getLocalConfiguration());
        decorateConf();
        this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
        this.socketTimeoutMultiplier = this.conf.getInt("replication.source.socketTimeoutMultiplier", this.maxRetriesMultiplier);
        this.maxTerminationWait = this.conf.getLong("replication.source.maxterminationmultiplier", 2L) * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000L);
        this.conn = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
        this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000L);
        this.metrics = context.getMetrics();
        this.replicationSinkMgr = new ReplicationSinkManager(this.conn, this.ctx.getPeerId(), this, this.conf);
        this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 10);
        this.exec = new ThreadPoolExecutor(this.maxThreads, this.maxThreads, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue());
        this.exec.allowCoreThreadTimeOut(true);
        this.abortable = this.ctx.getAbortable();
        this.replicationRpcLimit = (int) (0.95d * this.conf.getLong(RpcServer.MAX_REQUEST_SIZE, 268435456L));
        this.dropOnDeletedTables = this.conf.getBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
        this.replicationBulkLoadDataEnabled = this.conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, false);
        if (this.replicationBulkLoadDataEnabled) {
            this.replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
        }
        Path rootDir = FSUtils.getRootDir(this.conf);
        Path path = new Path("data");
        this.baseNamespaceDir = new Path(rootDir, path);
        this.hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, path));
    }

    private void decorateConf() {
        String str = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
        if (StringUtils.isNotEmpty(str)) {
            this.conf.set(HConstants.RPC_CODEC_CONF_KEY, str);
        }
    }

    private void connectToPeers() {
        getRegionServers();
        int i = 1;
        while (isRunning() && this.replicationSinkMgr.getNumSinks() == 0) {
            this.replicationSinkMgr.chooseSinks();
            if (isRunning() && this.replicationSinkMgr.getNumSinks() == 0 && sleepForRetries("Waiting for peers", i)) {
                i++;
            }
        }
    }

    protected boolean sleepForRetries(String str, int i) {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace(str + ", sleeping " + this.sleepForRetries + " times " + i);
            }
            Thread.sleep(this.sleepForRetries * i);
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while sleeping between retries");
        }
        return i < this.maxRetriesMultiplier;
    }

    private List<List<WAL.Entry>> createBatches(List<WAL.Entry> list) {
        int min = Math.min(Math.min(this.maxThreads, (list.size() / 100) + 1), Math.max(this.replicationSinkMgr.getNumSinks(), 1));
        HashMap hashMap = new HashMap(min);
        ArrayList arrayList = new ArrayList();
        int[] iArr = new int[min];
        for (int i = 0; i < min; i++) {
            hashMap.put(Integer.valueOf(i), new ArrayList((list.size() / min) + 1));
        }
        for (WAL.Entry entry : list) {
            int abs = Math.abs(Bytes.hashCode(entry.getKey().getEncodedRegionName()) % min);
            int estimatedSerializedSizeOf = ((int) entry.getKey().estimatedSerializedSizeOf()) + ((int) entry.getEdit().estimatedSerializedSizeOf());
            if (iArr[abs] > 0 && iArr[abs] + estimatedSerializedSizeOf > this.replicationRpcLimit) {
                arrayList.add(hashMap.get(Integer.valueOf(abs)));
                hashMap.put(Integer.valueOf(abs), new ArrayList());
                iArr[abs] = 0;
            }
            ((List) hashMap.get(Integer.valueOf(abs))).add(entry);
            iArr[abs] = iArr[abs] + estimatedSerializedSizeOf;
        }
        arrayList.addAll(hashMap.values());
        return arrayList;
    }

    private TableName parseTable(String str) {
        Matcher matcher = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'").matcher(str);
        if (!matcher.find()) {
            return null;
        }
        String group = matcher.group(1);
        try {
            TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(group)));
            return TableName.valueOf(group);
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    private List<List<WAL.Entry>> filterBatches(List<List<WAL.Entry>> list, TableName tableName) {
        ArrayList arrayList = new ArrayList();
        for (List<WAL.Entry> list2 : list) {
            ArrayList arrayList2 = new ArrayList(list2.size());
            arrayList.add(arrayList2);
            for (WAL.Entry entry : list2) {
                if (!entry.getKey().getTableName().equals(tableName)) {
                    arrayList2.add(entry);
                }
            }
        }
        return arrayList;
    }

    private void reconnectToPeerCluster() {
        ClusterConnection clusterConnection = null;
        try {
            clusterConnection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
        } catch (IOException e) {
            LOG.warn("Failed to create connection for peer cluster", e);
        }
        if (clusterConnection != null) {
            this.conn = clusterConnection;
        }
    }

    @Override // org.apache.hadoop.hbase.replication.ReplicationEndpoint
    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        TableName parseTable;
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(this.exec);
        String walGroupId = replicateContext.getWalGroupId();
        int i = 1;
        if (!this.peersSelected && isRunning()) {
            connectToPeers();
            this.peersSelected = true;
        }
        if (this.replicationSinkMgr.getNumSinks() == 0) {
            LOG.warn("No replication sinks found, returning without replicating. The source should retry with the same set of edits.");
            return false;
        }
        List<List<WAL.Entry>> createBatches = createBatches(replicateContext.getEntries());
        while (isRunning() && !this.exec.isShutdown()) {
            if (isPeerEnabled()) {
                if (this.conn == null || this.conn.isClosed()) {
                    reconnectToPeerCluster();
                }
                int i2 = 0;
                for (int i3 = 0; i3 < createBatches.size(); i3++) {
                    try {
                        List<WAL.Entry> list = createBatches.get(i3);
                        if (!list.isEmpty()) {
                            if (LOG.isTraceEnabled()) {
                                LOG.trace("Submitting " + list.size() + " entries of total size " + replicateContext.getSize());
                            }
                            executorCompletionService.submit(createReplicator(list, i3));
                            i2++;
                        }
                    } catch (IOException e) {
                        this.metrics.refreshAgeOfLastShippedOp(walGroupId);
                        if (e instanceof RemoteException) {
                            IOException unwrapRemoteException = ((RemoteException) e).unwrapRemoteException();
                            LOG.warn("Can't replicate because of an error on the remote cluster: ", unwrapRemoteException);
                            if (!(unwrapRemoteException instanceof TableNotFoundException)) {
                                LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", unwrapRemoteException);
                                this.replicationSinkMgr.chooseSinks();
                            } else if (this.dropOnDeletedTables && (parseTable = parseTable(unwrapRemoteException.getMessage())) != null) {
                                try {
                                    Connection createConnection = ConnectionFactory.createConnection(this.ctx.getLocalConfiguration());
                                    Throwable th = null;
                                    try {
                                        try {
                                            if (!createConnection.getAdmin().tableExists(parseTable)) {
                                                LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '" + parseTable + "'");
                                                createBatches = filterBatches(createBatches, parseTable);
                                                if (createConnection != null) {
                                                    if (0 != 0) {
                                                        try {
                                                            createConnection.close();
                                                        } catch (Throwable th2) {
                                                            th.addSuppressed(th2);
                                                        }
                                                    } else {
                                                        createConnection.close();
                                                    }
                                                }
                                            } else if (createConnection != null) {
                                                if (0 != 0) {
                                                    try {
                                                        createConnection.close();
                                                    } catch (Throwable th3) {
                                                        th.addSuppressed(th3);
                                                    }
                                                } else {
                                                    createConnection.close();
                                                }
                                            }
                                        } finally {
                                        }
                                    } catch (Throwable th4) {
                                        th = th4;
                                        throw th4;
                                        break;
                                    }
                                } catch (IOException e2) {
                                    LOG.warn("Exception checking for local table: ", e2);
                                }
                                LOG.warn("Exception checking for local table: ", e2);
                            }
                        } else if (e instanceof SocketTimeoutException) {
                            sleepForRetries("Encountered a SocketTimeoutException. Since the call to the remote cluster timed out, which is usually caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier);
                        } else if ((e instanceof ConnectException) || (e instanceof UnknownHostException)) {
                            LOG.warn("Peer is unavailable, rechecking all sinks: ", e);
                            this.replicationSinkMgr.chooseSinks();
                        } else {
                            LOG.warn("Can't replicate because of a local or network error: ", e);
                        }
                        if (sleepForRetries("Since we are unable to replicate", i)) {
                            i++;
                        }
                    }
                }
                IOException iOException = null;
                long j = 0;
                for (int i4 = 0; i4 < i2; i4++) {
                    try {
                        int intValue = ((Integer) executorCompletionService.take().get()).intValue();
                        List<WAL.Entry> list2 = createBatches.get(intValue);
                        createBatches.set(intValue, Collections.emptyList());
                        long writeTime = list2.get(list2.size() - 1).getKey().getWriteTime();
                        if (writeTime > j) {
                            j = writeTime;
                        }
                    } catch (InterruptedException e3) {
                        iOException = new IOException(e3);
                    } catch (ExecutionException e4) {
                        iOException = (IOException) e4.getCause();
                    }
                }
                if (iOException != null) {
                    throw iOException;
                }
                if (j <= 0) {
                    return true;
                }
                this.metrics.setAgeOfLastShippedOp(j, walGroupId);
                return true;
            }
            if (sleepForRetries("Replication is disabled", i)) {
                i++;
            }
        }
        return false;
    }

    protected boolean isPeerEnabled() {
        return this.ctx.getReplicationPeer().getPeerState() == ReplicationPeer.PeerState.ENABLED;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint, org.apache.hbase.thirdparty.com.google.common.util.concurrent.AbstractService
    public void doStop() {
        disconnect();
        if (this.conn != null) {
            try {
                this.conn.close();
                this.conn = null;
            } catch (IOException e) {
                LOG.warn("Failed to close the connection");
            }
        }
        this.exec.shutdown();
        try {
            this.exec.awaitTermination(this.maxTerminationWait, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e2) {
        }
        if (!this.exec.isTerminated()) {
            String str = "HBaseInterClusterReplicationEndpoint termination failed. The ThreadPoolExecutor failed to finish all tasks within " + this.maxTerminationWait + "ms. Aborting to prevent Replication from deadlocking. See HBASE-16081.";
            this.abortable.abort(str, new IOException(str));
        }
        notifyStopped();
    }

    @VisibleForTesting
    protected Replicator createReplicator(List<WAL.Entry> list, int i) {
        return new Replicator(list, i);
    }
}
