package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.LongAccumulator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hbase.thirdparty.com.google.protobuf.ProtocolStringList;
import org.apache.hbase.thirdparty.org.apache.commons.cli.HelpFormatter;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.class */
public class ReplicationSourceShipper extends Thread {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ReplicationSourceShipper.class);
    /** Configuration passed to the constructor; the retry and timeout settings below are read from it. */
    private final Configuration conf;
    /** WAL group identifier; passed to the source metrics and the replicate context, and used as the key when removing this worker from {@code source.workerThreads}. */
    protected final String walGroupId;
    /** Queue of WAL paths handed in by the constructor; not otherwise referenced in this class as shown. */
    protected final PriorityBlockingQueue<Path> queue;
    /** The replication source this shipper works for (peer state, endpoint, metrics, source manager). */
    private final ReplicationSource source;
    /** WAL path recorded by the last {@code updateLogPosition} call. Note that {@code getCurrentPath()} returns the reader's path, not this field. */
    private Path currentPath;
    /** Worker life-cycle state; written through {@code setWorkerState} and read by {@code isActive} / {@code isFinished}. */
    private volatile WorkerState state;
    /** Reader that supplies entry batches; must be set through {@code setWALReader} before {@code run()} polls it. */
    protected ReplicationSourceWALReader entryReader;
    /** Base sleep between retries, passed to {@code Thread.sleep} (milliseconds); key "replication.source.sleepforretries", fallback 1000. */
    protected final long sleepForRetries;
    /** Upper bound compared against the retry multiplier in {@code sleepForRetries(String, int)}; key "replication.source.maxretriesmultiplier", fallback 300. */
    protected final int maxRetriesMultiplier;
    /** Timeout handed to {@code entryReader.poll}; key "replication.source.getEntries.timeout", fallback 20000 (presumably milliseconds -- confirm against the reader). */
    private final int getEntriesTimeout;
    /** Base timeout for shipping a batch; fed to {@code ReplicationUtils.getAdaptiveTimeout} and used as a millisecond wait budget in {@code clearWALEntryBatch}. */
    private final int shipEditsTimeout;
    /** WAL position recorded by the last {@code updateLogPosition} call; -1 until the first call. */
    private volatile long currentPosition = -1;
    /** Not referenced anywhere in this class as shown; the constructor repeats the same value (20000) as the fallback for {@code getEntriesTimeout}. */
    private final int DEFAULT_TIMEOUT = 20000;

    /**
     * Life-cycle states of a shipper worker thread.
     * (Decompiled from ReplicationSourceShipper$WorkerState.class.)
     */
    public enum WorkerState {
        /** Set at the start of {@code run()}; the main loop only continues while the worker is in this state. */
        RUNNING,
        /** Set by {@code stopWorker()}, or by {@code run()} when its loop ends without the worker being finished. */
        STOPPED,
        /** Tested by {@code isFinished()}; never assigned inside this class as shown. */
        FINISHED;
    }

    /**
     * Creates a shipper for one WAL group and loads its retry and timeout settings.
     *
     * @param configuration configuration the retry/timeout settings are read from
     * @param str WAL group id this shipper serves
     * @param priorityBlockingQueue queue of WAL paths for the group
     * @param replicationSource the replication source that owns this shipper
     */
    public ReplicationSourceShipper(Configuration configuration, String str, PriorityBlockingQueue<Path> priorityBlockingQueue, ReplicationSource replicationSource) {
        conf = configuration;
        walGroupId = str;
        queue = priorityBlockingQueue;
        source = replicationSource;

        // Retry pacing.
        sleepForRetries = configuration.getLong("replication.source.sleepforretries", 1000L);
        maxRetriesMultiplier = configuration.getInt("replication.source.maxretriesmultiplier", 300);

        // Timeouts for fetching a batch from the reader and for shipping it to the endpoint.
        getEntriesTimeout = configuration.getInt("replication.source.getEntries.timeout", 20000);
        shipEditsTimeout = configuration.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT, 60000);
    }

    /**
     * Main loop of the shipper thread.
     *
     * <p>Marks the worker RUNNING, then, for as long as {@link #isActive()} holds, polls the WAL
     * reader for a batch and either refreshes the "age of last shipped op" metric (no batch),
     * calls {@link #noMoreData()} (sentinel batch) or ships the batch. While the peer is disabled
     * the loop only sleeps. On exit the worker is marked STOPPED unless it is already finished, in
     * which case it deregisters itself from the source and calls {@link #postFinish()}.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public final void run() {
        setWorkerState(WorkerState.RUNNING);
        while (isActive()) {
            if (!source.isPeerEnabled()) {
                // Nothing to ship while the peer is disabled; wait one base interval and re-check.
                sleepForRetries("Replication is disabled", 1);
                continue;
            }
            try {
                final WALEntryBatch batch = entryReader.poll(getEntriesTimeout);
                if (batch == null) {
                    // No batch within the timeout: stamp the metric with the current time.
                    source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), walGroupId);
                } else if (batch == WALEntryBatch.NO_MORE_DATA) {
                    // Sentinel instance, compared by identity; it is not shipped.
                    noMoreData();
                } else {
                    shipEdits(batch);
                }
            } catch (InterruptedException interrupted) {
                LOG.trace("Interrupted while waiting for next replication entry batch", interrupted);
                // Restore the flag so that isActive() sees it and the loop terminates.
                Thread.currentThread().interrupt();
            }
        }
        if (isFinished()) {
            source.workerThreads.remove(walGroupId);
            postFinish();
        } else {
            // Left the loop without finishing the work: record that the worker was stopped.
            setWorkerState(WorkerState.STOPPED);
        }
    }

    /**
     * Hook called from {@link #run()} when the reader hands back {@code WALEntryBatch.NO_MORE_DATA}.
     * Does nothing in this class; subclasses may override it.
     */
    protected void noMoreData() {
        // Intentionally empty.
    }

    /**
     * Hook called at the end of {@link #run()} when the worker is finished, after it has been
     * removed from the source's worker map. Does nothing in this class; subclasses may override it.
     */
    protected void postFinish() {
        // Intentionally empty.
    }

    /**
     * Sums {@code ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad} over every entry of the
     * batch.
     *
     * @param wALEntryBatch batch whose entries are measured
     * @return the accumulated size, narrowed to {@code int} on each addition
     */
    private int getBatchEntrySizeExcludeBulkLoad(WALEntryBatch wALEntryBatch) {
        int total = 0;
        for (WAL.Entry walEntry : wALEntryBatch.getWalEntries()) {
            // Compound assignment performs the same implicit narrowing cast as an explicit (int).
            total += ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(walEntry);
        }
        return total;
    }

    /**
     * Ships one batch of WAL entries to the replication endpoint, retrying until the endpoint
     * reports success or this worker stops being active.
     *
     * <p>An empty batch is not shipped: only the log position is updated and, if that update
     * actually happened, the "age of last shipped op" metric is set to the current time.
     *
     * <p>For a non-empty batch each attempt throttles, builds a replicate context and calls the
     * endpoint. On success the HFile references of the shipped edits are cleaned up, the log
     * position is recorded, and the source and its metrics are updated. Any exception raised
     * during an attempt (including the {@code IOException} from the HFile-ref clean-up) is logged
     * and followed by a back-off sleep whose multiplier grows while
     * {@link #sleepForRetries(String, int)} returns {@code true}.
     *
     * @param wALEntryBatch the batch obtained from the WAL reader
     */
    private void shipEdits(WALEntryBatch wALEntryBatch) {
        List<WAL.Entry> walEntries = wALEntryBatch.getWalEntries();
        if (walEntries.isEmpty()) {
            if (updateLogPosition(wALEntryBatch)) {
                this.source.getSourceMetrics().setAgeOfLastShippedOp(EnvironmentEdgeManager.currentTime(), this.walGroupId);
            }
            return;
        }
        int sleepMultiplier = 0;
        int heapSize = (int) wALEntryBatch.getHeapSize();
        int batchEntrySizeExcludeBulkLoad = getBatchEntrySizeExcludeBulkLoad(wALEntryBatch);
        while (isActive()) {
            try {
                long startNanos;
                boolean replicated;
                long endNanos;
                try {
                    this.source.tryThrottle(heapSize);
                    // Fresh context per attempt; the timeout adapts to the current retry multiplier.
                    ReplicationEndpoint.ReplicateContext replicateContext = new ReplicationEndpoint.ReplicateContext();
                    replicateContext.setEntries(walEntries).setSize(heapSize);
                    replicateContext.setWalGroupId(this.walGroupId);
                    replicateContext.setTimeout(ReplicationUtils.getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));
                    startNanos = System.nanoTime();
                    replicated = this.source.getReplicationEndpoint().replicate(replicateContext);
                    endNanos = System.nanoTime();
                } catch (InterruptedException e) {
                    LOG.debug("Interrupted while sleeping for throttling control");
                    Thread.currentThread().interrupt();
                    // Nothing was shipped on this path; go back to the loop condition, which
                    // observes the restored interrupt flag through isActive().
                    continue;
                }
                if (!replicated) {
                    // The endpoint did not accept the batch: try again while still active.
                    continue;
                }
                for (WAL.Entry entry : walEntries) {
                    cleanUpHFileRefs(entry.getEdit());
                    LOG.trace("shipped entry {}: ", entry);
                }
                updateLogPosition(wALEntryBatch);
                this.source.postShipEdits(walEntries, batchEntrySizeExcludeBulkLoad);
                this.source.getSourceMetrics().shipBatch(wALEntryBatch.getNbOperations(), heapSize, wALEntryBatch.getNbHFiles());
                this.source.getSourceMetrics().setAgeOfLastShippedOp(walEntries.get(walEntries.size() - 1).getKey().getWriteTime(), this.walGroupId);
                this.source.getSourceMetrics().updateTableLevelMetrics(wALEntryBatch.getWalEntriesWithSize());
                if (LOG.isTraceEnabled()) {
                    LOG.trace("Replicated {} entries or {} operations in {} ms", walEntries.size(), wALEntryBatch.getNbOperations(), (endNanos - startNanos) / 1000000);
                }
                return;
            } catch (Exception e) {
                LOG.warn("{} threw unknown exception:", this.source.getReplicationEndpoint().getClass().getName(), e);
                // Back off before the next attempt; the multiplier stops growing once it reaches
                // maxRetriesMultiplier.
                if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
                    sleepMultiplier++;
                }
            }
        }
    }

    /**
     * Releases the HFile references held for every bulk-load store file mentioned in the edit.
     *
     * <p>For each cell whose qualifier matches {@code WALEdit.BULK_LOAD}, the bulk-load descriptor
     * is decoded and, per store, the store-file list is passed to the source manager's
     * {@code cleanUpHFileRefs}; the HFile-refs queue-size metric is decremented by the number of
     * files in that list.
     *
     * <p>The peer id used for the clean-up is the part of {@code source.getPeerId()} before the
     * first {@code '-'} (presumably to strip a recovered-queue suffix -- confirm against the id
     * format). The separator is spelled out here rather than borrowed from an unrelated CLI
     * constant, and {@code indexOf}/{@code substring} is used so that an id made only of
     * separators yields an empty prefix instead of an out-of-bounds array access.
     *
     * @param wALEdit the edit that has just been shipped
     * @throws IOException if decoding the bulk-load descriptor or the clean-up fails
     */
    private void cleanUpHFileRefs(WALEdit wALEdit) throws IOException {
        String peerId = this.source.getPeerId();
        int separatorAt = peerId.indexOf('-');
        if (separatorAt >= 0) {
            peerId = peerId.substring(0, separatorAt);
        }
        List<Cell> cells = wALEdit.getCells();
        int cellCount = cells.size();
        for (int i = 0; i < cellCount; i++) {
            Cell cell = cells.get(i);
            if (!CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
                continue;
            }
            List<WALProtos.StoreDescriptor> stores = WALEdit.getBulkLoadDescriptor(cell).getStoresList();
            int storeCount = stores.size();
            for (int j = 0; j < storeCount; j++) {
                ProtocolStringList storeFiles = stores.get(j).getStoreFileList();
                this.source.getSourceManager().cleanUpHFileRefs(peerId, storeFiles);
                this.source.getSourceMetrics().decrSizeOfHFileRefsQueue(storeFiles.size());
            }
        }
    }

    /**
     * Records the batch's WAL position with the source manager when it differs from what this
     * shipper last recorded, then remembers the new path and position.
     *
     * @param wALEntryBatch batch carrying the last WAL path/position and the end-of-file flag
     * @return {@code true} if {@code logPositionAndCleanOldLogs} was called, {@code false} if the
     *         batch pointed at the already-recorded path and position
     */
    private boolean updateLogPosition(WALEntryBatch wALEntryBatch) {
        // Evaluated against the previously remembered path/position, i.e. before they are
        // overwritten below. An end-of-file batch always counts as a change.
        final boolean positionChanged = wALEntryBatch.isEndOfFile()
                || !wALEntryBatch.getLastWalPath().equals(currentPath)
                || wALEntryBatch.getLastWalPosition() != currentPosition;
        if (positionChanged) {
            source.getSourceManager().logPositionAndCleanOldLogs(source.getQueueId(), source.isRecovered(), wALEntryBatch);
        }

        if (!wALEntryBatch.isEndOfFile()) {
            currentPath = wALEntryBatch.getLastWalPath();
            currentPosition = wALEntryBatch.getLastWalPosition();
        } else {
            // At end of file, continue from the path the reader reports, at offset zero.
            currentPath = entryReader.getCurrentPath();
            currentPosition = 0L;
        }
        return positionChanged;
    }

    /**
     * Hands this thread to {@code Threads.setDaemonThreadRunning} under a name built from the
     * calling thread's name, the WAL group id and the source's queue id.
     *
     * @param uncaughtExceptionHandler handler passed through to {@code Threads.setDaemonThreadRunning}
     */
    public void startup(Thread.UncaughtExceptionHandler uncaughtExceptionHandler) {
        final String shipperThreadName = Thread.currentThread().getName()
                + ".replicationSource.shipper" + walGroupId
                + "," + source.getQueueId();
        Threads.setDaemonThreadRunning(this, shipperThreadName, uncaughtExceptionHandler);
    }

    /**
     * Returns the path the WAL reader is currently positioned on. This delegates to the reader and
     * does not return this shipper's own {@code currentPath} field.
     * (Decompiler note: access was widened from package-private.)
     *
     * @return the reader's current path
     */
    public Path getCurrentPath() {
        return entryReader.getCurrentPath();
    }

    /**
     * Returns the WAL position remembered by the most recent log-position update, or -1 if none
     * has happened yet.
     * (Decompiler note: access was widened from package-private.)
     *
     * @return the last remembered WAL position
     */
    public long getCurrentPosition() {
        return currentPosition;
    }

    /**
     * Installs the WAL reader this shipper polls for entry batches.
     * (Decompiler note: access was widened from package-private.)
     *
     * @param replicationSourceWALReader the reader to poll
     */
    public void setWALReader(ReplicationSourceWALReader replicationSourceWALReader) {
        entryReader = replicationSourceWALReader;
    }

    /**
     * Returns the start position for this shipper; this implementation always returns zero.
     * (Decompiler note: access was widened from package-private.)
     *
     * @return {@code 0}
     */
    public long getStartPosition() {
        return 0;
    }

    /**
     * Tells whether the worker loops should keep going.
     *
     * @return {@code true} only if the source is active, the worker state is RUNNING and this
     *         thread's interrupt flag is not set
     */
    private boolean isActive() {
        if (!source.isSourceActive()) {
            return false;
        }
        if (state != WorkerState.RUNNING) {
            return false;
        }
        return !isInterrupted();
    }

    /**
     * Replaces the worker state; the field is volatile, so the new value is visible to other
     * threads reading it.
     * (Decompiler note: access was changed from protected.)
     *
     * @param workerState the state to switch to
     */
    public final void setWorkerState(WorkerState workerState) {
        state = workerState;
    }

    /**
     * Asks the worker to stop by switching its state to STOPPED, which makes {@code isActive()}
     * return {@code false} on its next evaluation.
     * (Decompiler note: access was widened from package-private.)
     */
    public void stopWorker() {
        this.setWorkerState(WorkerState.STOPPED);
    }

    /**
     * Tells whether the worker has reached the FINISHED state.
     *
     * @return {@code true} if the current state is {@code WorkerState.FINISHED}
     */
    public boolean isFinished() {
        return WorkerState.FINISHED == state;
    }

    /**
     * Sleeps for the configured base interval multiplied by {@code i}. An interrupt during the
     * sleep is logged and the thread's interrupt flag is restored; it is not rethrown.
     *
     * @param str reason for sleeping, used only in the trace log
     * @param i multiplier applied to the base sleep interval
     * @return {@code true} if {@code i} is still below {@code maxRetriesMultiplier}; callers use
     *         this to decide whether to increase the multiplier for the next retry
     */
    public boolean sleepForRetries(String str, int i) {
        final long pauseMillis = sleepForRetries * i;
        LOG.trace("{}, sleeping {} times {}", str, sleepForRetries, i);
        try {
            Thread.sleep(pauseMillis);
        } catch (InterruptedException interrupted) {
            LOG.debug("Interrupted while sleeping between retries");
            Thread.currentThread().interrupt();
        }
        return i < maxRetriesMultiplier;
    }

    /**
     * Waits for both this shipper thread and the WAL reader thread to die, then drains the
     * reader's pending batch queue and subtracts the size of the drained entries from the source
     * manager's total buffer usage, publishing the new total to the global metrics.
     *
     * <p>The wait is bounded by {@code shipEditsTimeout} milliseconds, polling every
     * {@code sleepForRetries} milliseconds. If the deadline passes, or the calling thread is
     * interrupted while waiting, a warning is logged and the method returns without touching the
     * queue or the buffer accounting.
     * (Decompiler note: access was widened from package-private.)
     */
    public void clearWALEntryBatch() {
        final long deadline = System.currentTimeMillis() + this.shipEditsTimeout;
        while (isAlive() || this.entryReader.isAlive()) {
            try {
                if (System.currentTimeMillis() >= deadline) {
                    // Three placeholders for three arguments: peer id, shipper liveness, reader liveness.
                    LOG.warn("{} Shipper clearWALEntryBatch method timed out whilst waiting reader/shipper thread to stop. Not cleaning buffer usage. Shipper alive: {}; Reader alive: {}", this.source.getPeerId(), isAlive(), this.entryReader.isAlive());
                    return;
                }
                Thread.sleep(this.sleepForRetries);
            } catch (InterruptedException e) {
                // NOTE(review): unlike the other interrupt handlers in this class, the interrupt
                // flag is not restored here; confirm with the callers whether that is intended.
                LOG.warn("{} Interrupted while waiting {} to stop on clearWALEntryBatch. Not cleaning buffer usage: {}", this.source.getPeerId(), getName(), e);
                return;
            }
        }

        // Both threads are dead: remove every pending batch and add up what it accounted for.
        LongAccumulator totalToDecrement = new LongAccumulator(Long::sum, 0L);
        this.entryReader.entryBatchQueue.forEach(batch -> {
            this.entryReader.entryBatchQueue.remove(batch);
            batch.getWalEntries().forEach(entry ->
                totalToDecrement.accumulate(ReplicationSourceWALReader.getEntrySizeExcludeBulkLoad(entry)));
        });
        if (LOG.isTraceEnabled()) {
            LOG.trace("Decrementing totalBufferUsed by {}B while stopping Replication WAL Readers.", totalToDecrement.longValue());
        }
        this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(
            this.source.getSourceManager().getTotalBufferUsed().addAndGet(-totalToDecrement.longValue()));
    }
}
