/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.kernel.impl.transaction.log;

import java.io.Flushable;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.Mockito;
import org.neo4j.kernel.KernelHealth;
import org.neo4j.kernel.impl.transaction.DeadSimpleTransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.BatchingTransactionAppender;
import org.neo4j.kernel.impl.transaction.log.InMemoryLogChannel;
import org.neo4j.kernel.impl.transaction.log.LogFile;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.impl.transaction.log.TransactionMetadataCache;
import org.neo4j.kernel.impl.transaction.log.rotation.LogRotation;
import org.neo4j.kernel.impl.transaction.tracing.LogAppendEvent;
import org.neo4j.kernel.impl.transaction.tracing.LogForceEvents;
import org.neo4j.kernel.impl.util.IdOrderingQueue;
import org.neo4j.kernel.lifecycle.LifeRule;
import org.neo4j.test.ThreadTestUtils;

/**
 * Exercises {@code BatchingTransactionAppender.forceAfterAppend} from several threads.
 *
 * <p>The appender is wired to a mocked {@link LogFile} whose writer records every
 * {@code emptyBufferIntoChannelAndClearIt()} and {@code flush()} call as a
 * {@link ChannelCommand} in a bounded queue. Because that queue only holds
 * {@value #CHANNEL_COMMAND_QUEUE_CAPACITY} entries, a test that pre-loads a
 * {@link ChannelCommand#dummy} entry can keep a force "in flight": the recording
 * {@code flush()} blocks on the full queue until the test starts draining it.
 */
public class BatchingTransactionAppenderConcurrencyTest {
    /** Capacity of {@link #channelCommandQueue}; deliberately tiny so that recording calls can block. */
    private static final int CHANNEL_COMMAND_QUEUE_CAPACITY = 2;

    /**
     * Timeout handed to {@code ThreadTestUtils.awaitThreadState}.
     * NOTE(review): the unit is defined by that helper - presumably milliseconds; confirm.
     */
    private static final long AWAIT_THREAD_STATE_TIMEOUT = 5000L;

    /** Shared pool running the first (blocking) force request of the concurrency tests. */
    private static ExecutorService executor;

    @Rule
    public final LifeRule life = new LifeRule();

    private final LogAppendEvent logAppendEvent = LogAppendEvent.NULL;
    private final LogFile logFile = Mockito.mock(LogFile.class);
    private final LogRotation logRotation = LogRotation.NO_ROTATION;
    private final TransactionMetadataCache transactionMetadataCache = new TransactionMetadataCache(10, 10);
    private final TransactionIdStore transactionIdStore = new DeadSimpleTransactionIdStore();
    private final IdOrderingQueue legacyIndexTransactionOrdering = IdOrderingQueue.BYPASS;
    private final KernelHealth kernelHealth = Mockito.mock(KernelHealth.class);

    /** Released by the recording channel each time {@code flush()} is entered. */
    private final Semaphore forceSemaphore = new Semaphore(0);

    /** Bounded log of the calls the recording channel has received, in call order. */
    private final BlockingQueue<ChannelCommand> channelCommandQueue =
            new LinkedBlockingQueue<>(CHANNEL_COMMAND_QUEUE_CAPACITY);

    /** Creates the executor shared by all tests in this class. */
    @BeforeClass
    public static void setUpExecutor() {
        executor = Executors.newCachedThreadPool();
    }

    /** Shuts the shared executor down and drops the reference to it. */
    @AfterClass
    public static void tearDownExecutor() {
        executor.shutdown();
        executor = null;
    }

    /**
     * Stubs {@code logFile.getWriter()} to return a channel that records the calls it
     * receives into {@link #channelCommandQueue}.
     */
    @Before
    public void setUp() {
        /**
         * In-memory channel that acts as its own {@link Flushable}, so that both the
         * buffer-empty call and the subsequent flush are observed by the test.
         */
        class Channel extends InMemoryLogChannel implements Flushable {
            /**
             * Records the call and returns this channel as the flushable to force.
             * Blocks while the command queue is full.
             */
            @Override
            public Flushable emptyBufferIntoChannelAndClearIt() {
                try {
                    channelCommandQueue.put(ChannelCommand.emptyBufferIntoChannelAndClearIt);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
                return this;
            }

            /**
             * Signals {@link #forceSemaphore} and then records the force.
             *
             * @throws IOException if the thread is interrupted while waiting for queue space
             */
            @Override
            public void flush() throws IOException {
                try {
                    // Release first: the test uses the permit to learn that a force has
                    // reached this point, even if the put below then blocks on a full queue.
                    forceSemaphore.release();
                    channelCommandQueue.put(ChannelCommand.force);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to callers further up the stack.
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            }
        }

        Mockito.when(logFile.getWriter()).thenReturn(new Channel());
    }

    /**
     * Registers a new appender (built from this test's collaborators) with {@link #life}
     * and starts the life.
     *
     * @return the appender that was added to {@link #life}
     */
    private BatchingTransactionAppender startAppender() {
        BatchingTransactionAppender appender = life.add(new BatchingTransactionAppender(
                logFile,
                logRotation,
                transactionMetadataCache,
                transactionIdStore,
                legacyIndexTransactionOrdering,
                kernelHealth));
        life.start();
        return appender;
    }

    /**
     * Wraps a {@code forceAfterAppend} call so it can be run on another thread.
     *
     * @param appender the appender to force
     * @return a runnable that forces once, rethrowing any {@link IOException} unchecked
     */
    private Runnable createForceAfterAppendRunnable(final BatchingTransactionAppender appender) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    appender.forceAfterAppend((LogForceEvents) logAppendEvent);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        };
    }

