package org.apache.hadoop.hbase.regionserver.wal;

import java.io.IOException;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.wal.FSHLogProvider;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
/* loaded from: input_file:org/apache/hadoop/hbase/regionserver/wal/FSHLog.class */
public class FSHLog extends AbstractFSWAL<WALProvider.Writer> {
    /** Config key read into {@link #minTolerableReplication} by the constructor. */
    private static final String TOLERABLE_LOW_REPLICATION = "hbase.regionserver.hlog.tolerable.lowreplication";
    /** Config key read into {@link #lowReplicationRollLimit} by the constructor. */
    private static final String LOW_REPLICATION_ROLL_LIMIT = "hbase.regionserver.hlog.lowreplication.rolllimit";
    /** Default for {@link #LOW_REPLICATION_ROLL_LIMIT}. */
    private static final int DEFAULT_LOW_REPLICATION_ROLL_LIMIT = 5;
    /** Config key for the number of {@code SyncRunner} threads (see {@link #syncerCount}). */
    private static final String SYNCER_COUNT = "hbase.regionserver.hlog.syncer.count";
    /** Default for {@link #SYNCER_COUNT}. */
    private static final int DEFAULT_SYNCER_COUNT = 5;
    /** Config key read into {@link #maxSyncRequestCount} by the constructor. */
    private static final String MAX_BATCH_COUNT = "hbase.regionserver.wal.sync.batch.count";
    /** Final fallback for {@link #MAX_BATCH_COUNT} when the handler-count setting is absent too. */
    private static final int DEFAULT_MAX_BATCH_COUNT = 200;
    /** Config key whose value the constructor passes to {@code setWaitOnShutdownInSeconds}. */
    private static final String FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = "hbase.wal.fshlog.wait.on.shutdown.seconds";
    /** Default for {@link #FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS}. */
    private static final int DEFAULT_FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS = 5;
    /**
     * Stream of the current writer when that writer is a {@code ProtobufLogWriter}, otherwise
     * {@code null}; reassigned in {@code onWriterReplaced}.
     */
    private FSDataOutputStream hdfs_out;
    /** Replica count below which (and when non-zero) the log is considered under-replicated. */
    private final int minTolerableReplication;
    /** Number of low-replication rolls requested in a row; reset when the limit is reached. */
    private final AtomicInteger consecutiveLogRolls;
    /** Maximum value of {@link #consecutiveLogRolls} before low-replication rolling is disabled. */
    private final int lowReplicationRollLimit;
    /** Whether low replication may currently trigger a roll; toggled by {@code doCheckLogLowReplication}. */
    private volatile boolean lowReplicationRollEnabled;
    /** Number of {@code SyncRunner} threads to create. */
    private final int syncerCount;
    /** Passed to each {@code SyncRunner}; the runner's queue capacity is three times this value. */
    private final int maxSyncRequestCount;
    /** Index of the runner that received the most recent offer; advanced round-robin. */
    private int syncRunnerIndex;
    /** Active sync runner threads, or {@code null} when none have been started or all were shut down. */
    private SyncRunner[] syncRunners;
    private static final Logger LOG = LoggerFactory.getLogger(FSHLog.class);
    // Shared, pre-allocated exception instances used to fail sync futures. Being shared, their
    // stack traces point at class initialization rather than at the failing call site.
    private static final IOException WITER_REPLACED_EXCEPTION = new IOException("Writer was replaced!");
    // Message typo fixed ("Wirter" -> "Writer"); the constant name is kept because other members reference it.
    private static final IOException WITER_BROKEN_EXCEPTION = new IOException("Writer was broken!");
    private static final IOException WAL_CLOSE_EXCEPTION = new IOException("WAL was closed!");
    /**
     * Aligned size figure built from {@code ClassSize} constants.
     * NOTE(review): presumably the per-instance heap overhead of this class; the literal
     * terms 12 and 32 are not explained by anything visible here - confirm against the field list.
     */
    public static final long FIXED_OVERHEAD = ClassSize.align((((ClassSize.OBJECT + (5 * ClassSize.REFERENCE)) + (2 * ClassSize.ATOMIC_INTEGER)) + 12) + 32);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/wal/FSHLog$SyncRequest.class */
    /**
     * Immutable description of one pending sync: which writer to sync, the flag handed to
     * {@code writer.sync(boolean)}, the sequence number the request was issued at, and the
     * future that a {@code SyncRunner} completes with the outcome.
     */
    public static class SyncRequest {
        /** Writer this request was issued against; runners compare it with the current writer. */
        private final WALProvider.Writer writer;
        /** Argument passed through to {@code writer.sync(boolean)}. */
        private final boolean shouldUseHSync;
        /** Sequence number of the request; compared against already-synced txids by the runner. */
        private final long sequenceWhenSync;
        /** Completed with the synced txid on success, or exceptionally on failure/shutdown. */
        private final CompletableFuture<Long> completableFuture;

