/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.db.commitlog;

import com.google.common.collect.Ordering;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOError;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.zip.CRC32;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.cassandra.concurrent.Stage;
import org.apache.cassandra.concurrent.StageManager;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.Table;
import org.apache.cassandra.db.UnserializableColumnFamilyException;
import org.apache.cassandra.db.commitlog.BatchCommitLogExecutorService;
import org.apache.cassandra.db.commitlog.CommitLogAllocator;
import org.apache.cassandra.db.commitlog.CommitLogMBean;
import org.apache.cassandra.db.commitlog.CommitLogSegment;
import org.apache.cassandra.db.commitlog.ICommitLogExecutorService;
import org.apache.cassandra.db.commitlog.PeriodicCommitLogExecutorService;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.io.IColumnSerializer;
import org.apache.cassandra.io.util.FastByteArrayInputStream;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.io.util.RandomAccessReader;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.commons.lang.StringUtils;
import org.cliffc.high_scale_lib.NonBlockingHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitLog
implements CommitLogMBean {
    private static final int MAX_OUTSTANDING_REPLAY_COUNT = 1024;
    static final Logger logger = LoggerFactory.getLogger(CommitLog.class);
    public static final CommitLog instance = new CommitLog();
    private final ICommitLogExecutorService executor;
    public final CommitLogAllocator allocator;
    public static final int END_OF_SEGMENT_MARKER = 0;
    public static final int END_OF_SEGMENT_MARKER_SIZE = 4;
    public static final int SEGMENT_SIZE = 0x8000000;
    public CommitLogSegment activeSegment;

    private CommitLog() {
        try {
            DatabaseDescriptor.createAllDirectories();
            this.allocator = new CommitLogAllocator();
            this.activateNextSegment();
        }
        catch (IOException e) {
            throw new IOError(e);
        }
        this.executor = DatabaseDescriptor.getCommitLogSync() == Config.CommitLogSync.batch ? new BatchCommitLogExecutorService() : new PeriodicCommitLogExecutorService(this);
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        try {
            mbs.registerMBean(this, new ObjectName("org.apache.cassandra.db:type=Commitlog"));
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void resetUnsafe() throws IOException {
        this.allocator.resetUnsafe();
        this.activateNextSegment();
    }

    public int recover() throws IOException {
        Object[] files = new File(DatabaseDescriptor.getCommitLogLocation()).listFiles(new FilenameFilter(){

            @Override
            public boolean accept(File dir, String name) {
                return CommitLogSegment.possibleCommitLogFile(name) && !CommitLog.instance.allocator.manages(name);
            }
        });
        int replayed = 0;
        if (files.length == 0) {
            logger.info("No commitlog files found; skipping replay");
        } else {
            Arrays.sort(files, new FileUtils.FileComparator());
            logger.info("Replaying " + StringUtils.join((Object[])files, (String)", "));
            replayed = this.recover((File[])files);
            logger.info("Log replay complete, " + replayed + " replayed mutations");
            for (Object f : files) {
                CommitLog.instance.allocator.recycleSegment((File)f);
            }
        }
        this.allocator.enableReserveSegmentCreation();
        return replayed;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int recover(File[] clogs) throws IOException {
        NonBlockingHashSet tablesRecovered = new NonBlockingHashSet();
        ArrayList futures = new ArrayList();
        byte[] bytes = new byte[4096];
        HashMap<Integer, AtomicInteger> invalidMutations = new HashMap<Integer, AtomicInteger>();
        final AtomicInteger replayedCount = new AtomicInteger();
        final HashMap<Integer, ReplayPosition> cfPositions = new HashMap<Integer, ReplayPosition>();
        for (ColumnFamilyStore cfs : ColumnFamilyStore.all()) {
            ReplayPosition rp = ReplayPosition.getReplayPosition(cfs.getSSTables());
            cfPositions.put(cfs.metadata.cfId, rp);
        }
        ReplayPosition globalPosition = (ReplayPosition)Ordering.from(ReplayPosition.comparator).min(cfPositions.values());
        CRC32 checksum = new CRC32();
        block9: for (File file : clogs) {
            logger.info("Replaying " + file.getPath());
            final long segment = CommitLogSegment.idFromFilename(file.getName());
            RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
            assert (reader.length() <= Integer.MAX_VALUE);
            try {
                int replayPosition = globalPosition.segment < segment ? 0 : (globalPosition.segment == segment ? globalPosition.position : (int)reader.length());
                if (replayPosition < 0 || (long)replayPosition >= reader.length()) {
                    logger.debug("skipping replay of fully-flushed {}", (Object)file);
                    continue;
                }
                reader.seek(replayPosition);
                if (logger.isDebugEnabled()) {
                    logger.debug("Replaying " + file + " starting at " + reader.getFilePointer());
                }
                while (!reader.isEOF()) {
                    long claimedCRC32;
                    int serializedSize;
                    if (logger.isDebugEnabled()) {
                        logger.debug("Reading mutation at " + reader.getFilePointer());
                    }
                    try {
                        serializedSize = reader.readInt();
                        if (serializedSize == 0) {
                            logger.debug("Encountered end of segment marker at " + reader.getFilePointer());
                            continue block9;
                        }
                        if (serializedSize < 10) {
                            continue block9;
                        }
                        long claimedSizeChecksum = reader.readLong();
                        checksum.reset();
                        checksum.update(serializedSize);
                        if (checksum.getValue() != claimedSizeChecksum) {
                            continue block9;
                        }
                        if (serializedSize > bytes.length) {
                            bytes = new byte[(int)(1.2 * (double)serializedSize)];
                        }
                        reader.readFully(bytes, 0, serializedSize);
                        claimedCRC32 = reader.readLong();
                    }
                    catch (EOFException eof) {
                        continue block9;
                    }
                    checksum.update(bytes, 0, serializedSize);
                    if (claimedCRC32 != checksum.getValue()) continue;
                    FastByteArrayInputStream bufIn = new FastByteArrayInputStream(bytes, 0, serializedSize);
                    RowMutation rm = null;
                    try {
                        rm = RowMutation.serializer().deserialize(new DataInputStream(bufIn), 4, IColumnSerializer.Flag.LOCAL);
                    }
                    catch (UnserializableColumnFamilyException ex) {
                        AtomicInteger i = (AtomicInteger)invalidMutations.get(ex.cfId);
                        if (i == null) {
                            i = new AtomicInteger(1);
                            invalidMutations.put(ex.cfId, i);
                            continue;
                        }
                        i.incrementAndGet();
                        continue;
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug(String.format("replaying mutation for %s.%s: %s", rm.getTable(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), (String)", ") + "}"));
                    }
                    final long entryLocation = reader.getFilePointer();
                    final RowMutation frm = rm;
                    WrappedRunnable runnable = new WrappedRunnable((Set)tablesRecovered){
                        final /* synthetic */ Set val$tablesRecovered;
                        {
                            this.val$tablesRecovered = set;
                        }

                        @Override
                        public void runMayThrow() throws IOException {
                            if (Schema.instance.getKSMetaData(frm.getTable()) == null) {
                                return;
                            }
                            Table table = Table.open(frm.getTable());
                            RowMutation newRm = new RowMutation(frm.getTable(), frm.key());
                            for (ColumnFamily columnFamily : frm.getColumnFamilies()) {
                                if (Schema.instance.getCF(columnFamily.id()) == null) continue;
                                ReplayPosition rp = (ReplayPosition)cfPositions.get(columnFamily.id());
                                if (segment <= rp.segment && (segment != rp.segment || entryLocation <= (long)rp.position)) continue;
                                newRm.add(columnFamily);
                                replayedCount.incrementAndGet();
                            }
                            if (!newRm.isEmpty()) {
                                Table.open(newRm.getTable()).apply(newRm, false);
                                this.val$tablesRecovered.add(table);
                            }
                        }
                    };
                    futures.add(StageManager.getStage(Stage.MUTATION).submit(runnable));
                    if (futures.size() <= 1024) continue;
                    FBUtilities.waitOnFutures(futures);
                    futures.clear();
                }
            }
            finally {
                FileUtils.closeQuietly(reader);
                logger.info("Finished reading " + file);
            }
        }
        for (Map.Entry entry : invalidMutations.entrySet()) {
            logger.info(String.format("Skipped %d mutations from unknown (probably removed) CF with id %d", ((AtomicInteger)entry.getValue()).intValue(), entry.getKey()));
        }
        FBUtilities.waitOnFutures(futures);
        logger.debug("Finished waiting on mutations from recovery");
        futures.clear();
        for (Table table : tablesRecovered) {
            futures.addAll(table.flush());
        }
        FBUtilities.waitOnFutures(futures);
        return replayedCount.get();
    }

    public ReplayPosition getContext() {
        Callable<ReplayPosition> task = new Callable<ReplayPosition>(){

            @Override
            public ReplayPosition call() throws Exception {
                return CommitLog.this.activeSegment.getContext();
            }
        };
        try {
            return this.executor.submit(task).get();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public int activeSegments() {
        return this.allocator.getActiveSegments().size();
    }

    public void add(RowMutation rm) throws IOException {
        long totalSize = RowMutation.serializer().serializedSize(rm, 4) + 20L;
        if (totalSize > 0x8000000L) {
            logger.warn("Skipping commitlog append of extremely large mutation ({} bytes)", (Object)totalSize);
            return;
        }
        this.executor.add(new LogRecordAdder(rm));
    }

    public void discardCompletedSegments(final Integer cfId, final ReplayPosition context) throws IOException {
        Callable task = new Callable(){

            public Object call() throws IOException {
                logger.debug("discard completed log segments for {}, column family {}", (Object)context, (Object)cfId);
                Iterator<CommitLogSegment> iter = CommitLog.this.allocator.getActiveSegments().iterator();
                while (iter.hasNext()) {
                    CommitLogSegment segment = iter.next();
                    segment.markClean(cfId, context);
                    if (segment.isUnused() && iter.hasNext()) {
                        logger.debug("Commit log segment {} is unused", (Object)segment);
                        CommitLog.this.allocator.recycleSegment(segment);
                    } else if (logger.isDebugEnabled()) {
                        logger.debug(String.format("Not safe to delete commit log %s; dirty is %s; hasNext: %s", segment, segment.dirtyString(), iter.hasNext()));
                    }
                    if (!segment.contains(context)) continue;
                    break;
                }
                return null;
            }
        };
        try {
            this.executor.submit(task).get();
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public void sync() throws IOException {
        for (CommitLogSegment segment : this.allocator.getActiveSegments()) {
            segment.sync();
        }
    }

    @Override
    public long getCompletedTasks() {
        return this.executor.getCompletedTasks();
    }

    @Override
    public long getPendingTasks() {
        return this.executor.getPendingTasks();
    }

    @Override
    public long getTotalCommitlogSize() {
        return this.allocator.bytesUsed();
    }

    public void forceNewSegment() throws ExecutionException, InterruptedException {
        logger.debug("Forcing new segment creation");
        Callable task = new Callable(){

            public Object call() throws IOException {
                if (CommitLog.this.activeSegment.position() > 0) {
                    CommitLog.this.activateNextSegment();
                }
                return null;
            }
        };
        this.executor.submit(task).get();
    }

    private void activateNextSegment() throws IOException {
        this.activeSegment = this.allocator.fetchSegment();
    }

    public void shutdownBlocking() throws InterruptedException {
        this.executor.shutdown();
        this.executor.awaitTermination();
        this.allocator.shutdown();
        this.allocator.awaitTermination();
    }

    class LogRecordAdder
    implements Callable,
    Runnable {
        final RowMutation rowMutation;

        LogRecordAdder(RowMutation rm) {
            this.rowMutation = rm;
        }

        @Override
        public void run() {
            try {
                if (!CommitLog.this.activeSegment.hasCapacityFor(this.rowMutation)) {
                    CommitLog.this.activateNextSegment();
                }
                CommitLog.this.activeSegment.write(this.rowMutation);
            }
            catch (IOException e) {
                throw new IOError(e);
            }
        }

        public Object call() throws Exception {
            this.run();
            return null;
        }
    }
}

