/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.procedure2.store.wal;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStore;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreTracker;
import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFile;
import org.apache.hadoop.hbase.procedure2.store.wal.ProcedureWALFormat;
import org.apache.hadoop.hbase.procedure2.util.ByteSlot;
import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.protobuf.generated.ProcedureProtos;
import org.apache.hadoop.hbase.shaded.org.apache.commons.collections.buffer.CircularFifoBuffer;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;

@InterfaceAudience.Private
@InterfaceStability.Evolving
public class WALProcedureStore
extends ProcedureStoreBase {
    // ---- Logging and configuration constants ----
    private static final Log LOG = LogFactory.getLog(WALProcedureStore.class);
    // Not referenced in the visible part of this class; presumably the WAL
    // directory name used by callers -- TODO confirm.
    public static final String MASTER_PROCEDURE_LOGDIR = "MasterProcWALs";
    // Failed sync attempts tolerated by syncSlots() before it rolls the writer.
    public static final String MAX_RETRIES_BEFORE_ROLL_CONF_KEY = "hbase.procedure.store.wal.max.retries.before.roll";
    private static final int DEFAULT_MAX_RETRIES_BEFORE_ROLL = 3;
    // Base sleep used by rollWriterOrDie() between roll attempts (multiplied by the attempt index).
    public static final String WAIT_BEFORE_ROLL_CONF_KEY = "hbase.procedure.store.wal.wait.before.roll";
    private static final int DEFAULT_WAIT_BEFORE_ROLL = 500;
    // Number of roll attempts made by rollWriterOrDie() before it aborts.
    public static final String ROLL_RETRIES_CONF_KEY = "hbase.procedure.store.wal.max.roll.retries";
    private static final int DEFAULT_ROLL_RETRIES = 3;
    // Number of sync-failure-triggered rolls syncSlots() allows before aborting.
    public static final String MAX_SYNC_FAILURE_ROLL_CONF_KEY = "hbase.procedure.store.wal.sync.failure.roll.max";
    private static final int DEFAULT_MAX_SYNC_FAILURE_ROLL = 3;
    // Period, in milliseconds, used by getMillisToNextPeriodicRoll().
    public static final String PERIODIC_ROLL_CONF_KEY = "hbase.procedure.store.wal.periodic.roll.msec";
    private static final int DEFAULT_PERIODIC_ROLL = 3600000;
    // Maximum time the sync thread waits on slotCond for more slots before syncing.
    public static final String SYNC_WAIT_MSEC_CONF_KEY = "hbase.procedure.store.wal.sync.wait.msec";
    private static final int DEFAULT_SYNC_WAIT_MSEC = 100;
    // Selects hsync() (true) or hflush() (false) when slots are synced.
    public static final String USE_HSYNC_CONF_KEY = "hbase.procedure.store.wal.use.hsync";
    private static final boolean DEFAULT_USE_HSYNC = true;
    // periodicRoll() rolls the writer once totalSynced exceeds this value.
    public static final String ROLL_THRESHOLD_CONF_KEY = "hbase.procedure.store.wal.roll.threshold";
    private static final long DEFAULT_ROLL_THRESHOLD = 0x2000000L;
    // Capacity of the circular buffer that keeps the most recent SyncMetrics.
    public static final String STORE_WAL_SYNC_STATS_COUNT = "hbase.procedure.store.wal.sync.stats.count";
    private static final int DEFAULT_SYNC_STATS_COUNT = 10;
    // ---- State ----
    // Known WAL files; rollWriter(long) appends, so the log being written is the last element.
    private final LinkedList<ProcedureWALFile> logs = new LinkedList();
    // Updated by updateStoreTracker() for every insert/update/delete pushed to the store.
    private final ProcedureStoreTracker storeTracker = new ProcedureStoreTracker();
    // Lock held by the sync loop, pushData(), recoverLease() and the snapshot getters.
    private final ReentrantLock lock = new ReentrantLock();
    // Sync thread waits here for the first slot (or for the next periodic roll).
    private final Condition waitCond = this.lock.newCondition();
    // Sync thread waits here (bounded by syncWaitMsec) for the slot array to fill up.
    private final Condition slotCond = this.lock.newCondition();
    // Writers wait here until the sync loop signals after each iteration.
    private final Condition syncCond = this.lock.newCondition();
    // Stored by the constructor; not used in the visible part of this class.
    private final LeaseRecovery leaseRecovery;
    private final Configuration conf;
    private final FileSystem fs;
    private final Path walDir;
    // Read from "hbase.unsafe.stream.capability.enforce"; checked when a new log is created.
    private final boolean enforceStreamCapability;
    // First Throwable recorded by the sync loop; non-null means the sync was aborted.
    private final AtomicReference<Throwable> syncException = new AtomicReference();
    // True until load() completes; while true the sync loop skips periodicRoll().
    private final AtomicBoolean loading = new AtomicBoolean(true);
    // True while the sync thread is writing slots; pushData() waits while it is set.
    private final AtomicBoolean inSync = new AtomicBoolean(false);
    // Bytes synced since the last roll (reset to 0 by rollWriter(long)).
    private final AtomicLong totalSynced = new AtomicLong(0L);
    // System.currentTimeMillis() of the last successful roll; 0 before the first one.
    private final AtomicLong lastRollTs = new AtomicLong(0L);
    // Pool of reusable ByteSlot instances, filled in start().
    private LinkedTransferQueue<ByteSlot> slotsCache = null;
    // Created lazily by load() when a WAL is reported as corrupted; null otherwise.
    private Set<ProcedureWALFile> corruptedLogs = null;
    // Output stream of the log currently being written; null when closed.
    private FSDataOutputStream stream = null;
    // Id of the log currently being written.
    private long flushLogId = 0L;
    // Number of entries of 'slots' that are filled and waiting to be synced.
    private int slotIndex = 0;
    private Thread syncThread;
    // Pending slots, sized by the numSlots argument of start().
    private ByteSlot[] slots;
    // Values below are loaded from the configuration in start().
    private int maxRetriesBeforeRoll;
    private int maxSyncFailureRoll;
    private int waitBeforeRoll;
    private int rollRetries;
    private int periodicRollMsec;
    private long rollThreshold;
    private boolean useHsync;
    private int syncWaitMsec;
    // Holds the most recent SyncMetrics entries recorded by the sync loop.
    private CircularFifoBuffer syncMetricsBuffer;
    /** Accepts only paths whose file name starts with {@code state-} and ends with {@code .log}. */
    private static final PathFilter WALS_PATH_FILTER = new PathFilter(){

        @Override
        public boolean accept(Path path) {
            final String fileName = path.getName();
            if (!fileName.startsWith("state-")) {
                return false;
            }
            return fileName.endsWith(".log");
        }
    };
    /** Orders WAL files by the numeric log id embedded in their file name, ascending. */
    private static final Comparator<FileStatus> FILE_STATUS_ID_COMPARATOR = new Comparator<FileStatus>(){

        @Override
        public int compare(FileStatus a, FileStatus b) {
            final long leftId = WALProcedureStore.getLogIdFromName(a.getPath().getName());
            final long rightId = WALProcedureStore.getLogIdFromName(b.getPath().getName());
            if (leftId < rightId) {
                return -1;
            }
            return leftId == rightId ? 0 : 1;
        }
    };

    public WALProcedureStore(Configuration conf, Path walDir, LeaseRecovery leaseRecovery) throws IOException {
        this.conf = conf;
        this.walDir = walDir;
        this.leaseRecovery = leaseRecovery;
        this.fs = walDir.getFileSystem(conf);
        this.enforceStreamCapability = conf.getBoolean("hbase.unsafe.stream.capability.enforce", true);
        if (!this.fs.exists(walDir) && !this.fs.mkdirs(walDir)) {
            throw new IOException("Unable to mkdir " + walDir);
        }
        CommonFSUtils.setStoragePolicy(this.fs, conf, walDir, "hbase.wal.storage.policy", "NONE");
    }

    /**
     * Starts the store: allocates the slot array and slot cache, loads the tunables
     * from the configuration and launches the background sync thread.
     *
     * <p>Does nothing when {@code setRunning(true)} returns {@code false}.
     *
     * @param numSlots number of slots that can be batched into a single sync
     * @throws IOException declared by the overridden method
     */
    @Override
    public void start(int numSlots) throws IOException {
        if (!setRunning(true)) {
            return;
        }

        loading.set(true);
        slots = new ByteSlot[numSlots];
        slotsCache = new LinkedTransferQueue<ByteSlot>();
        while (slotsCache.size() < numSlots) {
            slotsCache.offer(new ByteSlot());
        }

        // Tunables (the literals are the built-in defaults).
        maxRetriesBeforeRoll = conf.getInt(MAX_RETRIES_BEFORE_ROLL_CONF_KEY, 3);
        maxSyncFailureRoll = conf.getInt(MAX_SYNC_FAILURE_ROLL_CONF_KEY, 3);
        waitBeforeRoll = conf.getInt(WAIT_BEFORE_ROLL_CONF_KEY, 500);
        rollRetries = conf.getInt(ROLL_RETRIES_CONF_KEY, 3);
        rollThreshold = conf.getLong(ROLL_THRESHOLD_CONF_KEY, 0x2000000L);
        periodicRollMsec = conf.getInt(PERIODIC_ROLL_CONF_KEY, 3600000);
        syncWaitMsec = conf.getInt(SYNC_WAIT_MSEC_CONF_KEY, 100);
        useHsync = conf.getBoolean(USE_HSYNC_CONF_KEY, true);
        syncMetricsBuffer = new CircularFifoBuffer(conf.getInt(STORE_WAL_SYNC_STATS_COUNT, 10));

        syncThread = new Thread("WALProcedureStoreSyncThread"){

            @Override
            public void run() {
                try {
                    WALProcedureStore.this.syncLoop();
                }
                catch (Throwable failure) {
                    LOG.error("Got an exception from the sync-loop", failure);
                    // The abort signal is sent only when no sync exception was recorded.
                    if (!WALProcedureStore.this.isSyncAborted()) {
                        WALProcedureStore.this.sendAbortProcessSignal();
                    }
                }
            }
        };
        syncThread.start();
    }

    /**
     * Stops the store, closes the current stream and closes and forgets every known log.
     *
     * <p>Does nothing when {@code setRunning(false)} returns {@code false}.
     *
     * @param abort when {@code false}, waits for the sync thread to terminate before
     *        closing resources; when {@code true}, skips that wait
     */
    @Override
    public void stop(boolean abort) {
        if (!setRunning(false)) {
            return;
        }

        LOG.info("Stopping the WAL Procedure Store");
        sendStopSignal();

        if (!abort) {
            try {
                // Keep nudging the sync thread until it has exited.
                while (syncThread.isAlive()) {
                    sendStopSignal();
                    syncThread.join(250L);
                }
            }
            catch (InterruptedException interrupted) {
                LOG.warn("join interrupted", interrupted);
                Thread.currentThread().interrupt();
            }
        }

        closeStream();
        for (ProcedureWALFile walFile : logs) {
            walFile.close();
        }
        logs.clear();
    }

    /**
     * Wakes every thread waiting on {@code waitCond} and {@code syncCond}.
     *
     * <p>Best effort: the signal is skipped when the lock cannot be taken immediately.
     */
    private void sendStopSignal() {
        if (!lock.tryLock()) {
            return;
        }
        try {
            waitCond.signalAll();
            syncCond.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @return the length of the slot array, or 0 when the array has not been
     *         allocated yet
     */
    @Override
    public int getNumThreads() {
        if (slots == null) {
            return 0;
        }
        return slots.length;
    }

    /** @return the tracker instance owned by this store (not a copy) */
    public ProcedureStoreTracker getStoreTracker() {
        return storeTracker;
    }

    /**
     * Takes a snapshot of the known WAL files while holding the store lock.
     *
     * @return a new list containing the current logs, in list order
     */
    public ArrayList<ProcedureWALFile> getActiveLogs() {
        lock.lock();
        try {
            return new ArrayList<ProcedureWALFile>(logs);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @return the set of logs marked as corrupted during {@code load}, or
     *         {@code null} when none has been marked
     */
    public Set<ProcedureWALFile> getCorruptedLogs() {
        return corruptedLogs;
    }

    /**
     * Claims the WAL directory for this store by creating the next log file.
     *
     * <p>While the store is running, the loop re-reads the existing logs, derives
     * {@code flushLogId} from them, tries to create log {@code flushLogId + 1} and
     * then lists the directory again to check that nobody created a newer log in
     * the meantime. Every failed step restarts the loop. The whole procedure runs
     * under the store lock.
     *
     * @throws IOException on file-system errors other than the
     *         {@link FileNotFoundException} that is retried internally
     */
    @Override
    public void recoverLease() throws IOException {
        this.lock.lock();
        try {
            LOG.info((Object)"Starting WAL Procedure Store lease recovery");
            FileStatus[] oldLogs = this.getLogFiles();
            while (this.isRunning()) {
                try {
                    this.flushLogId = this.initOldLogs(oldLogs);
                }
                catch (FileNotFoundException e) {
                    // A listed file vanished: refresh the listing and start over.
                    LOG.warn((Object)"someone else is active and deleted logs. retrying.", (Throwable)e);
                    oldLogs = this.getLogFiles();
                    continue;
                }
                // Try to create the next log; a false result restarts the loop.
                if (!this.rollWriter(this.flushLogId + 1L)) {
                    LOG.debug((Object)("someone else has already created log " + this.flushLogId));
                    continue;
                }
                // Re-list to detect a log with a higher id than the one just created.
                oldLogs = this.getLogFiles();
                if (WALProcedureStore.getMaxLogId(oldLogs) > this.flushLogId) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug((Object)("Someone else created new logs. Expected maxLogId < " + this.flushLogId));
                    }
                    // Drop the file that was just created and retry with the fresh listing.
                    this.logs.getLast().removeFile();
                    continue;
                }
                LOG.info((Object)("Lease acquired for flushLogId: " + this.flushLogId));
                break;
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Replays the existing WAL files into the given loader.
     *
     * <p>The newest log (the one created by {@code recoverLease()}) is skipped; the
     * remaining logs are handed to {@code ProcedureWALFormat.load} from newest to
     * oldest. The {@code loading} flag is cleared when this method finishes replaying
     * or finds nothing to replay.
     *
     * @param loader receiver of the max procedure id, the loaded procedures and the
     *        corrupted ones
     * @throws IOException if reading the logs fails
     * @throws RuntimeException if no log exists, i.e. {@code recoverLease()} was not called
     */
    @Override
    public void load(final ProcedureStore.ProcedureLoader loader) throws IOException {
        if (logs.isEmpty()) {
            throw new RuntimeException("recoverLease() must be called before loading data");
        }

        // Only the freshly created log exists: nothing to replay.
        if (logs.size() == 1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No state logs to replay.");
            }
            loader.setMaxProcId(0L);
            loading.set(false);
            return;
        }

        final Iterator<ProcedureWALFile> newestFirst = logs.descendingIterator();
        // Skip the current (newest) log.
        newestFirst.next();
        try {
            ProcedureWALFormat.load(newestFirst, storeTracker, new ProcedureWALFormat.Loader(){

                @Override
                public void setMaxProcId(long maxProcId) {
                    loader.setMaxProcId(maxProcId);
                }

                @Override
                public void load(ProcedureStore.ProcedureIterator procIter) throws IOException {
                    loader.load(procIter);
                }

                @Override
                public void handleCorrupted(ProcedureStore.ProcedureIterator procIter) throws IOException {
                    loader.handleCorrupted(procIter);
                }

                @Override
                public void markCorruptedWAL(ProcedureWALFile log, IOException e) {
                    // The set is created on first use.
                    if (WALProcedureStore.this.corruptedLogs == null) {
                        WALProcedureStore.this.corruptedLogs = new HashSet<ProcedureWALFile>();
                    }
                    WALProcedureStore.this.corruptedLogs.add(log);
                }
            });
        }
        finally {
            loading.set(false);
        }
    }

    /**
     * Serializes an insert entry for {@code proc} (and its sub-procedures, if any)
     * and pushes it to the WAL.
     *
     * @param proc the procedure to insert
     * @param subprocs sub-procedures inserted together with {@code proc}, or
     *        {@code null} when there are none
     * @throws RuntimeException wrapping the {@link IOException} when serialization fails
     */
    @Override
    public void insert(Procedure proc, Procedure[] subprocs) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Insert " + proc + ", subproc=" + Arrays.toString(subprocs));
        }

        final ByteSlot slot = acquireSlot();
        try {
            final long[] childIds;
            if (subprocs == null) {
                assert !proc.hasParent();
                ProcedureWALFormat.writeInsert(slot, proc);
                childIds = null;
            } else {
                ProcedureWALFormat.writeInsert(slot, proc, subprocs);
                childIds = new long[subprocs.length];
                int pos = 0;
                for (Procedure child : subprocs) {
                    childIds[pos++] = child.getProcId();
                }
            }
            pushData(PushType.INSERT, slot, proc.getProcId(), childIds);
        }
        catch (IOException ioe) {
            LOG.fatal("Unable to serialize one of the procedure: proc=" + proc + ", subprocs=" + Arrays.toString(subprocs), ioe);
            throw new RuntimeException(ioe);
        }
        finally {
            // The slot goes back to the cache whether or not the push succeeded.
            releaseSlot(slot);
        }
    }

    /**
     * Serializes an update entry for {@code proc} and pushes it to the WAL.
     *
     * @param proc the procedure whose state changed
     * @throws RuntimeException wrapping the {@link IOException} when serialization fails
     */
    @Override
    public void update(Procedure proc) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Update " + proc);
        }

        final ByteSlot slot = acquireSlot();
        try {
            ProcedureWALFormat.writeUpdate(slot, proc);
            final long procId = proc.getProcId();
            pushData(PushType.UPDATE, slot, procId, null);
        }
        catch (IOException ioe) {
            LOG.fatal("Unable to serialize the procedure: " + proc, ioe);
            throw new RuntimeException(ioe);
        }
        finally {
            releaseSlot(slot);
        }
    }

    /**
     * Serializes a delete entry for the given procedure id and pushes it to the WAL.
     *
     * @param procId id of the procedure to delete
     * @throws RuntimeException wrapping the {@link IOException} when serialization fails
     */
    @Override
    public void delete(long procId) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Delete " + procId);
        }

        final ByteSlot slot = acquireSlot();
        try {
            ProcedureWALFormat.writeDelete(slot, procId);
            pushData(PushType.DELETE, slot, procId, null);
        }
        catch (IOException ioe) {
            LOG.fatal("Unable to serialize the procedure: " + procId, ioe);
            throw new RuntimeException(ioe);
        }
        finally {
            releaseSlot(slot);
        }
    }

    /**
     * Serializes a single entry that carries {@code proc} together with the ids of
     * the sub-procedures to delete, and pushes it to the WAL as a DELETE.
     *
     * @param proc the parent procedure written with the entry
     * @param subProcIds ids of the sub-procedures to delete
     * @throws RuntimeException wrapping the {@link IOException} when serialization fails
     */
    @Override
    public void delete(Procedure proc, long[] subProcIds) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Update " + proc + " and Delete " + Arrays.toString(subProcIds));
        }

        final ByteSlot slot = acquireSlot();
        try {
            ProcedureWALFormat.writeDelete(slot, proc, subProcIds);
            final long parentId = proc.getProcId();
            pushData(PushType.DELETE, slot, parentId, subProcIds);
        }
        catch (IOException ioe) {
            LOG.fatal("Unable to serialize the procedure: " + proc, ioe);
            throw new RuntimeException(ioe);
        }
        finally {
            releaseSlot(slot);
        }
    }

    /** @return a slot taken from the cache, or a newly allocated one when the cache is empty */
    private ByteSlot acquireSlot() {
        final ByteSlot cached = slotsCache.poll();
        if (cached == null) {
            return new ByteSlot();
        }
        return cached;
    }

    /**
     * Resets the slot and returns it to the cache for reuse.
     *
     * @param slot the slot that is no longer needed
     */
    private void releaseSlot(ByteSlot slot) {
        slot.reset();
        slotsCache.offer(slot);
    }

    /**
     * Queues a serialized entry for the sync thread and blocks until the sync loop
     * signals {@code syncCond}.
     *
     * @param type kind of operation, used to update the store tracker
     * @param slot serialized entry to write
     * @param procId id of the procedure the entry refers to
     * @param subProcIds ids of the involved sub-procedures, or {@code null}
     * @return the value of {@code flushLogId} at the time the slot was queued
     * @throws RuntimeException if the store is not running, {@code recoverLease()}
     *         was not called, the sync was aborted, or the wait is interrupted
     */
    private long pushData(PushType type, ByteSlot slot, long procId, long[] subProcIds) {
        if (!this.isRunning()) {
            throw new RuntimeException("the store must be running before inserting data");
        }
        if (this.logs.isEmpty()) {
            throw new RuntimeException("recoverLease() must be called before inserting data");
        }
        long logId = -1L;
        this.lock.lock();
        try {
            // Wait until no sync is in progress and a free slot is available.
            while (true) {
                if (!this.isRunning()) {
                    throw new RuntimeException("store no longer running");
                }
                if (this.isSyncAborted()) {
                    throw new RuntimeException("sync aborted", this.syncException.get());
                }
                if (this.inSync.get()) {
                    this.syncCond.await();
                    continue;
                }
                if (this.slotIndex != this.slots.length) break;
                // All slots are taken: nudge the sync thread and wait for it to drain them.
                this.slotCond.signal();
                this.syncCond.await();
            }
            this.updateStoreTracker(type, procId, subProcIds);
            this.slots[this.slotIndex++] = slot;
            logId = this.flushLogId;
            // First slot of a batch: wake the sync thread waiting for data.
            if (this.slotIndex == 1) {
                this.waitCond.signal();
            }
            // Batch is full: wake the sync thread on both conditions.
            if (this.slotIndex == this.slots.length) {
                this.waitCond.signal();
                this.slotCond.signal();
            }
            // Block until the sync loop signals syncCond.
            this.syncCond.await();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            this.sendAbortProcessSignal();
            throw new RuntimeException(e);
        }
        finally {
            this.lock.unlock();
            // NOTE: throwing from finally replaces any exception already propagating.
            if (this.isSyncAborted()) {
                throw new RuntimeException("sync aborted", this.syncException.get());
            }
        }
        return logId;
    }

    /**
     * Applies the pushed operation to the store tracker.
     *
     * @param type kind of operation
     * @param procId id of the procedure
     * @param subProcIds sub-procedure ids; for INSERT a non-null array is inserted
     *        together with {@code procId}, for DELETE a non-empty array is deleted
     *        instead of {@code procId}
     * @throws RuntimeException if {@code type} is not a known push type
     */
    private void updateStoreTracker(PushType type, long procId, long[] subProcIds) {
        switch (type) {
            case INSERT: {
                if (subProcIds != null) {
                    storeTracker.insert(procId, subProcIds);
                } else {
                    storeTracker.insert(procId);
                }
                break;
            }
            case UPDATE: {
                storeTracker.update(procId);
                break;
            }
            case DELETE: {
                final boolean hasChildren = subProcIds != null && subProcIds.length > 0;
                if (hasChildren) {
                    storeTracker.delete(subProcIds);
                } else {
                    storeTracker.delete(procId);
                }
                break;
            }
            default: {
                throw new RuntimeException("invalid push type " + type);
            }
        }
    }

    /** @return {@code true} once the sync loop has recorded a failure in {@code syncException} */
    private boolean isSyncAborted() {
        final Throwable recorded = syncException.get();
        return recorded != null;
    }

    /**
     * Body of the sync thread: waits for writers to fill slots, writes the batch to
     * the current log and wakes the writers up.
     *
     * <p>Runs under the store lock for as long as the store is running; the lock is
     * only released while waiting on a condition. Any Throwable raised inside an
     * iteration is recorded in {@code syncException} (first one wins) and rethrown,
     * which ends the loop. {@code syncCond} is signalled at the end of every
     * iteration, successful or not.
     *
     * @throws Throwable whatever the iteration raised, after recording it
     */
    private void syncLoop() throws Throwable {
        long totalSyncedToStore = 0L;
        this.inSync.set(false);
        this.lock.lock();
        try {
            while (this.isRunning()) {
                try {
                    if (this.slotIndex == 0) {
                        // Idle: do log housekeeping unless load() is still in progress.
                        if (!this.loading.get()) {
                            this.periodicRoll();
                        }
                        if (LOG.isTraceEnabled()) {
                            float rollTsSec = (float)this.getMillisFromLastRoll() / 1000.0f;
                            LOG.trace((Object)String.format("Waiting for data. flushed=%s (%s/sec)", StringUtils.humanSize(this.totalSynced.get()), StringUtils.humanSize((float)this.totalSynced.get() / rollTsSec)));
                        }
                        // Sleep until a writer queues data or the next periodic roll is due.
                        this.waitCond.await(this.getMillisToNextPeriodicRoll(), TimeUnit.MILLISECONDS);
                        if (this.slotIndex == 0) continue;
                    }
                    long syncWaitSt = System.currentTimeMillis();
                    // Give other writers up to syncWaitMsec to join the batch.
                    if (this.slotIndex != this.slots.length) {
                        this.slotCond.await(this.syncWaitMsec, TimeUnit.MILLISECONDS);
                    }
                    long currentTs = System.currentTimeMillis();
                    long syncWaitMs = currentTs - syncWaitSt;
                    float rollSec = (float)this.getMillisFromLastRoll() / 1000.0f;
                    float syncedPerSec = (float)totalSyncedToStore / rollSec;
                    if (LOG.isTraceEnabled() && (syncWaitMs > 10L || this.slotIndex < this.slots.length)) {
                        LOG.trace((Object)String.format("Sync wait %s, slotIndex=%s , totalSynced=%s (%s/sec)", StringUtils.humanTimeDiff(syncWaitMs), this.slotIndex, StringUtils.humanSize(totalSyncedToStore), StringUtils.humanSize(syncedPerSec)));
                    }
                    // Record the metrics of this batch before writing it.
                    SyncMetrics syncMetrics = new SyncMetrics();
                    syncMetrics.timestamp = currentTs;
                    syncMetrics.syncWaitMs = syncWaitMs;
                    syncMetrics.syncedEntries = this.slotIndex;
                    syncMetrics.totalSyncedBytes = totalSyncedToStore;
                    syncMetrics.syncedPerSec = syncedPerSec;
                    this.syncMetricsBuffer.add(syncMetrics);
                    // inSync makes pushData() wait while the batch is being written.
                    this.inSync.set(true);
                    long slotSize = this.syncSlots();
                    this.logs.getLast().addToSize(slotSize);
                    totalSyncedToStore = this.totalSynced.addAndGet(slotSize);
                    this.slotIndex = 0;
                    this.inSync.set(false);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    this.sendAbortProcessSignal();
                    this.syncException.compareAndSet(null, e);
                    throw e;
                }
                catch (Throwable t) {
                    // Only the first failure is kept.
                    this.syncException.compareAndSet(null, t);
                    throw t;
                }
                finally {
                    // Wake every writer blocked in pushData().
                    this.syncCond.signalAll();
                }
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Takes a snapshot of the recorded sync metrics while holding the store lock.
     *
     * @return a new list with the entries currently held by the metrics buffer
     */
    public ArrayList<SyncMetrics> getSyncMetrics() {
        lock.lock();
        try {
            return new ArrayList<SyncMetrics>(this.syncMetricsBuffer);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Writes the pending slots to the current stream, retrying and rolling the log
     * on failure.
     *
     * <p>Each failed attempt is retried until {@code maxRetriesBeforeRoll} consecutive
     * failures are reached; then the writer is rolled and the retry counter restarts.
     * After {@code maxSyncFailureRoll} such rolls the abort signal is sent and the
     * last failure is rethrown. Each failure is logged together with its cause so
     * that it stays visible even when a later attempt succeeds.
     *
     * @return number of bytes written by the successful attempt, or 0 when the store
     *         stopped running after a roll without a successful attempt
     * @throws Throwable the last sync failure when the retry/roll budget is exhausted
     */
    private long syncSlots() throws Throwable {
        int retry = 0;
        int logRolled = 0;
        long totalSynced = 0L;
        while (true) {
            try {
                totalSynced = this.syncSlots(this.stream, this.slots, 0, this.slotIndex);
            }
            catch (Throwable e) {
                // Keep the cause in the log: it is otherwise lost if a retry succeeds.
                LOG.warn((Object)("unable to sync slots, retry=" + retry), e);
                if (++retry < this.maxRetriesBeforeRoll) continue;
                if (logRolled >= this.maxSyncFailureRoll) {
                    LOG.error((Object)"Sync slots after log roll failed, abort.", e);
                    this.sendAbortProcessSignal();
                    throw e;
                }
                if (!this.rollWriterOrDie()) {
                    throw e;
                }
                ++logRolled;
                retry = 0;
                // Try again on the new log only while the store is still running.
                if (this.isRunning()) continue;
            }
            break;
        }
        return totalSynced;
    }

    /**
     * Writes {@code count} slots starting at {@code offset} to the stream, then
     * flushes it with hsync or hflush depending on {@code useHsync}, and finally
     * sends the post-sync signal.
     *
     * @param stream destination stream
     * @param slots array holding the slots to write
     * @param offset index of the first slot to write
     * @param count number of slots to write
     * @return total number of bytes written
     * @throws IOException if writing or flushing fails
     */
    protected long syncSlots(FSDataOutputStream stream, ByteSlot[] slots, int offset, int count) throws IOException {
        long bytesWritten = 0L;
        final int end = offset + count;
        for (int idx = offset; idx < end; ++idx) {
            final ByteSlot slot = slots[idx];
            slot.writeTo(stream);
            bytesWritten += slot.size();
        }

        if (!useHsync) {
            stream.hflush();
        } else {
            stream.hsync();
        }
        sendPostSyncSignal();

        if (LOG.isTraceEnabled()) {
            LOG.trace("Sync slots=" + count + '/' + slots.length + ", flushed=" + StringUtils.humanSize(bytesWritten));
        }
        return bytesWritten;
    }

    /**
     * Tries to roll the writer up to {@code rollRetries} times, sleeping
     * {@code waitBeforeRoll * attempt} milliseconds before every attempt after the
     * first one.
     *
     * @return {@code true} as soon as one attempt succeeds
     * @throws RuntimeException after sending the abort signal when every attempt failed
     */
    private boolean rollWriterOrDie() {
        int attempt = 0;
        while (attempt < rollRetries) {
            if (attempt > 0) {
                Threads.sleepWithoutInterrupt(waitBeforeRoll * attempt);
            }
            try {
                if (rollWriter()) {
                    return true;
                }
            }
            catch (IOException ioe) {
                LOG.warn("Unable to roll the log, attempt=" + (attempt + 1), ioe);
            }
            ++attempt;
        }

        LOG.fatal("Unable to roll the log");
        sendAbortProcessSignal();
        throw new RuntimeException("unable to roll the log");
    }

    /**
     * Rolls the writer once, turning an {@link IOException} into a {@code false} result.
     *
     * @return the result of {@code rollWriter()}, or {@code false} when it threw
     */
    private boolean tryRollWriter() {
        boolean rolled;
        try {
            rolled = rollWriter();
        }
        catch (IOException ioe) {
            LOG.warn("Unable to roll the log", ioe);
            rolled = false;
        }
        return rolled;
    }

    /**
     * @return milliseconds left until the next periodic roll (zero or negative when
     *         it is overdue), or {@code Long.MAX_VALUE} when no roll has happened yet
     *         or periodic rolling is disabled
     */
    public long getMillisToNextPeriodicRoll() {
        final boolean rolledBefore = lastRollTs.get() > 0L;
        if (!rolledBefore || periodicRollMsec <= 0) {
            return Long.MAX_VALUE;
        }
        return (long)periodicRollMsec - getMillisFromLastRoll();
    }

    /** @return milliseconds elapsed between the last recorded roll timestamp and now */
    public long getMillisFromLastRoll() {
        final long now = System.currentTimeMillis();
        return now - lastRollTs.get();
    }

    /**
     * Test hook: runs {@code periodicRoll()} while holding the store lock.
     *
     * @throws IOException propagated from {@code periodicRoll()}
     */
    protected void periodicRollForTesting() throws IOException {
        lock.lock();
        try {
            periodicRoll();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Test hook: runs {@code rollWriter()} while holding the store lock.
     *
     * @return the result of {@code rollWriter()}
     * @throws IOException propagated from {@code rollWriter()}
     */
    protected boolean rollWriterForTesting() throws IOException {
        lock.lock();
        try {
            return rollWriter();
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Housekeeping for the WAL files.
     *
     * <p>With no tracked procedures, the writer is rolled and every log older than
     * the current one is removed. Otherwise older logs are removed when the tracker
     * reports that everything is in the latest log, the writer is rolled when the
     * size threshold or the periodic deadline is reached, and logs the tracker no
     * longer references are removed.
     *
     * @throws IOException declared for callers; the roll attempts made here swallow
     *         their own I/O failures
     */
    private void periodicRoll() throws IOException {
        if (storeTracker.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("no active procedures");
            }
            tryRollWriter();
            removeAllLogs(flushLogId - 1L);
            return;
        }

        if (storeTracker.isUpdated()) {
            if (LOG.isTraceEnabled()) {
                LOG.trace("all the active procedures are in the latest log");
            }
            removeAllLogs(flushLogId - 1L);
        }

        final boolean rollDue = totalSynced.get() > rollThreshold || getMillisToNextPeriodicRoll() <= 0L;
        if (rollDue) {
            tryRollWriter();
        }
        removeInactiveLogs();
    }

    /**
     * Rolls to log {@code flushLogId + 1} and verifies that no log with a higher id
     * showed up in the WAL directory; if one did, the file just created is removed.
     *
     * @return {@code true} when the roll succeeded and this store owns the newest log
     * @throws IOException on file-system failures
     */
    private boolean rollWriter() throws IOException {
        final long nextLogId = flushLogId + 1L;
        if (!rollWriter(nextLogId)) {
            LOG.warn("someone else has already created log " + flushLogId);
            return false;
        }

        final long newestOnDisk = WALProcedureStore.getMaxLogId(getLogFiles());
        if (newestOnDisk > flushLogId) {
            LOG.warn("Someone else created new logs. Expected maxLogId < " + flushLogId);
            logs.getLast().removeFile();
            return false;
        }
        return true;
    }

    /**
     * Creates the log file for {@code logId}, writes its header and makes it the
     * current log.
     *
     * <p>On success the previous stream is closed, the tracker's update flags are
     * reset, the synced-bytes counter and the roll timestamp are refreshed, and the
     * new file is appended to {@code logs}. The caller must hold the store lock.
     *
     * @param logId id of the log to create; must be greater than {@code flushLogId}
     * @return {@code true} when the new log is in place; {@code false} when the file
     *         already exists, the create call failed remotely, or the header could
     *         not be written
     * @throws IllegalStateException if stream capabilities are enforced and the new
     *         stream does not support the configured durability call; the stream is
     *         closed before throwing
     * @throws IOException on other file-system failures
     */
    private boolean rollWriter(long logId) throws IOException {
        assert (logId > this.flushLogId) : "logId=" + logId + " flushLogId=" + this.flushLogId;
        assert (this.lock.isHeldByCurrentThread()) : "expected to be the lock owner. " + this.lock.isLocked();
        ProcedureProtos.ProcedureWALHeader header = ProcedureProtos.ProcedureWALHeader.newBuilder().setVersion(1).setType(0).setMinProcId(this.storeTracker.getMinProcId()).setLogId(logId).build();
        FSDataOutputStream newStream = null;
        Path newLogFile = this.getLogFilePath(logId);
        long startPos = -1L;
        try {
            // overwrite=false: an existing file means someone else owns that id.
            newStream = this.fs.create(newLogFile, false);
        }
        catch (FileAlreadyExistsException e) {
            LOG.error((Object)("Log file with id=" + logId + " already exists"), (Throwable)e);
            return false;
        }
        catch (RemoteException re) {
            LOG.warn((Object)("failed to create log file with id=" + logId), (Throwable)re);
            return false;
        }
        String durability = this.useHsync ? "hsync" : "hflush";
        if (this.enforceStreamCapability && !CommonFSUtils.hasCapability(newStream, durability)) {
            // Release the just-opened stream before failing, otherwise it leaks.
            try {
                newStream.close();
            }
            catch (IOException closeFailure) {
                LOG.warn((Object)("Unable to close log file with id=" + logId + " after failed capability check"), (Throwable)closeFailure);
            }
            throw new IllegalStateException("The procedure WAL relies on the ability to " + durability + " for proper operation during component failures, but the underlying filesystem does " + "not support doing so. Please check the config value of '" + USE_HSYNC_CONF_KEY + "' to set the desired level of robustness and ensure the config value of '" + "hbase.wal.dir" + "' points to a FileSystem mount that can provide it.");
        }
        try {
            ProcedureWALFormat.writeHeader(newStream, header);
            startPos = newStream.getPos();
        }
        catch (IOException ioe) {
            LOG.warn((Object)"Encountered exception writing header", (Throwable)ioe);
            newStream.close();
            return false;
        }
        // Switch over: finish the old log first, then publish the new one.
        this.closeStream();
        this.storeTracker.resetUpdates();
        this.stream = newStream;
        this.flushLogId = logId;
        this.totalSynced.set(0L);
        long rollTs = System.currentTimeMillis();
        this.lastRollTs.set(rollTs);
        this.logs.add(new ProcedureWALFile(this.fs, newLogFile, header, startPos, rollTs));
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Roll new state log: " + logId));
        }
        return true;
    }

    /**
     * Finishes and closes the current log stream, if there is one.
     *
     * <p>Records the updated min/max procedure ids on the newest log, writes the
     * trailer and adds its size to the log, then closes the stream. A trailer
     * failure is logged and the stream is still closed; a close failure is logged.
     * In every case {@code stream} ends up {@code null}.
     */
    private void closeStream() {
        if (stream == null) {
            return;
        }
        try {
            try {
                final ProcedureWALFile current = logs.getLast();
                current.setProcIds(storeTracker.getUpdatedMinProcId(), storeTracker.getUpdatedMaxProcId());
                final long trailerBytes = ProcedureWALFormat.writeTrailer(stream, storeTracker);
                current.addToSize(trailerBytes);
            }
            catch (IOException trailerFailure) {
                LOG.warn("Unable to write the trailer: " + trailerFailure.getMessage());
            }
            stream.close();
        }
        catch (IOException closeFailure) {
            LOG.error("Unable to close the stream", closeFailure);
        }
        finally {
            stream = null;
        }
    }

    /**
     * Removes the oldest logs for as long as the tracker no longer tracks any
     * procedure in their id range, always keeping at least one log.
     *
     * <p>Stops at the first log that cannot be removed: a failed removal leaves that
     * log at the head of the list, so retrying immediately would spin forever.
     */
    private void removeInactiveLogs() {
        while (this.logs.size() > 1) {
            ProcedureWALFile oldest = this.logs.getFirst();
            if (this.storeTracker.isTracking(oldest.getMinProcId(), oldest.getMaxProcId())) {
                break;
            }
            if (!this.removeLogFile(oldest)) {
                // Removal failed; leave it for a later pass.
                break;
            }
        }
    }

    /**
     * Removes logs from the oldest end of {@code logs} while their log id is
     * less than or equal to {@code lastLogId}. At least one log is always kept.
     *
     * <p>The loop stops as soon as a removal fails: {@code removeLogFile} leaves
     * a log it could not delete at the head of {@code logs}, so retrying
     * immediately would spin forever while the failure persists.
     *
     * @param lastLogId highest log id (inclusive) eligible for removal
     */
    private void removeAllLogs(long lastLogId) {
        if (this.logs.size() <= 1) {
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Remove all state logs with ID less than " + lastLogId));
        }
        while (this.logs.size() > 1) {
            ProcedureWALFile oldest = this.logs.getFirst();
            if (lastLogId < oldest.getLogId()) {
                // Everything from here on is newer than the requested cut-off.
                break;
            }
            if (!this.removeLogFile(oldest)) {
                // Deletion failed and the log is still first in the list; give up for now.
                break;
            }
        }
    }

    /**
     * Deletes the file backing the given log and drops the log from {@code logs}.
     *
     * <p>If deleting the file throws, the error is logged and the log is left in
     * {@code logs} untouched.
     *
     * @param log the log to remove
     * @return {@code true} if the file was removed and the log dropped from the
     *         list, {@code false} if an {@link IOException} occurred
     */
    private boolean removeLogFile(ProcedureWALFile log) {
        try {
            if (LOG.isTraceEnabled()) {
                LOG.trace((Object)("Removing log=" + log));
            }
            log.removeFile();
            this.logs.remove(log);
            if (LOG.isDebugEnabled()) {
                // Emit at the level the guard checks; this was previously logged via info().
                LOG.debug((Object)("Removed log=" + log + " activeLogs=" + this.logs));
            }
            assert (this.logs.size() > 0) : "expected at least one log";
        }
        catch (IOException e) {
            LOG.error((Object)("Unable to remove log: " + log), (Throwable)e);
            return false;
        }
        return true;
    }

    /**
     * @return the value of the {@code walDir} field
     */
    public Path getWALDir() {
        return walDir;
    }

    /**
     * @return the value of the {@code fs} field
     */
    public FileSystem getFileSystem() {
        return fs;
    }

    /**
     * Builds the path of the state log with the given id inside {@code walDir}.
     *
     * <p>The file name is {@code state-} followed by the id zero-padded to 20
     * digits and the {@code .log} suffix. Formatting is pinned to
     * {@link java.util.Locale#ROOT} so the digits are always ASCII; with the
     * default locale, {@code %d} localizes digits and the name would depend on
     * the JVM's locale settings.
     *
     * @param logId id of the log
     * @return path of the log file under {@code walDir}
     * @throws IOException never thrown by this implementation; declared as part
     *         of the existing signature
     */
    protected Path getLogFilePath(long logId) throws IOException {
        String fileName = String.format(java.util.Locale.ROOT, "state-%020d.log", logId);
        return new Path(this.walDir, fileName);
    }

    /**
     * Extracts the numeric log id from a log file name such as
     * {@code state-00000000000000000042.log}.
     *
     * <p>The id is the text between the last {@code '-'} and the last
     * {@code ".log"}. Leading zeros are skipped, but the final digit is always
     * kept so that an all-zero id parses as {@code 0} instead of handing an
     * empty string to {@link Long#parseLong(String)}.
     *
     * @param name log file name
     * @return the parsed log id
     * @throws NumberFormatException if the id portion is not a valid long
     * @throws StringIndexOutOfBoundsException if {@code name} has no
     *         {@code ".log"} suffix after the id
     */
    private static long getLogIdFromName(String name) {
        int end = name.lastIndexOf(".log");
        int start = name.lastIndexOf('-') + 1;
        // Stop one character short of 'end' so at least one digit remains to be parsed.
        while (start < end - 1 && name.charAt(start) == '0') {
            ++start;
        }
        return Long.parseLong(name.substring(start, end));
    }

    /**
     * Lists the entries of {@code walDir} accepted by {@code WALS_PATH_FILTER},
     * sorted with {@code FILE_STATUS_ID_COMPARATOR}.
     *
     * @return the sorted entries, or {@code null} if the listing fails with a
     *         {@link FileNotFoundException}
     * @throws IOException if listing the directory fails for any other reason
     */
    private FileStatus[] getLogFiles() throws IOException {
        FileStatus[] walFiles;
        try {
            walFiles = this.fs.listStatus(this.walDir, WALS_PATH_FILTER);
            Arrays.sort(walFiles, FILE_STATUS_ID_COMPARATOR);
        }
        catch (FileNotFoundException missingDir) {
            LOG.warn((Object)("Log directory not found: " + missingDir.getMessage()));
            return null;
        }
        return walFiles;
    }

    /**
     * Finds the highest log id among the given log files, taking each id from
     * the file name.
     *
     * @param logFiles log files to inspect; may be {@code null} or empty
     * @return the highest log id found, or {@code 0} if there are no files
     */
    private static long getMaxLogId(FileStatus[] logFiles) {
        if (logFiles == null) {
            return 0L;
        }
        long highest = 0L;
        for (FileStatus status : logFiles) {
            long candidate = WALProcedureStore.getLogIdFromName(status.getPath().getName());
            if (candidate > highest) {
                highest = candidate;
            }
        }
        return highest;
    }

    /**
     * Rebuilds {@code logs} from the given log files.
     *
     * <p>For each file, its lease is recovered through {@code leaseRecovery},
     * its id contributes to the returned maximum, and it is opened via
     * {@code initOldLog}; files for which that returns {@code null} are left
     * out. When at least one file was given, the resulting list is sorted and
     * the store tracker is initialized from it.
     *
     * @param logFiles log files found on disk; may be {@code null} or empty
     * @return the highest log id seen in {@code logFiles}, including files that
     *         were left out, or {@code 0} if there were none
     * @throws IOException if lease recovery or opening a log fails
     */
    private long initOldLogs(FileStatus[] logFiles) throws IOException {
        this.logs.clear();
        if (logFiles == null || logFiles.length == 0) {
            return 0L;
        }
        long maxLogId = 0L;
        for (FileStatus status : logFiles) {
            Path logPath = status.getPath();
            this.leaseRecovery.recoverFileLease(this.fs, logPath);
            // The id is counted before the file is opened, so discarded files still raise the maximum.
            maxLogId = Math.max(maxLogId, WALProcedureStore.getLogIdFromName(logPath.getName()));
            ProcedureWALFile opened = this.initOldLog(status);
            if (opened != null) {
                this.logs.add(opened);
            }
        }
        Collections.sort(this.logs);
        this.initTrackerFromOldLogs();
        return maxLogId;
    }

    /**
     * Loads the store tracker from the last entry of {@code logs}, if any.
     *
     * <p>If the tracker cannot be read, the failure is logged, the tracker is
     * reset and its partial flag is set to {@code true}.
     */
    private void initTrackerFromOldLogs() {
        if (this.logs.isEmpty()) {
            return;
        }
        ProcedureWALFile newest = this.logs.getLast();
        try {
            newest.readTracker(this.storeTracker);
        }
        catch (IOException readFailure) {
            LOG.warn((Object)("Unable to read tracker for " + newest + " - " + readFailure.getMessage()));
            this.storeTracker.reset();
            this.storeTracker.setPartialFlag(true);
        }
    }

    /**
     * Opens an existing log file and decides whether it can be used.
     *
     * <p>The file is deleted and {@code null} is returned when it is empty,
     * when opening it reports invalid WAL data, or when it is marked as
     * compacted but its trailer cannot be read.
     *
     * @param logFile status of the log file to open
     * @return the opened log, or {@code null} if the file was removed
     * @throws IOException if the log cannot be opened for a reason other than
     *         invalid WAL data, or if removing a discarded file fails
     */
    private ProcedureWALFile initOldLog(FileStatus logFile) throws IOException {
        ProcedureWALFile walFile = new ProcedureWALFile(this.fs, logFile);

        // A zero-length file holds nothing to read.
        if (logFile.getLen() == 0L) {
            LOG.warn((Object)("Remove uninitialized log: " + logFile));
            walFile.removeFile();
            return null;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("Opening state-log: " + logFile));
        }
        try {
            walFile.open();
        }
        catch (ProcedureWALFormat.InvalidWALDataException invalidData) {
            LOG.warn((Object)("Remove uninitialized log: " + logFile), (Throwable)invalidData);
            walFile.removeFile();
            return null;
        }
        catch (IOException openFailure) {
            String msg = "Unable to read state log: " + logFile;
            LOG.error((Object)msg, (Throwable)openFailure);
            throw new IOException(msg, openFailure);
        }

        if (!walFile.isCompacted()) {
            return walFile;
        }

        // A compacted log is only accepted if its trailer is readable.
        try {
            walFile.readTrailer();
        }
        catch (IOException trailerFailure) {
            LOG.warn((Object)("Unfinished compacted log: " + logFile), (Throwable)trailerFailure);
            walFile.removeFile();
            return null;
        }
        return walFile;
    }

    /**
     * Kinds of push operation: insert, update and delete.
     * NOTE(review): where these values are consumed is not visible in this part of the file.
     */
    private static enum PushType {
        INSERT,
        UPDATE,
        DELETE
    }

    /**
     * Read-only view of a set of sync statistics.
     *
     * <p>This class has no setters; the fields keep their default values unless
     * assigned from outside this class.
     * NOTE(review): presumably populated by the enclosing store on each sync — confirm.
     */
    public static class SyncMetrics {
        private long timestamp;
        private long syncWaitMs;
        private long totalSyncedBytes;
        private int syncedEntries;
        private float syncedPerSec;

        /** @return the recorded timestamp */
        public long getTimestamp() {
            return timestamp;
        }

        /** @return the recorded sync wait, in milliseconds going by the field name */
        public long getSyncWaitMs() {
            return syncWaitMs;
        }

        /** @return the recorded total of synced bytes */
        public long getTotalSyncedBytes() {
            return totalSyncedBytes;
        }

        /** @return the recorded number of synced entries, widened from {@code int} to {@code long} */
        public long getSyncedEntries() {
            return syncedEntries;
        }

        /** @return the recorded synced-per-second rate */
        public float getSyncedPerSec() {
            return syncedPerSec;
        }
    }

    /**
     * Callback used to recover the lease of a file before it is read.
     */
    public interface LeaseRecovery {
        /**
         * Recovers the lease of the given file.
         *
         * @param fs file system the file lives on
         * @param path path of the file whose lease should be recovered
         * @throws IOException if the lease cannot be recovered
         */
        void recoverFileLease(FileSystem fs, Path path) throws IOException;
    }
}

