package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.shaded.org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ProtocolStringList;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Receives batches of replicated WAL entries and applies them to the local cluster.
 *
 * <p>Ordinary cells are regrouped into {@link Put}/{@link Delete} mutations and submitted through a
 * lazily created, shared {@link AsyncClusterConnection}. Cells whose qualifier matches
 * {@link WALEdit#BULK_LOAD} are collected per list of cluster ids and handed to an
 * {@code HFileReplicator}.
 */
@InterfaceAudience.Private
public class ReplicationSink {
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationSink.class);

    /** Private copy of the configuration passed to the constructor, adjusted by {@link #decorateConf()}. */
    private final Configuration conf;

    /** Lazily created connection; writes are guarded by {@link #sharedConnLock}. */
    private volatile AsyncClusterConnection sharedConn;

    private final MetricsSink metrics;

    /** Number of WAL entries applied (entries rejected by the sink filter are not counted). */
    private final AtomicLong totalReplicatedEdits = new AtomicLong();

    private final Object sharedConnLock = new Object();

    // NOTE(review): this counter is only ever incremented (see buildBulkLoadHFileMap) and is passed to
    // metrics.applyBatch on every call, so later batches report the cumulative HFile count. Confirm
    // whether a per-batch value was intended before changing it.
    private long hfilesReplicated = 0;

    private SourceFSConfigurationProvider provider;

    /** Optional filter; {@code null} when none is configured or instantiation failed. */
    private WALEntrySinkFilter walEntrySinkFilter;

    /** Maximum number of rows submitted in a single {@code batchAll} call. */
    private final int rowSizeWarnThreshold;

    /**
     * Creates a sink working on a private copy of {@code configuration}.
     *
     * @param configuration the configuration to copy and read sink settings from
     * @throws IOException if initialising the configured {@link WALEntrySinkFilter} fails
     * @throws IllegalArgumentException if the configured source FS configuration provider class
     *         cannot be loaded or instantiated
     */
    public ReplicationSink(Configuration configuration) throws IOException {
        this.conf = HBaseConfiguration.create(configuration);
        this.rowSizeWarnThreshold = configuration.getInt(HConstants.BATCH_ROWS_THRESHOLD_NAME, 5000);
        decorateConf();
        this.metrics = new MetricsSink();
        this.walEntrySinkFilter = setupWALEntrySinkFilter();
        String providerClassName = configuration.get("hbase.replication.source.fs.conf.provider",
                DefaultSourceFSConfigurationProvider.class.getCanonicalName());
        try {
            this.provider = Class.forName(providerClassName)
                    .asSubclass(SourceFSConfigurationProvider.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (Exception e) {
            throw new IllegalArgumentException("Configured source fs configuration provider class "
                    + providerClassName + " throws error.", e);
        }
    }

    /**
     * Instantiates and initialises the {@link WALEntrySinkFilter} named in the configuration.
     *
     * @return the initialised filter, or {@code null} if none is configured or it could not be
     *         instantiated (the failure is logged and otherwise ignored)
     * @throws IOException if creating the connection handed to the filter fails
     */
    private WALEntrySinkFilter setupWALEntrySinkFilter() throws IOException {
        Class<?> filterClass = this.conf.getClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY, null);
        WALEntrySinkFilter filter = null;
        if (filterClass != null) {
            try {
                filter = (WALEntrySinkFilter) filterClass.getDeclaredConstructor().newInstance();
            } catch (Exception e) {
                // Best effort: a broken filter class must not prevent the sink from starting.
                LOG.warn("Failed to instantiate {}", filterClass, e);
            }
        }
        if (filter != null) {
            filter.init(getConnection());
        }
        return filter;
    }

    /**
     * Overrides client settings on the private configuration copy: retry count and operation timeout
     * come from the {@code replication.sink.client.*} keys, the RPC codec is replaced by the
     * replication codec when one is set, and any client ZooKeeper quorum setting is removed.
     */
    private void decorateConf() {
        this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
                this.conf.getInt("replication.sink.client.retries.number", 4));
        this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
                this.conf.getInt("replication.sink.client.ops.timeout", 10000));
        String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
        if (StringUtils.isNotEmpty(replicationCodec)) {
            this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
        }
        if (this.conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
            this.conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
        }
    }

    /**
     * Applies a batch of WAL entries to the local cluster.
     *
     * <p>The cells of every entry are consumed from {@code cellScanner} in order, including those of
     * entries rejected by the sink filter, so the scanner stays aligned with the entry list.
     *
     * @param list the WAL entries to apply; an empty list is a no-op
     * @param cellScanner supplies the cells for all entries, in entry order
     * @param str passed to the source FS configuration provider to obtain the source configuration
     *        (presumably the replication cluster id; confirm against the caller)
     * @param str2 second constructor argument of {@code HFileReplicator} (presumably the source base
     *        namespace directory; confirm against the caller)
     * @param str3 third constructor argument of {@code HFileReplicator} (presumably the source HFile
     *        archive directory; confirm against the caller)
     * @throws IOException if reading cells, applying mutations or replicating HFiles fails
     * @throws ArrayIndexOutOfBoundsException if the scanner runs out of cells before an entry's
     *         declared cell count is reached
     */
    public void replicateEntries(List<AdminProtos.WALEntry> list, CellScanner cellScanner, String str, String str2, String str3) throws IOException {
        if (list.isEmpty()) {
            return;
        }
        try {
            long appliedEntries = 0;
            // table -> cluster ids -> rows to apply
            Map<TableName, Map<List<UUID>, List<Row>>> rowsByTable = new TreeMap<>();
            // cluster ids -> table name -> (family, hfile paths); created on first bulk load cell
            Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsByClusterIds = null;

            for (AdminProtos.WALEntry entry : list) {
                TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
                int cellCount = entry.getAssociatedCellCount();
                if (this.walEntrySinkFilter != null
                        && this.walEntrySinkFilter.filter(table, entry.getKey().getWriteTime())) {
                    // Entry is filtered out: still skip its cells so the scanner stays in step.
                    for (int i = 0; i < cellCount; i++) {
                        advanceOrThrow(cellScanner, cellCount, i);
                    }
                    continue;
                }

                Cell previousCell = null;
                Mutation mutation = null;
                for (int i = 0; i < cellCount; i++) {
                    advanceOrThrow(cellScanner, cellCount, i);
                    Cell cell = cellScanner.current();

                    if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
                        WALProtos.BulkLoadDescriptor descriptor = WALEdit.getBulkLoadDescriptor(cell);
                        if (descriptor.getReplicate()) {
                            if (bulkLoadsByClusterIds == null) {
                                bulkLoadsByClusterIds = new HashMap<>();
                            }
                            Map<String, List<Pair<byte[], List<String>>>> hfilesByTable =
                                    bulkLoadsByClusterIds.computeIfAbsent(
                                            descriptor.getClusterIdsList(), ids -> new HashMap<>());
                            buildBulkLoadHFileMap(hfilesByTable, table, descriptor);
                        }
                        // Bulk load markers do not take part in row/type grouping.
                        continue;
                    }

                    if (isNewRowOrType(previousCell, cell)) {
                        // Row or cell type changed: start a new mutation and register it.
                        mutation = CellUtil.isDelete(cell)
                                ? new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength())
                                : new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
                        List<HBaseProtos.UUID> protoClusterIds = entry.getKey().getClusterIdsList();
                        List<UUID> clusterIds = new ArrayList<>(protoClusterIds.size());
                        for (HBaseProtos.UUID protoClusterId : protoClusterIds) {
                            clusterIds.add(toUUID(protoClusterId));
                        }
                        mutation.setClusterIds(clusterIds);
                        mutation.setAttribute(ReplicationUtils.REPLICATION_ATTR_NAME, HConstants.EMPTY_BYTE_ARRAY);
                        addToHashMultiMap(rowsByTable, table, clusterIds, mutation);
                    }
                    if (CellUtil.isDelete(cell)) {
                        ((Delete) mutation).add(cell);
                    } else {
                        ((Put) mutation).add(cell);
                    }
                    previousCell = cell;
                }
                appliedEntries++;
            }

            if (!rowsByTable.isEmpty()) {
                LOG.debug("Started replicating mutations.");
                for (Map.Entry<TableName, Map<List<UUID>, List<Row>>> tableRows : rowsByTable.entrySet()) {
                    batch(tableRows.getKey(), tableRows.getValue().values(), this.rowSizeWarnThreshold);
                }
                LOG.debug("Finished replicating mutations.");
            }

            if (bulkLoadsByClusterIds != null) {
                for (Map.Entry<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoad
                        : bulkLoadsByClusterIds.entrySet()) {
                    Map<String, List<Pair<byte[], List<String>>>> hfilesByTable = bulkLoad.getValue();
                    if (hfilesByTable == null || hfilesByTable.isEmpty()) {
                        continue;
                    }
                    List<String> sourceClusterIds = bulkLoad.getKey();
                    LOG.debug("Replicating {} bulk loaded data", sourceClusterIds);
                    Configuration sourceConf = this.provider.getConf(this.conf, str);
                    // try-with-resources so the replicator is closed even when replicate() throws.
                    try (HFileReplicator replicator = new HFileReplicator(sourceConf, str2, str3,
                            hfilesByTable, this.conf, getConnection(), sourceClusterIds)) {
                        replicator.replicate();
                        LOG.debug("Finished replicating {} bulk loaded data", sourceClusterIds);
                    }
                }
            }

            int entryCount = list.size();
            this.metrics.setAgeOfLastAppliedOp(list.get(entryCount - 1).getKey().getWriteTime());
            this.metrics.applyBatch(entryCount + this.hfilesReplicated, this.hfilesReplicated);
            this.totalReplicatedEdits.addAndGet(appliedEntries);
        } catch (IOException e) {
            LOG.error("Unable to accept edit because:", e);
            throw e;
        }
    }

    /** Advances {@code scanner} by one cell, failing if it is already exhausted. */
    private static void advanceOrThrow(CellScanner scanner, int expected, int index) throws IOException {
        if (!scanner.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + expected + ", index=" + index);
        }
    }

    /**
     * Adds every store file named in {@code bulkLoadDescriptor} to {@code map}, grouped by table name
     * and then by column family, and adds the number of files to {@link #hfilesReplicated}.
     *
     * @param map table name -> list of (family, hfile paths) pairs; updated in place
     * @param tableName table the bulk load belongs to
     * @param bulkLoadDescriptor descriptor listing the stores and their files
     */
    private void buildBulkLoadHFileMap(Map<String, List<Pair<byte[], List<String>>>> map, TableName tableName, WALProtos.BulkLoadDescriptor bulkLoadDescriptor) throws IOException {
        String tableKey = tableName.getNameWithNamespaceInclAsString();
        for (WALProtos.StoreDescriptor store : bulkLoadDescriptor.getStoresList()) {
            byte[] family = store.getFamilyName().toByteArray();
            List<String> storeFiles = store.getStoreFileList();
            this.hfilesReplicated += storeFiles.size();
            for (String storeFile : storeFiles) {
                String hfilePath = getHFilePath(tableName, bulkLoadDescriptor, storeFile, family);
                List<Pair<byte[], List<String>>> familyEntries = map.get(tableKey);
                if (familyEntries == null) {
                    addNewTableEntryInMap(map, family, hfilePath, tableKey);
                    continue;
                }
                boolean familyKnown = false;
                for (Pair<byte[], List<String>> familyEntry : familyEntries) {
                    if (Bytes.equals(familyEntry.getFirst(), family)) {
                        familyEntry.getSecond().add(hfilePath);
                        familyKnown = true;
                        break;
                    }
                }
                if (!familyKnown) {
                    addFamilyAndItsHFilePathToTableInMap(family, hfilePath, familyEntries);
                }
            }
        }
    }

    /** Appends a new (family, [path]) pair to an existing table entry. */
    private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String hfilePath, List<Pair<byte[], List<String>>> familyEntries) {
        List<String> paths = new ArrayList<>(1);
        paths.add(hfilePath);
        familyEntries.add(new Pair<>(family, paths));
    }

    /** Registers {@code tableKey} in {@code map} with a single (family, [path]) pair. */
    private void addNewTableEntryInMap(Map<String, List<Pair<byte[], List<String>>>> map, byte[] family, String hfilePath, String tableKey) {
        List<String> paths = new ArrayList<>(1);
        paths.add(hfilePath);
        List<Pair<byte[], List<String>>> familyEntries = new ArrayList<>();
        familyEntries.add(new Pair<>(family, paths));
        map.put(tableKey, familyEntries);
    }

    /**
     * Builds the relative path {@code namespace/table/encodedRegionName/family/fileName} for a bulk
     * loaded store file.
     */
    private String getHFilePath(TableName tableName, WALProtos.BulkLoadDescriptor bulkLoadDescriptor, String storeFile, byte[] family) {
        return new StringBuilder(100)
                .append(tableName.getNamespaceAsString()).append("/")
                .append(tableName.getQualifierAsString()).append("/")
                .append(Bytes.toString(bulkLoadDescriptor.getEncodedRegionName().toByteArray())).append("/")
                .append(Bytes.toString(family)).append("/")
                .append(storeFile)
                .toString();
    }

    /**
     * Tells whether {@code cell} must start a new mutation.
     *
     * @return {@code true} if there is no previous cell, or the type byte or the row differs from it
     */
    private boolean isNewRowOrType(Cell previousCell, Cell cell) {
        return previousCell == null
                || previousCell.getTypeByte() != cell.getTypeByte()
                || !CellUtil.matchingRows(previousCell, cell);
    }

    /** Converts a protobuf UUID into a {@link UUID}. */
    private UUID toUUID(HBaseProtos.UUID uuid) {
        return new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
    }

    /**
     * Appends {@code value} to the list stored under {@code (key1, key2)}, creating the inner map and
     * the list on demand.
     *
     * @return the list the value was appended to
     */
    private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
        List<V> values = map
                .computeIfAbsent(key1, k -> new HashMap<>())
                .computeIfAbsent(key2, k -> new ArrayList<>());
        values.add(value);
        return values;
    }

    /**
     * Closes the shared connection if one was created. An {@link IOException} raised while closing
     * is logged and not propagated.
     */
    public void stopReplicationSinkServices() {
        try {
            if (this.sharedConn != null) {
                synchronized (this.sharedConnLock) {
                    if (this.sharedConn != null) {
                        this.sharedConn.close();
                        this.sharedConn = null;
                    }
                }
            }
        } catch (IOException e) {
            LOG.warn("IOException while closing the connection", e);
        }
    }

    /**
     * Submits all row lists to {@code tableName} and then waits for every submission to finish.
     * Lists longer than {@code batchRowSizeThreshold} are split into chunks of at most that size.
     *
     * @param tableName target table
     * @param allRows row lists to apply; an empty collection is a no-op
     * @param batchRowSizeThreshold maximum number of rows per {@code batchAll} call
     * @throws TableNotFoundException if retries were exhausted because the table does not exist
     * @throws IOException if any submission fails for another reason
     */
    private void batch(TableName tableName, Collection<List<Row>> allRows, int batchRowSizeThreshold) throws IOException {
        if (allRows.isEmpty()) {
            return;
        }
        AsyncTable<?> table = getConnection().getTable(tableName);
        List<Future<?>> pending = new ArrayList<>();
        // Submit everything first so the batches are in flight together, then wait below.
        for (List<Row> rows : allRows) {
            List<List<Row>> chunks = rows.size() > batchRowSizeThreshold
                    ? Lists.partition(rows, batchRowSizeThreshold)
                    : Collections.singletonList(rows);
            for (List<Row> chunk : chunks) {
                pending.add(table.batchAll(chunk));
            }
        }
        for (Future<?> future : pending) {
            try {
                FutureUtils.get(future);
            } catch (RetriesExhaustedException e) {
                if (e.getCause() instanceof TableNotFoundException) {
                    throw new TableNotFoundException("'" + tableName + "'");
                }
                throw e;
            }
        }
    }

    /**
     * Returns the shared connection, creating it on first use under {@link #sharedConnLock}.
     *
     * @throws IOException if the connection cannot be created
     */
    private AsyncClusterConnection getConnection() throws IOException {
        AsyncClusterConnection connection = this.sharedConn;
        if (connection == null) {
            synchronized (this.sharedConnLock) {
                connection = this.sharedConn;
                if (connection == null) {
                    connection = ClusterConnectionFactory.createAsyncClusterConnection(this.conf, null,
                            UserProvider.instantiate(this.conf).getCurrent());
                    this.sharedConn = connection;
                }
            }
        }
        return connection;
    }

    /**
     * Returns a one-line summary of sink activity.
     *
     * @return an empty string while no edits have been applied, otherwise the age of the last
     *         applied edit and the total number of replicated edits
     */
    public String getStats() {
        long total = this.totalReplicatedEdits.get();
        if (total == 0) {
            return "";
        }
        return "Sink: age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp()
                + ", total replicated edits: " + total;
    }

    /** Returns the metrics object updated by this sink. */
    public MetricsSink getSinkMetrics() {
        return this.metrics;
    }
}
