/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Streams {@link WAL.Entry} objects out of the WAL files held in a queue, one file after another.
 *
 * <p>The stream looks only at the head of the queue. When the head file has been read to its end
 * and is no longer being written, it is removed from the queue and the next file is opened. The
 * stream remembers the position just after the last entry handed out by {@link #next()} so that a
 * reader can be re-opened or reset and continue from there.
 *
 * <p>This class keeps unsynchronized mutable state (reader, current entry, positions); no
 * synchronization is performed here.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
class WALEntryStream implements Closeable {
    private static final Logger LOG = LoggerFactory.getLogger(WALEntryStream.class);

    /** Reader over {@link #currentPath}; {@code null} while no file is open. */
    private WAL.Reader reader;
    /** File the reader was last opened on; {@code null} once the queue was found empty. */
    private Path currentPath;
    /** Entry that has been read ahead but not yet returned by {@link #next()}. */
    private WAL.Entry currentEntry;
    /** Position to seek to whenever the reader is opened or reset. */
    private long currentPositionOfEntry = 0L;
    /** Reader position recorded after the most recent accepted read attempt. */
    private long currentPositionOfReader = 0L;
    private final PriorityBlockingQueue<Path> logQueue;
    private final FileSystem fs;
    private final Configuration conf;
    private final WALFileLengthProvider walFileLengthProvider;
    private final ServerName serverName;
    private final MetricsSource metrics;

    /**
     * Creates a stream over the given queue. Nothing is opened until the first call to
     * {@link #hasNext()}, {@link #peek()} or {@link #next()}; this constructor only stores its
     * arguments.
     *
     * @param logQueue queue of WAL paths; only its head is read, and it is removed when finished
     * @param fs file system used to open and stat the WAL files
     * @param conf configuration used to create readers and to locate the archive directories
     * @param startPosition position to seek to in the first file that is opened (0 means no seek)
     * @param walFileLengthProvider consulted after every read for the length of the current file
     * @param serverName used to build the per-server archive directory name
     * @param metrics sink for the read/skip/restart counters updated by this stream
     * @throws IOException declared for callers; the constructor body itself performs no I/O
     */
    public WALEntryStream(PriorityBlockingQueue<Path> logQueue, FileSystem fs, Configuration conf, long startPosition, WALFileLengthProvider walFileLengthProvider, ServerName serverName, MetricsSource metrics) throws IOException {
        this.logQueue = logQueue;
        this.fs = fs;
        this.conf = conf;
        this.currentPositionOfEntry = startPosition;
        this.walFileLengthProvider = walFileLengthProvider;
        this.serverName = serverName;
        this.metrics = metrics;
    }

    /**
     * Reports whether an entry is available, reading ahead by one entry if none is buffered.
     *
     * @return {@code true} if an entry is buffered after the attempt
     * @throws IOException if opening, resetting or reading the WAL fails
     */
    public boolean hasNext() throws IOException {
        if (this.currentEntry == null) {
            this.tryAdvanceEntry();
        }
        return this.currentEntry != null;
    }

    /**
     * Returns the buffered entry without consuming it.
     *
     * @return the next entry, or {@code null} if none is currently available
     * @throws IOException if reading ahead fails
     */
    public WAL.Entry peek() throws IOException {
        return this.hasNext() ? this.currentEntry : null;
    }

    /**
     * Returns the next entry and consumes it. The stream position is moved to the reader position
     * recorded by the last read, whether or not an entry was available.
     *
     * @return the next entry, or {@code null} if none is currently available
     * @throws IOException if reading ahead fails
     */
    public WAL.Entry next() throws IOException {
        WAL.Entry save = this.peek();
        this.currentPositionOfEntry = this.currentPositionOfReader;
        this.currentEntry = null;
        return save;
    }

    /**
     * Closes the underlying reader, if one is open.
     *
     * @throws IOException if closing the reader fails
     */
    @Override
    public void close() throws IOException {
        this.closeReader();
    }

    /**
     * @return the position the reader seeks to on open/reset; updated by {@link #next()}
     */
    public long getPosition() {
        return this.currentPositionOfEntry;
    }

    /**
     * @return the path of the WAL currently being read, or {@code null} if there is none
     */
    public Path getCurrentPath() {
        return this.currentPath;
    }

    /** Builds a short description of the current file and position for log messages. */
    private String getCurrentPathStat() {
        StringBuilder sb = new StringBuilder();
        if (this.currentPath != null) {
            sb.append("currently replicating from: ").append(this.currentPath).append(" at position: ").append(this.currentPositionOfEntry).append("\n");
        } else {
            sb.append("no replication ongoing, waiting for new log");
        }
        return sb.toString();
    }

    /**
     * Resets the open reader and seeks it back to {@link #getPosition()}, discarding any buffered
     * entry. Does nothing when no reader is open.
     *
     * @throws IOException if the reset fails and the file cannot be found in an archive directory
     */
    public void reset() throws IOException {
        if (this.reader != null && this.currentPath != null) {
            this.resetReader();
        }
    }

    private void setPosition(long position) {
        this.currentPositionOfEntry = position;
    }

    private void setCurrentPath(Path path) {
        this.currentPath = path;
    }

    /**
     * Tries to buffer one entry in {@link #currentEntry}.
     *
     * <p>If the first read yields nothing and the length provider did not report the file as
     * being written, the reader is reset and read once more before the file is considered
     * finished. Only when that second read is also empty and {@link #checkAllBytesParsed()}
     * agrees is the file dequeued and the next one opened.
     */
    private void tryAdvanceEntry() throws IOException {
        if (this.checkReader()) {
            boolean beingWritten = this.readNextEntryAndRecordReaderPosition();
            if (this.currentEntry == null && !beingWritten) {
                // One more attempt after a reset, so data appended since the reader was opened
                // is not missed before we give up on this file.
                this.resetReader();
                this.readNextEntryAndRecordReaderPosition();
                if (this.currentEntry == null && this.checkAllBytesParsed()) {
                    this.dequeueCurrentLog();
                    if (this.openNextLog()) {
                        this.readNextEntryAndRecordReaderPosition();
                    }
                }
            }
            // Otherwise: either an entry is buffered, or the file is still being written and we
            // simply try again on the next call.
        }
    }

    /**
     * Decides whether the current file has been fully consumed, comparing the reader position
     * (plus trailer, if any) with the file length reported by the file system.
     *
     * <p>A negative trailer size is treated as "the WAL was not closed cleanly"; remaining bytes
     * are then counted as skipped. With a non-negative trailer size, a shortfall causes the
     * position to be set back to 0 and the reader to be reset so the file is read again.
     *
     * @return {@code true} if the file is done; {@code false} if reading was restarted
     */
    private boolean checkAllBytesParsed() throws IOException {
        long trailerSize = this.currentTrailerSize();
        FileStatus stat = null;
        try {
            stat = this.fs.getFileStatus(this.currentPath);
        } catch (IOException exception) {
            // Best effort: without a length we cannot verify anything, so fall through and treat
            // the file as completed. The trailing throwable is logged with its stack trace.
            LOG.warn("Couldn't get file length information about log {}, it {} closed cleanly {}", this.currentPath, trailerSize < 0L ? "was not" : "was", this.getCurrentPathStat(), exception);
            this.metrics.incrUnknownFileLengthForClosedWAL();
        }
        if (stat != null) {
            if (trailerSize < 0L) {
                if (this.currentPositionOfReader < stat.getLen()) {
                    long skippedBytes = stat.getLen() - this.currentPositionOfReader;
                    LOG.debug("Reached the end of WAL file '{}'. It was not closed cleanly, so we did not parse {} bytes of data. This is normally ok.", this.currentPath, skippedBytes);
                    this.metrics.incrUncleanlyClosedWALs();
                    this.metrics.incrBytesSkippedInUncleanlyClosedWALs(skippedBytes);
                }
            } else if (this.currentPositionOfReader + trailerSize < stat.getLen()) {
                LOG.warn("Processing end of WAL file '{}'. At position {}, which is too far away from reported file length {}. Restarting WAL reading (see HBASE-15983 for details). {}", this.currentPath, this.currentPositionOfReader, stat.getLen(), this.getCurrentPathStat());
                this.setPosition(0L);
                this.resetReader();
                this.metrics.incrRestartedWALReading();
                // resetReader() does not touch currentPositionOfReader, so this is still the
                // number of bytes that had been read before the restart.
                this.metrics.incrRepeatedFileBytes(this.currentPositionOfReader);
                return false;
            }
        }
        if (LOG.isTraceEnabled()) {
            LOG.trace("Reached the end of log " + this.currentPath + ", and the length of the file is " + (stat == null ? "N/A" : Long.valueOf(stat.getLen())));
        }
        this.metrics.incrCompletedWAL();
        return true;
    }

    /** Closes the reader, removes the head of the queue and rewinds the position to 0. */
    private void dequeueCurrentLog() throws IOException {
        LOG.debug("Reached the end of log {}", this.currentPath);
        this.closeReader();
        this.logQueue.remove();
        this.setPosition(0L);
        this.metrics.decrSizeOfLogQueue();
    }

    /**
     * Reads one entry from the reader and records the reader position.
     *
     * <p>If the length provider reports a length for the current file and the reader has moved
     * past it, the read is discarded: the reader is reset and neither {@link #currentEntry} nor
     * {@link #currentPositionOfReader} is updated.
     *
     * @return {@code true} if the length provider reported a length for the current file (going
     *     by the provider method's name, that means the file is still being written)
     */
    private boolean readNextEntryAndRecordReaderPosition() throws IOException {
        WAL.Entry readEntry = this.reader.next();
        long readerPos = this.reader.getPosition();
        OptionalLong fileLength = this.walFileLengthProvider.getLogFileSizeIfBeingWritten(this.currentPath);
        if (fileLength.isPresent() && readerPos > fileLength.getAsLong()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("The provider tells us the valid length for " + this.currentPath + " is " + fileLength.getAsLong() + ", but we have advanced to " + readerPos);
            }
            this.resetReader();
            return true;
        }
        if (readEntry != null) {
            this.metrics.incrLogEditsRead();
            this.metrics.incrLogReadInBytes(readerPos - this.currentPositionOfEntry);
        }
        // May be null: nothing more to read right now.
        this.currentEntry = readEntry;
        this.currentPositionOfReader = readerPos;
        return fileLength.isPresent();
    }

    /** Closes and forgets the reader if one is open. */
    private void closeReader() throws IOException {
        if (this.reader != null) {
            this.reader.close();
            this.reader = null;
        }
    }

    /**
     * Makes sure a reader is available, opening the head of the queue if necessary.
     *
     * @return {@code true} if a reader is open afterwards
     */
    private boolean checkReader() throws IOException {
        if (this.reader == null) {
            return this.openNextLog();
        }
        return true;
    }

    /**
     * Opens a reader on the head of the queue. When the queue is empty the current path is
     * cleared.
     *
     * @return {@code true} if a reader is open afterwards
     */
    private boolean openNextLog() throws IOException {
        Path nextPath = this.logQueue.peek();
        if (nextPath != null) {
            this.openReader(nextPath);
            if (this.reader != null) {
                return true;
            }
        } else {
            this.setCurrentPath(null);
        }
        return false;
    }

    /**
     * Looks for a file with the same name as {@code path} under {@code <root>/oldWALs} and then
     * under {@code <root>/oldWALs/<serverName>}.
     *
     * @param path the WAL path that could not be found
     * @return the archived location if one exists, otherwise {@code path} itself
     */
    private Path getArchivedLog(Path path) throws IOException {
        Path rootDir = FSUtils.getRootDir(this.conf);
        Path oldLogDir = new Path(rootDir, "oldWALs");
        Path archivedLogLocation = new Path(oldLogDir, path.getName());
        if (this.fs.exists(archivedLogLocation)) {
            LOG.info("Log " + path + " was moved to " + archivedLogLocation);
            return archivedLogLocation;
        }
        oldLogDir = new Path(rootDir, "oldWALs" + "/" + this.serverName.getServerName());
        archivedLogLocation = new Path(oldLogDir, path.getName());
        if (this.fs.exists(archivedLogLocation)) {
            LOG.info("Log " + path + " was moved to " + archivedLogLocation);
            return archivedLogLocation;
        }
        LOG.error("Couldn't locate log: " + path);
        return path;
    }

    /**
     * Continues from the archived copy of {@code path} if there is one; otherwise rethrows.
     *
     * @throws FileNotFoundException ({@code fnfe}) when no archived copy exists
     */
    private void handleFileNotFound(Path path, FileNotFoundException fnfe) throws IOException {
        Path archivedLog = this.getArchivedLog(path);
        if (path.equals(archivedLog)) {
            throw fnfe;
        }
        this.openReader(archivedLog);
    }

    /**
     * Opens a reader on {@code path}, or resets the existing reader if it is already on that
     * path. On a lease-recovery failure or a {@link NullPointerException} the reader is left
     * {@code null} so that the caller can retry later.
     */
    private void openReader(Path path) throws IOException {
        try {
            if (this.reader == null || !this.getCurrentPath().equals(path)) {
                this.closeReader();
                this.reader = WALFactory.createReader(this.fs, path, this.conf);
                this.seek();
                this.setCurrentPath(path);
            } else {
                this.resetReader();
            }
        } catch (FileNotFoundException fnfe) {
            this.handleFileNotFound(path, fnfe);
        } catch (RemoteException re) {
            IOException ioe = re.unwrapRemoteException(FileNotFoundException.class);
            if (!(ioe instanceof FileNotFoundException)) {
                throw ioe;
            }
            this.handleFileNotFound(path, (FileNotFoundException) ioe);
        } catch (LeaseNotRecoveredException lnre) {
            // Recover the lease of the file we were asked to open. currentPath is only updated
            // after a successful open, so here it may still name the previous file (or be null).
            LOG.warn("Try to recover the WAL lease " + path, lnre);
            this.recoverLease(this.conf, path);
            this.reader = null;
        } catch (NullPointerException npe) {
            LOG.warn("Got NPE opening reader, will retry.", npe);
            this.reader = null;
        }
    }

    /** Attempts lease recovery on {@code path}; failures are logged and not propagated. */
    private void recoverLease(Configuration conf, final Path path) {
        try {
            FileSystem dfs = FSUtils.getCurrentFileSystem(conf);
            FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
            fsUtils.recoverFileLease(dfs, path, conf, new CancelableProgressable() {

                @Override
                public boolean progress() {
                    LOG.debug("recover WAL lease: " + path);
                    return true;
                }
            });
        } catch (IOException e) {
            LOG.warn("unable to recover lease for WAL: " + path, e);
        }
    }

    /**
     * Drops the buffered entry, resets the reader and seeks back to the stream position.
     *
     * <p>If the file has disappeared, reading continues from its archived copy when one exists;
     * the {@link FileNotFoundException} is propagated only when no archived copy is found.
     *
     * @throws IOException on I/O failure, or wrapping a {@link NullPointerException} raised
     *     while resetting
     */
    private void resetReader() throws IOException {
        try {
            this.currentEntry = null;
            this.reader.reset();
            this.seek();
        } catch (FileNotFoundException fnfe) {
            Path archivedLog = this.getArchivedLog(this.currentPath);
            if (this.currentPath.equals(archivedLog)) {
                // Not archived either: nothing to fall back to.
                throw fnfe;
            }
            // The file was archived: re-open there and carry on.
            this.openReader(archivedLog);
        } catch (NullPointerException npe) {
            throw new IOException("NPE resetting reader, likely HDFS-4380", npe);
        }
    }

    /** Seeks the reader to the stream position; a position of 0 needs no seek. */
    private void seek() throws IOException {
        if (this.currentPositionOfEntry != 0L) {
            this.reader.seek(this.currentPositionOfEntry);
        }
    }

    /**
     * @return the trailer size reported by a {@link ProtobufLogReader}, or -1 if the current
     *     reader is of another type (or {@code null})
     */
    private long currentTrailerSize() {
        long size = -1L;
        if (this.reader instanceof ProtobufLogReader) {
            ProtobufLogReader pblr = (ProtobufLogReader) this.reader;
            size = pblr.trailerSize();
        }
        return size;
    }
}