        /**
         * @param writer            writer the sync targets
         * @param z                 stored as {@code shouldUseHSync}
         * @param j                 stored as {@code sequenceWhenSync}
         * @param completableFuture future to complete when the request is resolved
         */
        public SyncRequest(WALProvider.Writer writer, boolean z, long j, CompletableFuture<Long> completableFuture) {
            this.writer = writer;
            this.shouldUseHSync = z;
            this.sequenceWhenSync = j;
            this.completableFuture = completableFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hbase/regionserver/wal/FSHLog$SyncRunner.class */
    /**
     * Worker thread that drains a bounded queue of {@link SyncRequest}s, calls
     * {@code sync(...)} on the current writer and completes each request's future with either
     * the synced txid or the failure.
     */
    public class SyncRunner extends Thread {
        /** Pending requests; capacity is three times the count given to the constructor. */
        private final BlockingQueue<SyncRequest> syncRequests;
        /** Set by {@link #shutDown()}; checked by {@link #offer(SyncRequest)} and the run loop. */
        private volatile boolean shutDown;

        /**
         * @param str thread name
         * @param i   base request count; the queue is bounded at {@code i * 3}
         */
        SyncRunner(String str, int i) {
            super(str);
            this.shutDown = false;
            this.syncRequests = new LinkedBlockingQueue<>(i * 3);
        }

        /**
         * Tries to enqueue a request without blocking.
         *
         * @return {@code true} if the request was queued and will be resolved by this runner;
         *         {@code false} if the runner is shut down or its queue is full
         */
        boolean offer(SyncRequest syncRequest) {
            if (this.shutDown) {
                return false;
            }
            if (!this.syncRequests.offer(syncRequest)) {
                return false;
            }
            // Re-check: shutdown may have started after the first check. If we manage to pull the
            // request back out, report failure; if removal fails, someone else already took it
            // and is responsible for completing it.
            if (this.shutDown && this.syncRequests.remove(syncRequest)) {
                return false;
            }
            return true;
        }

        /**
         * Completes {@code syncRequest} (when non-null) with {@code j} and then every request at
         * the head of the queue whose sequence number does not exceed {@code j}.
         */
        private void completeSyncRequests(SyncRequest syncRequest, long j) {
            if (syncRequest != null) {
                syncRequest.completableFuture.complete(Long.valueOf(j));
            }
            while (true) {
                SyncRequest head = this.syncRequests.peek();
                if (head == null || head.sequenceWhenSync > j) {
                    return;
                }
                head.completableFuture.complete(Long.valueOf(j));
                this.syncRequests.poll();
            }
        }

        /**
         * Fails {@code syncRequest} with {@code exc} and then every request at the head of the
         * queue that targets the same writer. A {@code null} request is a no-op, since without it
         * there is no writer to match queued requests against.
         */
        private void completeExceptionallySyncRequests(SyncRequest syncRequest, Exception exc) {
            if (syncRequest == null) {
                return;
            }
            syncRequest.completableFuture.completeExceptionally(exc);
            while (true) {
                SyncRequest head = this.syncRequests.peek();
                if (head == null || head.writer != syncRequest.writer) {
                    return;
                }
                head.completableFuture.completeExceptionally(exc);
                this.syncRequests.poll();
            }
        }

        /**
         * Blocks until a request that still needs a sync is available. Requests whose sequence
         * number is below the highest already-synced txid are completed with that txid and skipped.
         */
        private SyncRequest takeSyncRequest() throws InterruptedException {
            while (true) {
                SyncRequest next = this.syncRequests.take();
                long syncedTxid = FSHLog.this.highestSyncedTxid.get();
                if (next.sequenceWhenSync >= syncedTxid) {
                    return next;
                }
                next.completableFuture.complete(Long.valueOf(syncedTxid));
            }
        }

        /**
         * Main loop: take a request, verify it still targets a healthy current writer, sync, and
         * resolve the request plus any queued requests covered by the same sync. On exit, every
         * request still queued is failed with the WAL-closed exception.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (!this.shutDown) {
                try {
                    SyncRequest request = takeSyncRequest();
                    long txidToReport = request.sequenceWhenSync;
                    boolean writerBroken = FSHLog.this.isWriterBroken();
                    long highestAppended = FSHLog.this.highestProcessedAppendTxid;
                    // Snapshot the writer once so the writer we validate is the writer we sync.
                    WALProvider.Writer currentWriter = (WALProvider.Writer) FSHLog.this.writer;
                    if (currentWriter != request.writer) {
                        request.completableFuture.completeExceptionally(FSHLog.WITER_REPLACED_EXCEPTION);
                        continue;
                    }
                    if (writerBroken) {
                        request.completableFuture.completeExceptionally(FSHLog.WITER_BROKEN_EXCEPTION);
                        continue;
                    }
                    // Report the highest txid that was appended before this sync started, if it
                    // is beyond the one the request asked for.
                    if (highestAppended > txidToReport) {
                        txidToReport = highestAppended;
                    }
                    Exception failure = null;
                    try {
                        currentWriter.sync(request.shouldUseHSync);
                    } catch (IOException e) {
                        FSHLog.LOG.error("Error syncing", e);
                        failure = e;
                    } catch (Exception e) {
                        FSHLog.LOG.warn("UNEXPECTED", e);
                        failure = e;
                    } catch (Throwable t) {
                        // Not an Exception (i.e. an Error): the sync did not finish, so waiters
                        // must never see success. Fail them, then rethrow for the outer handler.
                        completeExceptionallySyncRequests(request, new IOException("Sync failed", t));
                        throw t;
                    }
                    if (failure != null) {
                        completeExceptionallySyncRequests(request, failure);
                    } else {
                        completeSyncRequests(request, txidToReport);
                    }
                } catch (InterruptedException e) {
                    // Expected from shutDown(); the loop condition decides whether to exit.
                    FSHLog.LOG.info("interrupted");
                } catch (Throwable t) {
                    FSHLog.LOG.warn("UNEXPECTED, continuing", t);
                }
            }
            clearSyncRequestsWhenShutDown();
        }

        /** Fails every request still in the queue with the WAL-closed exception. */
        private void clearSyncRequestsWhenShutDown() {
            SyncRequest pending;
            while ((pending = this.syncRequests.poll()) != null) {
                pending.completableFuture.completeExceptionally(FSHLog.WAL_CLOSE_EXCEPTION);
            }
        }

        /**
         * Flags the runner as shut down, interrupts it (to break out of a blocking take) and waits
         * for the thread to finish. If the caller is interrupted while waiting, its interrupt
         * status is restored.
         */
        void shutDown() {
            try {
                this.shutDown = true;
                interrupt();
                join();
            } catch (InterruptedException e) {
                FSHLog.LOG.warn("interrupted", e);
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Convenience constructor. Delegates to the nine-argument constructor, supplying
     * {@link HConstants#HREGION_OLDLOGDIR_NAME} as the second string, {@code null} listeners,
     * {@code true} for the boolean flag and {@code null} for both trailing strings.
     *
     * NOTE(review): parameter names are decompiler-generated; {@code str} is presumably the
     * WAL directory name - confirm against the base class.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public FSHLog(FileSystem fileSystem, Path path, String str, Configuration configuration) throws IOException {
        this(fileSystem, path, str, HConstants.HREGION_OLDLOGDIR_NAME, configuration, null, true, null, null);
    }

    /**
     * Convenience constructor that additionally takes an {@link Abortable}. Delegates to the
     * twelve-argument constructor, supplying {@link HConstants#HREGION_OLDLOGDIR_NAME} as the
     * second string, {@code null} listeners, {@code true} for the boolean flag and {@code null}
     * for the two trailing strings, the second file system and the second path.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public FSHLog(FileSystem fileSystem, Abortable abortable, Path path, String str, Configuration configuration) throws IOException {
        this(fileSystem, abortable, path, str, HConstants.HREGION_OLDLOGDIR_NAME, configuration, null, true, null, null, null, null);
    }

    /**
     * Constructor without an {@link Abortable} or secondary file system. Delegates to the
     * twelve-argument constructor with {@code null} for the abortable, the second file system
     * and the second path; all other arguments are passed through unchanged.
     *
     * @throws IOException propagated from the delegated constructor
     */
    public FSHLog(FileSystem fileSystem, Path path, String str, String str2, Configuration configuration, List<WALActionsListener> list, boolean z, String str3, String str4) throws IOException {
        this(fileSystem, null, path, str, str2, configuration, list, z, str3, str4, null, null);
    }

    public FSHLog(FileSystem fileSystem, Abortable abortable, Path path, String str, String str2, Configuration configuration, List<WALActionsListener> list, boolean z, String str3, String str4, FileSystem fileSystem2, Path path2) throws IOException {
        super(fileSystem, abortable, path, str, str2, configuration, list, z, str3, str4, fileSystem2, path2);
        this.consecutiveLogRolls = new AtomicInteger(0);
        this.lowReplicationRollEnabled = true;
        this.syncRunnerIndex = 0;
        this.syncRunners = null;
        this.minTolerableReplication = configuration.getInt(TOLERABLE_LOW_REPLICATION, CommonFSUtils.getDefaultReplication(fileSystem, this.walDir));
        this.lowReplicationRollLimit = configuration.getInt(LOW_REPLICATION_ROLL_LIMIT, 5);
        this.syncerCount = configuration.getInt(SYNCER_COUNT, 5);
        this.maxSyncRequestCount = configuration.getInt(MAX_BATCH_COUNT, configuration.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200));
        createSingleThreadPoolConsumeExecutor("FSHLog", path, str3);
        setWaitOnShutdownInSeconds(configuration.getInt(FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS, 5), FSHLOG_WAIT_ON_SHUTDOWN_IN_SECONDS);
    }

    /**
     * Runs the base-class initialization and then creates and starts the sync runner threads.
     *
     * @throws IOException propagated from {@code super.init()}
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL, org.apache.hadoop.hbase.wal.WAL
    public void init() throws IOException {
        super.init();
        createSyncRunnersAndStart();
    }

    /**
     * Creates {@code syncerCount} sync runner threads named {@code sync.<index>}, starts them and
     * resets the round-robin index.
     *
     * <p>Any runners left over from a previous call are shut down first. Both {@code init()} and
     * {@code onWriterReplaced(...)} call this method, and simply overwriting the array would
     * orphan the old threads (they only exit once their shutdown flag is set). When no runners
     * exist the extra call does nothing.
     */
    private void createSyncRunnersAndStart() {
        shutDownSyncRunners();
        this.syncRunnerIndex = 0;
        // Build the array fully before publishing it so no reader ever sees a null slot.
        final SyncRunner[] runners = new SyncRunner[this.syncerCount];
        for (int i = 0; i < runners.length; i++) {
            runners[i] = new SyncRunner("sync." + i, this.maxSyncRequestCount);
            runners[i].start();
        }
        this.syncRunners = runners;
    }

    /**
     * Returns the stream wrapped by the current HDFS output stream.
     *
     * @return the wrapped stream, or {@code null} when no HDFS output stream is set
     */
    OutputStream getOutputStream() {
        final FSDataOutputStream current = this.hdfs_out;
        return current == null ? null : current.getWrappedStream();
    }

    /**
     * Syncs a freshly created writer once and reports the elapsed time through
     * {@code postSync} with a handler count of zero. A failure is only logged: this sync is an
     * optimization, not a correctness requirement.
     */
    private void preemptiveSync(ProtobufLogWriter protobufLogWriter) {
        final long startNanos = System.nanoTime();
        try {
            protobufLogWriter.sync(this.useHsync);
            final long elapsedNanos = System.nanoTime() - startNanos;
            postSync(elapsedNanos, 0);
        } catch (IOException e) {
            LOG.warn("pre-sync failed but an optimization so keep going", e);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    /**
     * Creates a writer for {@code path} through {@code FSHLogProvider.createWriter} and, when
     * the result is a {@code ProtobufLogWriter}, issues one pre-emptive sync on it.
     *
     * @return the newly created writer
     * @throws IOException if the provider fails to create the writer
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    public WALProvider.Writer createWriterInstance(FileSystem fileSystem, Path path) throws IOException {
        final FSHLogProvider.Writer freshWriter =
            FSHLogProvider.createWriter(this.conf, fileSystem, path, false, this.blocksize);
        final boolean isProtobufWriter = freshWriter instanceof ProtobufLogWriter;
        if (isProtobufWriter) {
            preemptiveSync((ProtobufLogWriter) freshWriter);
        }
        return freshWriter;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Appends one entry to the given writer.
     *
     * @throws IOException propagated from {@code writer.append}
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    public void doAppend(WALProvider.Writer writer, FSWALEntry fSWALEntry) throws IOException {
        writer.append(fSWALEntry);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook invoked with the new writer. Caches the writer's underlying stream when it is a
     * {@code ProtobufLogWriter} (clearing the cached stream otherwise, including for
     * {@code null}) and then creates and starts a fresh set of sync runners.
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    public void onWriterReplaced(WALProvider.Writer writer) {
        // instanceof is false for null, so a null writer takes the else branch.
        if (writer instanceof ProtobufLogWriter) {
            this.hdfs_out = ((ProtobufLogWriter) writer).getStream();
        } else {
            this.hdfs_out = null;
        }
        createSyncRunnersAndStart();
    }

    /** Releases this class's resources by shutting down all sync runner threads. */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    protected void doCleanUpResources() {
        shutDownSyncRunners();
    }

    /**
     * Shuts down every current sync runner, one after another, and clears the runner array.
     * Does nothing beyond clearing the field when no runners exist.
     */
    private void shutDownSyncRunners() {
        final SyncRunner[] runners = this.syncRunners;
        final int count = (runners == null) ? 0 : runners.length;
        for (int i = 0; i < count; i++) {
            runners[i].shutDown();
        }
        this.syncRunners = null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Queues an asynchronous sync of {@code writer} on one of the sync runners.
     *
     * @param writer writer to sync
     * @param z      passed to the writer's {@code sync(boolean)} call by the runner
     * @param j      sequence number associated with the request
     * @return a future completed with the synced txid, or exceptionally when the sync fails or
     *         no runner accepts the request
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    public CompletableFuture<Long> doWriterSync(WALProvider.Writer writer, boolean z, long j) {
        final CompletableFuture<Long> result = new CompletableFuture<>();
        final SyncRequest request = new SyncRequest(writer, z, j, result);
        offerSyncRequest(request);
        return result;
    }

    /**
     * Hands the request to the first sync runner that accepts it, starting with the runner
     * after the one used last (round-robin) and trying each runner at most once.
     *
     * <p>If no runner accepts it, the request's future is completed exceptionally. That includes
     * the case where the runners have already been shut down and the array is {@code null},
     * which would otherwise surface as a {@link NullPointerException}.
     */
    private void offerSyncRequest(SyncRequest syncRequest) {
        // Work on a snapshot so a concurrent reset of the field cannot fail mid-loop.
        final SyncRunner[] runners = this.syncRunners;
        if (runners != null) {
            for (int attempt = 0; attempt < runners.length; attempt++) {
                this.syncRunnerIndex = (this.syncRunnerIndex + 1) % runners.length;
                if (runners[this.syncRunnerIndex].offer(syncRequest)) {
                    return;
                }
            }
        }
        syncRequest.completableFuture.completeExceptionally(new IOException("There is no available syncRunner."));
    }

    /**
     * Requests a log roll with reason {@code SLOW_SYNC} when the slow-sync check fires, unless
     * a roll has already been requested (in which case the check is not evaluated at all).
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    protected void checkSlowSyncCount() {
        if (isLogRollRequested()) {
            return;
        }
        if (doCheckSlowSync()) {
            requestLogRoll(WALActionsListener.RollRequestReason.SLOW_SYNC);
        }
    }

    /**
     * Decides whether the log should be rolled because the current block has fewer replicas
     * than tolerated.
     *
     * <ul>
     * <li>Replication is non-zero and below the tolerable minimum: while low-replication rolling
     * is enabled, request a roll and count it; once the count has reached the limit, reset the
     * count and disable low-replication rolling instead.</li>
     * <li>Replication is at or above the minimum while rolling is disabled: re-enable it, but
     * only once more than one entry has been written.</li>
     * </ul>
     * Any exception raised while checking is logged and treated as "no roll needed".
     *
     * @return {@code true} if a roll should be requested
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    protected boolean doCheckLogLowReplication() {
        boolean rollNeeded = false;
        try {
            final int replicas = getLogReplication();
            // A reported replication of 0 is never treated as under-replicated.
            final boolean underReplicated = replicas != 0 && replicas < this.minTolerableReplication;
            if (underReplicated) {
                if (this.lowReplicationRollEnabled) {
                    if (this.consecutiveLogRolls.get() < this.lowReplicationRollLimit) {
                        LOG.warn("HDFS pipeline error detected. Found " + replicas + " replicas but expecting no less than " + this.minTolerableReplication + " replicas.  Requesting close of WAL. current pipeline: " + Arrays.toString(getPipeline()));
                        rollNeeded = true;
                        this.consecutiveLogRolls.getAndIncrement();
                    } else {
                        LOG.warn("Too many consecutive RollWriter requests, it's a sign of the total number of live datanodes is lower than the tolerable replicas.");
                        this.consecutiveLogRolls.set(0);
                        this.lowReplicationRollEnabled = false;
                    }
                }
            } else if (replicas >= this.minTolerableReplication && !this.lowReplicationRollEnabled) {
                // Replication recovered; wait until the log holds more than one entry before
                // switching the roller back on.
                if (this.numEntries.get() <= 1) {
                    return false;
                }
                this.lowReplicationRollEnabled = true;
                LOG.info("LowReplication-Roller was enabled.");
            }
        } catch (Exception e) {
            LOG.warn("DFSOutputStream.getNumCurrentReplicas failed because of " + e + ", continuing...");
        }
        return rollNeeded;
    }

    /**
     * Returns the replication of the block currently being written.
     *
     * @return the current block replication when the output stream is an
     *         {@link HdfsDataOutputStream}; {@code 0} when it is not (including when no stream is
     *         set) or when the lookup throws an {@link IOException}
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    int getLogReplication() {
        // Read the field once: it is reassigned whenever the writer is replaced.
        final FSDataOutputStream out = this.hdfs_out;
        try {
            if (out instanceof HdfsDataOutputStream) {
                // getCurrentBlockReplication() is declared on HdfsDataOutputStream, not on
                // FSDataOutputStream, so the cast is required.
                return ((HdfsDataOutputStream) out).getCurrentBlockReplication();
            }
        } catch (IOException e) {
            LOG.info("", e);
        }
        return 0;
    }

    /**
     * @return the current value of the low-replication roll flag, which
     *         {@code doCheckLogLowReplication()} switches off after too many consecutive rolls
     *         and back on once replication has recovered
     */
    boolean isLowReplicationRollEnabled() {
        return this.lowReplicationRollEnabled;
    }

    /**
     * Returns the datanode pipeline of the current output stream.
     *
     * @return the pipeline reported by the wrapped {@link DFSOutputStream}, or an empty array
     *         when there is no output stream or it does not wrap a {@code DFSOutputStream}
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    DatanodeInfo[] getPipeline() {
        // Read the field once: it is reassigned whenever the writer is replaced.
        final FSDataOutputStream out = this.hdfs_out;
        if (out != null) {
            final OutputStream wrapped = out.getWrappedStream();
            if (wrapped instanceof DFSOutputStream) {
                // getPipeline() is declared on DFSOutputStream, so the cast is required.
                return ((DFSOutputStream) wrapped).getPipeline();
            }
        }
        return new DatanodeInfo[0];
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Builds a combined writer from the two given writers via {@code CombinedWriter.create}.
     * Note that the arguments are passed in reverse order: {@code writer2} first, then
     * {@code writer}.
     *
     * NOTE(review): the swap is presumably intentional (ordering of the underlying writers) -
     * confirm against {@code CombinedWriter.create}'s parameter contract.
     */
    @Override // org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL
    public WALProvider.Writer createCombinedWriter(WALProvider.Writer writer, WALProvider.Writer writer2) {
        return CombinedWriter.create(writer2, writer);
    }
}