    /**
     * Takes one command per expected entry from {@link #channelCommandQueue}, blocking
     * until each is available, and asserts that they arrive in exactly the given order.
     *
     * @param expected the commands expected next, in order
     * @throws InterruptedException if interrupted while waiting for a command
     */
    private void assertNextCommands(ChannelCommand... expected) throws InterruptedException {
        for (ChannelCommand command : expected) {
            Assert.assertThat(channelCommandQueue.take(), Matchers.is(command));
        }
    }

    /**
     * Waits until the given thread is observed in {@link Thread.State#TIMED_WAITING},
     * which these tests take to mean it is parked behind the force already in progress.
     *
     * @param thread the thread that has issued a competing force request
     */
    private void awaitParkedOnForce(Thread thread) throws Exception {
        ThreadTestUtils.awaitThreadState(
                thread, AWAIT_THREAD_STATE_TIMEOUT, Thread.State.TIMED_WAITING, new Thread.State[0]);
    }

    /** A single force must empty the buffer and then flush the channel, exactly once each. */
    @Test
    public void shouldForceLogChannel() throws Throwable {
        BatchingTransactionAppender appender = startAppender();

        appender.forceAfterAppend((LogForceEvents) logAppendEvent);

        assertNextCommands(ChannelCommand.emptyBufferIntoChannelAndClearIt, ChannelCommand.force);
        Assert.assertTrue(channelCommandQueue.isEmpty());
    }

    /**
     * While one force is held inside {@code flush()}, a second force request must wait,
     * and once released must be carried out as a separate buffer-empty/flush pair.
     */
    @Test
    public void shouldWaitForOngoingForceToCompleteBeforeForcingAgain() throws Throwable {
        // Occupy one of the two queue slots so that the first force blocks inside flush().
        channelCommandQueue.put(ChannelCommand.dummy);
        BatchingTransactionAppender appender = startAppender();
        Runnable forceRequest = createForceAfterAppendRunnable(appender);

        Future<?> firstForce = executor.submit(forceRequest);
        forceSemaphore.acquire(); // the first force has now entered flush()

        Thread secondForce = ThreadTestUtils.fork(forceRequest);
        awaitParkedOnForce(secondForce);

        // Draining the queue lets the first force finish, after which the second one runs.
        assertNextCommands(
                ChannelCommand.dummy,
                ChannelCommand.emptyBufferIntoChannelAndClearIt,
                ChannelCommand.force,
                ChannelCommand.emptyBufferIntoChannelAndClearIt,
                ChannelCommand.force);

        firstForce.get();
        secondForce.join();
        Assert.assertTrue(channelCommandQueue.isEmpty());
    }

    /**
     * Ten force requests that arrive while a force is in progress must together result in
     * only one further buffer-empty/flush pair.
     */
    @Test
    public void shouldBatchUpMultipleWaitingForceRequests() throws Throwable {
        // Occupy one of the two queue slots so that the first force blocks inside flush().
        channelCommandQueue.put(ChannelCommand.dummy);
        BatchingTransactionAppender appender = startAppender();
        Runnable forceRequest = createForceAfterAppendRunnable(appender);

        Future<?> firstForce = executor.submit(forceRequest);
        forceSemaphore.acquire(); // the first force has now entered flush()

        Thread[] waitingForces = new Thread[10];
        for (int i = 0; i < waitingForces.length; i++) {
            waitingForces[i] = ThreadTestUtils.fork(forceRequest);
        }
        for (Thread waitingForce : waitingForces) {
            awaitParkedOnForce(waitingForce);
        }

        // Exactly two forces in total: the blocked one, plus one shared by all ten waiters.
        assertNextCommands(
                ChannelCommand.dummy,
                ChannelCommand.emptyBufferIntoChannelAndClearIt,
                ChannelCommand.force,
                ChannelCommand.emptyBufferIntoChannelAndClearIt,
                ChannelCommand.force);

        firstForce.get();
        for (Thread waitingForce : waitingForces) {
            waitingForce.join();
        }
        Assert.assertTrue(channelCommandQueue.isEmpty());
    }

    /** Calls observed on the recording channel, plus a filler used to pre-load the queue. */
    private enum ChannelCommand {
        /** Recorded when {@code emptyBufferIntoChannelAndClearIt()} is called on the channel. */
        emptyBufferIntoChannelAndClearIt,
        /** Recorded when {@code flush()} is called on the channel. */
        force,
        /** Placeholder put on the queue by tests to use up capacity; never recorded by the channel. */
        dummy
    }
}

