/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.Service;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.ipc.RemoteException;

@InterfaceAudience.Private
public class HBaseInterClusterReplicationEndpoint
extends HBaseReplicationEndpoint {
    private static final Log LOG = LogFactory.getLog(HBaseInterClusterReplicationEndpoint.class);
    // Same value init() falls back to for "replication.source.maxterminationmultiplier".
    private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2L;
    // Connection built from conf in init(); replaced by reconnectToPeerCluster(), closed in doStop().
    private HConnection conn;
    // Built from ctx.getLocalConfiguration() in init().
    private Configuration localConf;
    // Built from ctx.getConfiguration() in init(); decorateConf() may override the client RPC codec in it.
    private Configuration conf;
    // Base sleep in milliseconds between retries ("replication.source.sleepforretries").
    private long sleepForRetries;
    // sleepForRetries(String, int) reports "keep backing off" while the multiplier is below this.
    private int maxRetriesMultiplier;
    // Sleep multiplier applied after a SocketTimeoutException; defaults to maxRetriesMultiplier.
    private int socketTimeoutMultiplier;
    // Milliseconds doStop() waits for the executor to terminate before aborting.
    private long maxTerminationWait;
    // Size threshold used by createBatches(); 95% of "hbase.ipc.max.request.size".
    private int replicationRpcLimit;
    private MetricsSource metrics;
    private ReplicationSinkManager replicationSinkMgr;
    // Set to true by replicate() after its first call to connectToPeers().
    private boolean peersSelected = false;
    // Read from "hbase.replication.cluster.id" only when bulk load replication is enabled.
    private String replicationClusterId = "";
    // Pool that runs the Replicator tasks submitted by replicate().
    private ThreadPoolExecutor exec;
    private int maxThreads;
    // <root dir>/data, passed along with every replication call.
    private Path baseNamespaceDir;
    // <root dir>/archive/data, passed along with every replication call.
    private Path hfileArchiveDir;
    private boolean replicationBulkLoadDataEnabled;
    // Used by doStop() to abort when the executor does not terminate in time.
    private Abortable abortable;
    // When true, replicate() may drop edits of tables missing both at the sink and locally.
    private boolean dropOnDeletedTables;

    /**
     * Initializes the endpoint from the given context: builds the configurations, reads the
     * retry/termination tunables, opens the peer connection, and creates the sink manager and
     * the executor used to ship batches.
     *
     * @param context replication context supplying configuration, metrics, peer id and abortable
     * @throws IOException if the parent initialization, the connection creation or the root
     *         directory lookup fails
     */
    @Override
    public void init(ReplicationEndpoint.Context context) throws IOException {
        super.init(context);

        conf = HBaseConfiguration.create(ctx.getConfiguration());
        localConf = HBaseConfiguration.create(ctx.getLocalConfiguration());
        // Must run before the connection is created so the codec override is picked up.
        decorateConf();

        // Retry and shutdown tunables.
        maxRetriesMultiplier = conf.getInt("replication.source.maxretriesmultiplier", 300);
        socketTimeoutMultiplier =
            conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
        final long terminationMultiplier = conf.getLong(
            "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
        final long rpcTimeoutMs = conf.getLong("hbase.rpc.timeout", 60000L);
        maxTerminationWait = terminationMultiplier * rpcTimeoutMs;

        conn = HConnectionManager.createConnection(conf);
        sleepForRetries = conf.getLong("replication.source.sleepforretries", 1000L);
        metrics = context.getMetrics();
        replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, conf);

        // Fixed-size pool whose idle threads (core threads included) expire after 60 seconds.
        maxThreads = conf.getInt("hbase.replication.source.maxthreads", 10);
        final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
        exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60L, TimeUnit.SECONDS, workQueue);
        exec.allowCoreThreadTimeOut(true);
        abortable = ctx.getAbortable();

        // Leave 5% headroom below the maximum RPC request size (default 256 MB).
        final long maxRequestSize = conf.getLong("hbase.ipc.max.request.size", 0x10000000L);
        replicationRpcLimit = (int) (0.95 * (double) maxRequestSize);

        dropOnDeletedTables = conf.getBoolean("hbase.replication.drop.on.deleted.table", false);
        replicationBulkLoadDataEnabled = conf.getBoolean("hbase.replication.bulkload.enabled", false);
        if (replicationBulkLoadDataEnabled) {
            replicationClusterId = conf.get("hbase.replication.cluster.id");
        }

        final Path rootDir = FSUtils.getRootDir(conf);
        final Path dataDirName = new Path("data");
        baseNamespaceDir = new Path(rootDir, dataDirName);
        hfileArchiveDir = new Path(rootDir, new Path("archive", dataDirName));
    }

    /**
     * Copies the value of "hbase.replication.rpc.codec", when it is set to a non-empty string,
     * into "hbase.client.rpc.codec" of this endpoint's configuration.
     */
    private void decorateConf() {
        final String codec = conf.get("hbase.replication.rpc.codec");
        if (!StringUtils.isNotEmpty(codec)) {
            return;
        }
        conf.set("hbase.client.rpc.codec", codec);
    }

    /**
     * Loops until the sink manager reports at least one sink or this endpoint stops running,
     * re-choosing sinks on every pass and sleeping with an increasing multiplier in between.
     */
    private void connectToPeers() {
        getRegionServers();
        int backoff = 1;
        while (isRunning() && replicationSinkMgr.getNumSinks() == 0) {
            replicationSinkMgr.chooseSinks();
            // Only sleep when the fresh selection still came back empty and we are not stopping.
            final boolean stillWaiting = isRunning() && replicationSinkMgr.getNumSinks() == 0;
            if (stillWaiting && sleepForRetries("Waiting for peers", backoff)) {
                backoff++;
            }
        }
    }

    /**
     * Sleeps for the configured base retry interval multiplied by {@code sleepMultiplier}.
     * An interrupt ends the sleep early and is only logged at debug level.
     *
     * @param msg reason for sleeping, used in the trace log line
     * @param sleepMultiplier factor applied to the base sleep interval
     * @return {@code true} if {@code sleepMultiplier} is still below the maximum retries
     *         multiplier, i.e. the caller may increase it further
     */
    protected boolean sleepForRetries(String msg, int sleepMultiplier) {
        if (LOG.isTraceEnabled()) {
            LOG.trace(msg + ", sleeping " + sleepForRetries + " times " + sleepMultiplier);
        }
        final long pauseMillis = sleepForRetries * (long) sleepMultiplier;
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException e) {
            LOG.debug("Interrupted while sleeping between retries");
        }
        return sleepMultiplier < maxRetriesMultiplier;
    }

    /**
     * Splits the entries into batches. Entries are assigned to a bucket by the hash of their
     * encoded region name, so entries of one region stay in the same bucket; whenever adding an
     * entry would push a non-empty bucket past the RPC size limit, that bucket is emitted as a
     * finished batch and a fresh one is started.
     *
     * @param entries WAL entries to ship
     * @return finished batches followed by the remaining (possibly empty) open buckets
     */
    private List<List<WAL.Entry>> createBatches(List<WAL.Entry> entries) {
        final int sinkCount = Math.max(replicationSinkMgr.getNumSinks(), 1);
        // One bucket per 100 entries, capped by the thread count and the number of sinks.
        final int bucketCount = Math.min(Math.min(maxThreads, entries.size() / 100 + 1), sinkCount);

        final HashMap<Integer, List<WAL.Entry>> openBuckets =
            new HashMap<Integer, List<WAL.Entry>>(bucketCount);
        final int[] bucketBytes = new int[bucketCount];
        for (int b = 0; b < bucketCount; b++) {
            openBuckets.put(b, new ArrayList<WAL.Entry>(entries.size() / bucketCount + 1));
        }

        final ArrayList<List<WAL.Entry>> batches = new ArrayList<List<WAL.Entry>>();
        for (WAL.Entry entry : entries) {
            final int bucket =
                Math.abs(Bytes.hashCode(entry.getKey().getEncodedRegionName()) % bucketCount);
            final int entryBytes = (int) entry.getKey().estimatedSerializedSizeOf()
                + (int) entry.getEdit().estimatedSerializedSizeOf();

            final boolean overLimit =
                bucketBytes[bucket] > 0 && bucketBytes[bucket] + entryBytes > replicationRpcLimit;
            if (overLimit) {
                // Close the full bucket and start a new one for the same hash slot.
                batches.add(openBuckets.get(bucket));
                openBuckets.put(bucket, new ArrayList<WAL.Entry>());
                bucketBytes[bucket] = 0;
            }
            openBuckets.get(bucket).add(entry);
            bucketBytes[bucket] += entryBytes;
        }
        batches.addAll(openBuckets.values());
        return batches;
    }

    /** Matches the quoted table name in a TableNotFoundException message; compiled once. */
    private static final Pattern TABLE_NOT_FOUND_PATTERN =
        Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");

    /**
     * Extracts the table name from the message of a TableNotFoundException.
     *
     * @param msg exception message to scan; may be {@code null}
     * @return the table named in the message, or {@code null} if the message is {@code null},
     *         does not match the expected format, or names an illegal table
     */
    private TableName parseTable(String msg) {
        // Throwable.getMessage() may return null; treat that as "no table found".
        if (msg == null) {
            return null;
        }
        Matcher m = TABLE_NOT_FOUND_PATTERN.matcher(msg);
        if (m.find()) {
            String table = m.group(1);
            try {
                // Double check that the captured text is a valid table name.
                TableName.valueOf(TableName.isLegalFullyQualifiedTableName(Bytes.toBytes(table)));
                return TableName.valueOf(table);
            } catch (IllegalArgumentException ignored) {
                // Not a legal table name: fall through and report that nothing was parsed.
            }
        }
        return null;
    }

    /**
     * Builds a copy of the given batches with every entry of {@code table} removed. The number
     * and order of batches is preserved, so batch indexes remain valid; a batch may end up empty.
     *
     * @param oldEntryList batches to filter; not modified
     * @param table table whose entries are dropped
     * @return new list of new batches without entries of {@code table}
     */
    private List<List<WAL.Entry>> filterBatches(List<List<WAL.Entry>> oldEntryList, TableName table) {
        final ArrayList<List<WAL.Entry>> filtered = new ArrayList<List<WAL.Entry>>();
        for (List<WAL.Entry> batch : oldEntryList) {
            final ArrayList<WAL.Entry> kept = new ArrayList<WAL.Entry>(batch.size());
            for (WAL.Entry entry : batch) {
                if (!entry.getKey().getTablename().equals(table)) {
                    kept.add(entry);
                }
            }
            filtered.add(kept);
        }
        return filtered;
    }

    /**
     * Tries to open a new connection to the peer cluster and, on success, makes it the current
     * connection. A failure is logged and the current connection is left untouched.
     */
    private void reconnectToPeerCluster() {
        try {
            final HConnection fresh = HConnectionManager.createConnection(conf);
            if (fresh != null) {
                conn = fresh;
            }
        } catch (IOException ioe) {
            LOG.warn("Failed to create connection for peer cluster", ioe);
        }
    }

    /**
     * Ships the entries of the given context to the peer cluster, retrying with backoff until
     * every batch was replicated or this endpoint stops.
     *
     * <p>The entries are split into batches which are submitted to the executor in parallel.
     * Batches that succeeded are replaced by an empty list so only the failed ones are resent on
     * the next attempt.
     *
     * @param replicateContext entries to replicate plus their WAL group id and size
     * @return {@code true} if all batches were replicated; {@code false} if no sinks are
     *         available or the endpoint stopped before replication completed
     */
    @Override
    public boolean replicate(ReplicationEndpoint.ReplicateContext replicateContext) {
        ExecutorCompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
        String walGroupId = replicateContext.getWalGroupId();
        int sleepMultiplier = 1;

        if (!this.peersSelected && this.isRunning()) {
            this.connectToPeers();
            this.peersSelected = true;
        }

        if (this.replicationSinkMgr.getNumSinks() == 0) {
            LOG.warn("No replication sinks found, returning without replicating. The source should retry with the same set of edits.");
            return false;
        }

        List<List<WAL.Entry>> batches = this.createBatches(replicateContext.getEntries());
        while (this.isRunning() && !this.exec.isShutdown()) {
            if (!this.isPeerEnabled()) {
                if (this.sleepForRetries("Replication is disabled", sleepMultiplier)) {
                    ++sleepMultiplier;
                }
                continue;
            }
            if (this.conn == null || this.conn.isClosed()) {
                this.reconnectToPeerCluster();
            }
            try {
                // Submit every batch that still has entries; the ordinal is its index in batches.
                int futures = 0;
                for (int i = 0; i < batches.size(); ++i) {
                    List<WAL.Entry> entries = batches.get(i);
                    if (entries.isEmpty()) {
                        continue;
                    }
                    if (LOG.isTraceEnabled()) {
                        LOG.trace("Submitting " + entries.size() + " entries of total size " + replicateContext.getSize());
                    }
                    pool.submit(this.createReplicator(entries, i));
                    ++futures;
                }

                // Wait for all submitted batches, remembering the last failure (if any).
                IOException iox = null;
                long lastWriteTime = 0L;
                for (int i = 0; i < futures; ++i) {
                    try {
                        Future<Integer> f = pool.take();
                        int index = f.get();
                        List<WAL.Entry> batch = batches.get(index);
                        // Mark the batch as done so it is not resent on a retry.
                        batches.set(index, Collections.<WAL.Entry>emptyList());
                        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
                        if (writeTime > lastWriteTime) {
                            lastWriteTime = writeTime;
                        }
                    } catch (InterruptedException ie) {
                        iox = new IOException(ie);
                    } catch (ExecutionException ee) {
                        // A replicator normally fails with an IOException; wrap anything else
                        // instead of hiding the real cause behind a ClassCastException.
                        Throwable cause = ee.getCause();
                        iox = cause instanceof IOException ? (IOException) cause : new IOException(cause);
                    }
                }
                if (iox != null) {
                    // At least one batch failed: let the handler below decide how to retry.
                    throw iox;
                }
                if (lastWriteTime > 0L) {
                    this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
                }
                return true;
            } catch (IOException ioe) {
                this.metrics.refreshAgeOfLastShippedOp(walGroupId);
                if (ioe instanceof RemoteException) {
                    ioe = ((RemoteException) ioe).unwrapRemoteException();
                    LOG.warn("Can't replicate because of an error on the remote cluster: ", ioe);
                    if (ioe instanceof TableNotFoundException) {
                        if (this.dropOnDeletedTables) {
                            TableName table = this.parseTable(ioe.getMessage());
                            if (table != null) {
                                // The connection is closed on every path, including the early
                                // retry; failures of the check are logged, not swallowed.
                                try (Connection localConn = ConnectionFactory.createConnection(this.ctx.getLocalConfiguration())) {
                                    if (!localConn.getAdmin().tableExists(table)) {
                                        LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '" + table + "'");
                                        batches = this.filterBatches(batches, table);
                                        // Retry right away with the filtered batches.
                                        continue;
                                    }
                                } catch (IOException iox) {
                                    LOG.warn("Exception checking for local table: ", iox);
                                }
                            }
                        }
                        // Otherwise fall through and back off before the next attempt.
                    } else {
                        LOG.warn("Peer encountered RemoteException, rechecking all sinks: ", ioe);
                        this.replicationSinkMgr.chooseSinks();
                    }
                } else if (ioe instanceof SocketTimeoutException) {
                    // Extra, longer sleep on timeouts; the regular backoff below still applies.
                    this.sleepForRetries("Encountered a SocketTimeoutException. Since the call to the remote cluster timed out, which is usually caused by a machine failure or a massive slowdown", this.socketTimeoutMultiplier);
                } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
                    LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
                    this.replicationSinkMgr.chooseSinks();
                } else {
                    LOG.warn("Can't replicate because of a local or network error: ", ioe);
                }
                if (this.sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
                    ++sleepMultiplier;
                }
            }
        }
        return false;
    }

    /**
     * @return {@code true} if the replication peer of this endpoint is in the ENABLED state
     */
    protected boolean isPeerEnabled() {
        final ReplicationPeer.PeerState state = ctx.getReplicationPeer().getPeerState();
        return ReplicationPeer.PeerState.ENABLED == state;
    }

    /**
     * Stops this endpoint: disconnects, closes the peer connection, shuts the executor down and
     * waits up to the configured termination time for running tasks. If tasks are still running
     * after that, the abortable is asked to abort.
     */
    @Override
    protected void doStop() {
        this.disconnect();
        if (this.conn != null) {
            try {
                this.conn.close();
                this.conn = null;
            } catch (IOException e) {
                // Keep the cause in the log, like the other warnings in this class.
                LOG.warn("Failed to close the connection", e);
            }
        }
        this.exec.shutdown();
        try {
            this.exec.awaitTermination(this.maxTerminationWait, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ignored) {
            // Deliberately ignored: the isTerminated() check below decides what happens next.
        }
        // Abort if the tasks did not terminate in time.
        if (!this.exec.isTerminated()) {
            String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The ThreadPoolExecutor failed to finish all tasks within " + this.maxTerminationWait + "ms. " + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
            this.abortable.abort(errMsg, new IOException(errMsg));
        }
        this.close();
        this.notifyStopped();
    }

    /**
     * Runs {@link #doStop()} and then delegates to the parent implementation.
     *
     * @return the state returned by the parent's {@code stopAndWait()}
     */
    @Override
    public Service.State stopAndWait() {
        doStop();
        final Service.State finalState = super.stopAndWait();
        return finalState;
    }

    /**
     * Creates the task that ships one batch.
     *
     * @param entries entries of the batch
     * @param ordinal value the task returns on success; replicate() passes the batch index
     * @return a new {@link Replicator} for the batch
     */
    protected Replicator createReplicator(List<WAL.Entry> entries, int ordinal) {
        final Replicator task = new Replicator(entries, ordinal);
        return task;
    }

    /**
     * Task that ships one batch of WAL entries to a sink of the peer cluster and returns the
     * ordinal it was created with.
     */
    protected class Replicator
    implements Callable<Integer> {
        private final List<WAL.Entry> entries;
        private final int ordinal;

        /**
         * @param entries entries this task ships
         * @param ordinal value returned by {@link #call()} on success
         */
        public Replicator(List<WAL.Entry> entries, int ordinal) {
            this.ordinal = ordinal;
            this.entries = entries;
        }

        /**
         * Sends {@code batch} to the given region server. The trace log lines describe this
         * task's own entry list.
         *
         * @param rrs admin interface of the sink region server
         * @param batch entries to send
         * @param replicationClusterId cluster id passed along with the entries
         * @param baseNamespaceDir base namespace directory passed along with the entries
         * @param hfileArchiveDir hfile archive directory passed along with the entries
         * @throws IOException if the replication call fails
         */
        protected void replicateEntries(AdminProtos.AdminService.BlockingInterface rrs, List<WAL.Entry> batch, String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir) throws IOException {
            final int batchId = System.identityHashCode(entries);
            if (LOG.isTraceEnabled()) {
                long totalBytes = 0L;
                for (WAL.Entry entry : entries) {
                    totalBytes += entry.getKey().estimatedSerializedSizeOf();
                    totalBytes += entry.getEdit().estimatedSerializedSizeOf();
                }
                LOG.trace("Replicating batch " + batchId + " of " + entries.size() + " entries with total size " + totalBytes + " bytes to " + replicationClusterId);
            }

            final WAL.Entry[] payload = batch.toArray(new WAL.Entry[batch.size()]);
            try {
                ReplicationProtbufUtil.replicateWALEntry(rrs, payload, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Completed replicating batch " + batchId);
                }
            } catch (IOException failure) {
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Failed replicating batch " + batchId, failure);
                }
                throw failure;
            }
        }

        /**
         * Picks a sink, ships this task's entries to it and reports the outcome to the sink
         * manager.
         *
         * @return the ordinal of this task
         * @throws IOException if picking the sink or shipping the entries fails; the sink, when
         *         one had been picked, is reported as bad first
         */
        @Override
        public Integer call() throws IOException {
            ReplicationSinkManager.SinkPeer peer = null;
            try {
                peer = HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.getReplicationSink();
                final AdminProtos.AdminService.BlockingInterface server = peer.getRegionServer();
                replicateEntries(
                    server,
                    entries,
                    HBaseInterClusterReplicationEndpoint.this.replicationClusterId,
                    HBaseInterClusterReplicationEndpoint.this.baseNamespaceDir,
                    HBaseInterClusterReplicationEndpoint.this.hfileArchiveDir);
                HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.reportSinkSuccess(peer);
            } catch (IOException failure) {
                if (peer != null) {
                    HBaseInterClusterReplicationEndpoint.this.replicationSinkMgr.reportBadSink(peer);
                }
                throw failure;
            }
            return ordinal;
        }
    }
}

