package org.mule.tck.core.util.queue;

import java.io.Serializable;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;
import org.mule.runtime.core.api.util.concurrent.Latch;
import org.mule.runtime.core.api.util.queue.DefaultQueueConfiguration;
import org.mule.runtime.core.api.util.queue.Queue;
import org.mule.runtime.core.api.util.queue.QueueSession;
import org.mule.runtime.core.internal.util.queue.AbstractQueueManager;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/tck/core/util/queue/AbstractTransactionQueueManagerTestCase.class */
public abstract class AbstractTransactionQueueManagerTestCase extends AbstractMuleContextTestCase {
    /** Timeout constant with value 2000 (unit presumably milliseconds — TODO confirm with users of this constant). */
    public static final int THREAD_EXECUTION_TIMEOUT = 2000;
    /** Logger named after the concrete runtime class of the test. */
    protected transient Logger logger = LoggerFactory.getLogger(getClass());
    /** Helper component that the {@code testDisposeQueue*} tests delegate to. */
    protected QueueTestComponent disposeTest = new QueueTestComponent();

    /**
     * Supplies the queue manager under test. The tests in this class call it once per manager
     * instance they need, including again after stopping a previous manager to simulate a restart.
     *
     * @return the queue manager to exercise
     * @throws Exception if the manager cannot be created
     */
    protected abstract AbstractQueueManager createQueueManager() throws Exception;

    /**
     * Indicates whether the queue manager under test is persistent. Persistence-specific tests
     * return early (or change their expectations) when this is {@code false}.
     *
     * @return {@code true} if the persistence tests should run
     */
    protected abstract boolean isPersistent();

    /**
     * Puts one element into an empty queue and takes it back, checking the reported
     * queue size before the put, after the put and after the take.
     */
    @Test
    public void testPutTake() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        manager.start();

        QueueSession session = manager.getQueueSession();
        Queue testQueue = session.getQueue("queue1");
        Assert.assertEquals("Queue size", 0L, testQueue.size());

        testQueue.put("String1");
        Assert.assertEquals("Queue size", 1L, testQueue.size());

        Serializable taken = testQueue.take();
        Assert.assertNotNull(taken);
        Assert.assertEquals("Queue content", "String1", taken);
        Assert.assertEquals("Queue size", 0L, testQueue.size());

        manager.stop();
    }

    /**
     * Calls {@code take()} on an empty queue while a second thread puts an element roughly
     * 200 ms later. The test asserts that the taken element is the one put by the other
     * thread and that more than 100 ms elapsed around the {@code take()} call.
     */
    @Test
    public void testTakePut() throws Exception {
        final AbstractQueueManager manager = createQueueManager();
        manager.start();
        final Latch producerStarted = new Latch();
        Thread producer = new Thread(() -> {
            try {
                producerStarted.countDown();
                // Delay the put so the main thread is already waiting in take().
                Thread.sleep(200L);
                Queue producerQueue = manager.getQueueSession().getQueue("queue1");
                Assert.assertEquals("Queue size", 0L, producerQueue.size());
                producerQueue.put("String1");
            } catch (Exception e) {
                // Do not swallow silently: a failure here would otherwise only surface as a hung take().
                this.logger.warn("Error using queue session", e);
            }
        });
        producer.start();
        producerStarted.await();

        long beforeTake = System.currentTimeMillis();
        Queue queue = manager.getQueueSession().getQueue("queue1");
        Assert.assertEquals("Queue size", 0L, queue.size());
        Serializable taken = queue.take();
        long afterTake = System.currentTimeMillis();
        producer.join();

        Assert.assertNotNull(taken);
        Assert.assertEquals("Queue content", "String1", taken);
        Assert.assertEquals("Queue size", 0L, queue.size());
        Assert.assertTrue(afterTake - beforeTake > 100);
        manager.stop();
    }

    /**
     * A second thread puts two elements roughly 200 ms after start while the main thread is
     * waiting in {@code take()}. After taking the first element the test calls
     * {@code untake} with it and asserts that the size goes back to 2 and that the next
     * {@code take()} returns that same element again.
     */
    @Test
    public void testPutTakeUntake() throws Exception {
        final AbstractQueueManager manager = createQueueManager();
        manager.start();
        final Latch producerStarted = new Latch();
        Thread producer = new Thread(() -> {
            try {
                producerStarted.countDown();
                // Delay the puts so the main thread is already waiting in take().
                Thread.sleep(200L);
                Queue producerQueue = manager.getQueueSession().getQueue("queue1");
                Assert.assertEquals("Queue size", 0L, producerQueue.size());
                producerQueue.put("String1");
                producerQueue.put("String2");
            } catch (Exception e) {
                // Do not swallow silently: a failure here would otherwise only surface as a hung take().
                this.logger.warn("Error using queue session", e);
            }
        });
        producer.start();
        producerStarted.await();

        long beforeTake = System.currentTimeMillis();
        Queue queue = manager.getQueueSession().getQueue("queue1");
        Assert.assertEquals("Queue size", 0L, queue.size());
        Serializable taken = queue.take();
        long afterTake = System.currentTimeMillis();
        producer.join();

        Assert.assertNotNull(taken);
        Assert.assertEquals("Queue content", "String1", taken);
        Assert.assertEquals("Queue size", 1L, queue.size());
        Assert.assertTrue(afterTake - beforeTake > 100);

        // Hand the element back and check it is the next one delivered.
        queue.untake(taken);
        Assert.assertEquals("Queue size", 2L, queue.size());
        Assert.assertEquals("Queue content", "String1", queue.take());
        Assert.assertEquals("Queue size", 1L, queue.size());
        manager.stop();
    }

    /**
     * Puts one element into a queue outside of any transaction, clears the queue and
     * asserts that the size drops back to zero.
     */
    @Test
    public void testClearWithoutTransaction() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        manager.start();

        QueueSession session = manager.getQueueSession();
        Queue testQueue = session.getQueue("queue1");
        Assert.assertEquals("Queue size", 0L, testQueue.size());

        testQueue.put("String1");
        Assert.assertEquals("Queue size", 1L, testQueue.size());

        testQueue.clear();
        Assert.assertEquals("Queue size", 0L, testQueue.size());

        manager.stop();
    }

    /**
     * Exercises {@code clear()} inside session transactions: a committed put leaves one
     * element, a rolled-back clear leaves that element in place, and a committed clear
     * empties the queue.
     */
    @Test
    public void testClearInTransaction() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        manager.start();
        QueueSession session = manager.getQueueSession();

        // Transaction 1: put and commit.
        session.begin();
        Queue testQueue = session.getQueue("queue1");
        Assert.assertEquals("Queue size", 0L, testQueue.size());
        testQueue.put("String1");
        session.commit();
        Assert.assertEquals("Queue size", 1L, testQueue.size());

        // Transaction 2: clear and roll back; the element is expected to remain.
        session.begin();
        Assert.assertEquals("Queue size", 1L, testQueue.size());
        testQueue.clear();
        session.rollback();
        Assert.assertEquals("Queue size", 1L, testQueue.size());

        // Transaction 3: clear and commit; the queue is expected to be empty.
        session.begin();
        Assert.assertEquals("Queue size", 1L, testQueue.size());
        testQueue.clear();
        session.commit();
        Assert.assertEquals("Queue size", 0L, testQueue.size());

        manager.stop();
    }

    /**
     * A second thread puts {@code "String1"} in a transaction it rolls back, then puts
     * {@code "String2"} in a transaction it commits. The main thread, waiting in
     * {@code take()}, is expected to receive only {@code "String2"} and to have waited more
     * than 100 ms.
     */
    @Test
    public void testTakePutRollbackPut() throws Exception {
        final AbstractQueueManager manager = createQueueManager();
        manager.start();
        final Latch producerStarted = new Latch();
        Thread producer = new Thread(() -> {
            try {
                producerStarted.countDown();
                // Delay the puts so the main thread is already waiting in take().
                Thread.sleep(200L);
                QueueSession producerSession = manager.getQueueSession();
                Queue producerQueue = producerSession.getQueue("queue1");
                Assert.assertEquals("Queue size", 0L, producerQueue.size());
                producerSession.begin();
                producerQueue.put("String1");
                producerSession.rollback();
                producerSession.begin();
                producerQueue.put("String2");
                producerSession.commit();
            } catch (Exception e) {
                // Do not swallow silently: a failure here would otherwise only surface as a hung take().
                this.logger.warn("Error using queue session", e);
            }
        });
        producer.start();
        producerStarted.await();

        long beforeTake = System.currentTimeMillis();
        Queue queue = manager.getQueueSession().getQueue("queue1");
        Assert.assertEquals("Queue size", 0L, queue.size());
        Serializable taken = queue.take();
        long afterTake = System.currentTimeMillis();
        producer.join();

        Assert.assertNotNull(taken);
        Assert.assertEquals("Queue content", "String2", taken);
        Assert.assertEquals("Queue size", 0L, queue.size());
        Assert.assertTrue(afterTake - beforeTake > 100);
        manager.stop();
    }

    /**
     * A second thread calls {@code untake("string1")} in a transaction it commits, then
     * {@code untake("string2")} in a transaction it rolls back. The main thread, waiting in
     * {@code take()}, is expected to receive {@code "string1"}, to find the queue empty
     * afterwards, and to have waited more than 100 ms.
     */
    @Test
    public void testPutTakeUntakeRollbackUntake() throws Exception {
        final AbstractQueueManager manager = createQueueManager();
        manager.start();
        final Latch producerStarted = new Latch();
        final String committedValue = "string1";
        final String rolledBackValue = "string2";
        Thread producer = new Thread(() -> {
            try {
                producerStarted.countDown();
                // Delay the untakes so the main thread is already waiting in take().
                Thread.sleep(200L);
                QueueSession producerSession = manager.getQueueSession();
                Queue producerQueue = producerSession.getQueue("queue1");
                Assert.assertEquals("Queue size", 0L, producerQueue.size());
                producerSession.begin();
                producerQueue.untake(committedValue);
                producerSession.commit();
                producerSession.begin();
                producerQueue.untake(rolledBackValue);
                producerSession.rollback();
            } catch (Exception e) {
                // Do not swallow silently: a failure here would otherwise only surface as a hung take().
                this.logger.warn("Error using queue session", e);
            }
        });
        producer.start();
        producerStarted.await();

        long beforeTake = System.currentTimeMillis();
        Queue queue = manager.getQueueSession().getQueue("queue1");
        Assert.assertEquals("Queue size", 0L, queue.size());
        Serializable taken = queue.take();
        long afterTake = System.currentTimeMillis();
        producer.join();

        Assert.assertNotNull(taken);
        Assert.assertEquals("Queue content", "string1", taken);
        Assert.assertEquals("Queue size", 0L, queue.size());
        Assert.assertTrue(afterTake - beforeTake > 100);
        manager.stop();
    }

    /**
     * Configures the default queue configuration with {@code DefaultQueueConfiguration(2, false)},
     * fills the queue with two elements and then puts a third while a second thread takes one
     * element roughly 200 ms later. The test asserts that the third {@code put} took more than
     * 100 ms and that the queue ends with two elements.
     */
    @Test
    public void testTakePutOverCapacity() throws Exception {
        final AbstractQueueManager manager = createQueueManager();
        manager.start();
        manager.setDefaultQueueConfiguration(new DefaultQueueConfiguration(2, false));
        final Latch queueFilled = new Latch();
        Thread consumer = new Thread(() -> {
            try {
                queueFilled.await();
                // Delay the take so the main thread is already waiting in the third put().
                Thread.sleep(200L);
                Assert.assertEquals("Queue content", "String1", manager.getQueueSession().getQueue("queue1").take());
            } catch (Exception e) {
                // Do not swallow silently: a failure here would otherwise only surface as a hung put().
                this.logger.warn("Error using queue session", e);
            }
        });
        consumer.start();

        Queue queue = manager.getQueueSession().getQueue("queue1");
        Assert.assertEquals("Queue size", 0L, queue.size());
        queue.put("String1");
        queue.put("String2");
        queueFilled.countDown();

        long beforePut = System.currentTimeMillis();
        queue.put("String3");
        long afterPut = System.currentTimeMillis();
        consumer.join();

        Assert.assertEquals("Queue size", 2L, queue.size());
        Assert.assertTrue(afterPut - beforePut > 100);
        manager.stop();
    }

    /**
     * For persistent managers only: puts one element, stops and disposes the manager, then
     * creates a new manager and asserts that the queue still reports one element.
     * Returns early (after logging) when {@link #isPersistent()} is {@code false}.
     */
    @Test
    public void testPutWithPersistence() throws Exception {
        if (!isPersistent()) {
            this.logger.info("Ignoring test because queue manager is not persistent");
            return;
        }

        AbstractQueueManager manager = createQueueManager();
        try {
            QueueSession session = manager.getQueueSession();
            manager.start();
            Queue queue = session.getQueue("queue1");
            queue.put("String1");
            Assert.assertEquals("Queue size", 1L, queue.size());
            // Look the queue up again through the same session and re-check the size.
            Assert.assertEquals("Queue size", 1L, session.getQueue("queue1").size());
        } finally {
            // Always release the first manager, even when an assertion above fails.
            manager.stop();
            manager.dispose();
        }

        manager = createQueueManager();
        try {
            QueueSession session = manager.getQueueSession();
            manager.start();
            Assert.assertEquals("Queue size", 1L, session.getQueue("queue1").size());
        } finally {
            manager.stop();
            manager.dispose();
        }
    }

    /**
     * For persistent managers only: puts one element inside a committed transaction, stops the
     * manager, creates a new one and asserts that the queue still reports one element.
     * Returns early (after logging) when {@link #isPersistent()} is {@code false}.
     */
    @Test
    public void testTransactedPutCommitWithPersistence() throws Exception {
        if (!isPersistent()) {
            this.logger.info("Ignoring test because queue manager is not persistent");
            return;
        }

        AbstractQueueManager manager = createQueueManager();
        try {
            QueueSession session = manager.getQueueSession();
            Queue queue = session.getQueue("queue1");
            manager.start();

            session.begin();
            queue.put("String1");
            Assert.assertEquals("Queue size", 1L, queue.size());
            session.commit();
            Assert.assertEquals("Queue size", 1L, queue.size());
            // The committed element must also be visible through a different session.
            Assert.assertEquals("Queue size", 1L, manager.getQueueSession().getQueue("queue1").size());
            manager.stop();

            // Simulate a restart with a new manager instance.
            manager = createQueueManager();
            Queue queueAfterRestart = manager.getQueueSession().getQueue("queue1");
            manager.start();
            Assert.assertEquals("Queue size", 1L, queueAfterRestart.size());
        } finally {
            // Single cleanup path for both the success and the failure case.
            manager.stop();
            manager.dispose();
        }
    }

    /**
     * For persistent managers only: puts one element inside a transaction that is rolled back,
     * stops the manager, creates a new one and asserts that the queue is empty both before and
     * after the restart. Returns early (after logging) when {@link #isPersistent()} is
     * {@code false}.
     */
    @Test
    public void testTransactedPutRollbackWithPersistence() throws Exception {
        if (!isPersistent()) {
            this.logger.info("Ignoring test because queue manager is not persistent");
            return;
        }

        AbstractQueueManager manager = createQueueManager();
        try {
            manager.start();
            QueueSession session = manager.getQueueSession();
            Queue queue = session.getQueue("queue1");

            session.begin();
            queue.put("String1");
            Assert.assertEquals("Queue size", 1L, queue.size());
            session.rollback();
            Assert.assertEquals("Queue size", 0L, queue.size());
            // The rollback must also be visible through a different session.
            Assert.assertEquals("Queue size", 0L, manager.getQueueSession().getQueue("queue1").size());
            manager.stop();

            // Simulate a restart with a new manager instance.
            manager = createQueueManager();
            manager.start();
            Assert.assertEquals("Queue size", 0L, manager.getQueueSession().getQueue("queue1").size());
        } finally {
            // Single cleanup path for both the success and the failure case.
            manager.stop();
            manager.dispose();
        }
    }

    /**
     * For persistent managers only: puts {@code "String1"} .. {@code "String10"}, stops the
     * manager, creates a new one and asserts that the elements are taken back in the same
     * order. Does nothing when {@link #isPersistent()} is {@code false}.
     */
    @Test
    public void testPutTake_RespectsOrderOnPersistence() throws Exception {
        if (!isPersistent()) {
            return;
        }

        AbstractQueueManager writer = createQueueManager();
        Queue writeQueue = writer.getQueueSession().getQueue("queue1");
        writer.start();
        Assert.assertEquals("Queue size", 0L, writeQueue.size());
        int index = 1;
        while (index <= 10) {
            writeQueue.put("String" + index);
            Assert.assertEquals("Queue size", index, writeQueue.size());
            index++;
        }
        writer.stop();

        // A second manager instance reads back what the first one stored.
        AbstractQueueManager reader = createQueueManager();
        Queue readQueue = reader.getQueueSession().getQueue("queue1");
        reader.start();
        int expected = 1;
        while (expected <= 10) {
            Serializable element = readQueue.take();
            Assert.assertNotNull(element);
            Assert.assertEquals("Queue content", "String" + expected, element);
            expected++;
        }
        Assert.assertEquals("Queue size", 0L, readQueue.size());
        reader.stop();
        reader.dispose();
    }

    /**
     * Uses two sessions of the same manager, each viewing {@code "queue1"} and
     * {@code "queue2"}, and checks the sizes each view reports while the first session takes
     * and puts inside a transaction that is first committed and then rolled back.
     */
    @Test
    public void testTransactionsOnMultipleQueues() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        try {
            manager.start();
            QueueSession txSession = manager.getQueueSession();
            QueueSession observerSession = manager.getQueueSession();
            Queue txQueue1 = txSession.getQueue("queue1");
            Queue observerQueue1 = observerSession.getQueue("queue1");
            Queue txQueue2 = txSession.getQueue("queue2");
            Queue observerQueue2 = observerSession.getQueue("queue2");

            // Non-transactional put is expected to be visible through both sessions.
            txQueue1.put("String1");
            Assert.assertEquals("Queue size", 1L, txQueue1.size());
            Assert.assertEquals("Queue size", 1L, observerQueue1.size());

            // Transaction 1 (committed): move the element from queue1 to queue2.
            txSession.begin();
            Serializable firstTaken = txQueue1.take();
            Assert.assertNotNull(firstTaken);
            Assert.assertEquals("String1", firstTaken);
            Assert.assertEquals("Queue size", 0L, txQueue1.size());
            Assert.assertEquals("Queue size", 0L, observerQueue1.size());
            txQueue2.put("String2");
            // The uncommitted put is expected to be counted only by the transactional session.
            Assert.assertEquals("Queue size", 1L, txQueue2.size());
            Assert.assertEquals("Queue size", 0L, observerQueue2.size());
            txSession.commit();
            Assert.assertEquals("Queue size", 0L, txQueue1.size());
            Assert.assertEquals("Queue size", 0L, observerQueue1.size());
            Assert.assertEquals("Queue size", 1L, txQueue2.size());
            Assert.assertEquals("Queue size", 1L, observerQueue2.size());

            // Transaction 2 (rolled back): take from queue2 and put into queue1.
            txSession.begin();
            Serializable secondTaken = txQueue2.take();
            Assert.assertNotNull(secondTaken);
            Assert.assertEquals("String2", secondTaken);
            Assert.assertEquals("Queue size", 0L, txQueue1.size());
            Assert.assertEquals("Queue size", 0L, observerQueue1.size());
            Assert.assertEquals("Queue size", 0L, txQueue2.size());
            Assert.assertEquals("Queue size", 0L, observerQueue2.size());
            txQueue1.put("String1");
            Assert.assertEquals("Queue size", 1L, txQueue1.size());
            Assert.assertEquals("Queue size", 0L, observerQueue1.size());
            Assert.assertEquals("Queue size", 0L, txQueue2.size());
            Assert.assertEquals("Queue size", 0L, observerQueue2.size());
            txSession.rollback();
            // After rollback the state from the end of transaction 1 is expected again.
            Assert.assertEquals("Queue size", 0L, txQueue1.size());
            Assert.assertEquals("Queue size", 0L, observerQueue1.size());
            Assert.assertEquals("Queue size", 1L, txQueue2.size());
            Assert.assertEquals("Queue size", 1L, observerQueue2.size());
        } finally {
            // Single cleanup path for both the success and the failure case.
            manager.stop();
            manager.dispose();
        }
    }

    /**
     * Exercises {@code poll(timeout)}: on an empty queue it is expected to return {@code null}
     * for timeouts of 0 and 1000, to return an element that is already present, and to return
     * an element that another thread puts while the poll is in progress.
     */
    @Test
    public void testPoll() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        try {
            manager.start();
            Queue queue = manager.getQueueSession().getQueue("queue1");
            Assert.assertEquals("Queue size", 0L, queue.size());

            Serializable emptyImmediate = queue.poll(0L);
            Assert.assertEquals("Queue size", 0L, queue.size());
            Assert.assertNull(emptyImmediate);

            Serializable emptyTimed = queue.poll(1000L);
            Assert.assertEquals("Queue size", 0L, queue.size());
            Assert.assertNull(emptyTimed);

            queue.put("String1");
            Assert.assertEquals("Queue size", 1L, queue.size());
            Serializable present = queue.poll(0L);
            Assert.assertEquals("Queue size", 0L, queue.size());
            Assert.assertEquals("Queue content", "String1", present);

            Latch producerReady = new Latch();
            Thread producer = new Thread(() -> {
                try {
                    Queue producerQueue = manager.getQueueSession().getQueue("queue1");
                    producerReady.release();
                    producerQueue.put("String1");
                } catch (Exception e) {
                    this.logger.warn("Error using queue session", e);
                }
            });
            producer.start();
            if (!producerReady.await(2000L, TimeUnit.MILLISECONDS)) {
                Assert.fail("Thread executing put over queue was not executed");
            }

            Serializable fromProducer = queue.poll(5000L);
            producer.join(2000L);
            // Expected value first, actual second, so a failure message reads correctly.
            Assert.assertEquals("Queue size", 0L, queue.size());
            Assert.assertEquals("Queue content", "String1", fromProducer);
        } finally {
            // Single cleanup path for both the success and the failure case.
            manager.stop();
        }
    }

    /**
     * Exercises {@code peek()}: it is expected to return {@code null} on an empty queue and to
     * return the head element without changing the size; a following {@code poll} then
     * removes that element.
     */
    @Test
    public void testPeek() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        try {
            manager.start();
            Queue queue = manager.getQueueSession().getQueue("queue1");
            Assert.assertEquals("Queue size", 0L, queue.size());

            Serializable peekedOnEmpty = queue.peek();
            Assert.assertEquals("Queue size", 0L, queue.size());
            Assert.assertNull(peekedOnEmpty);

            queue.put("String1");
            Assert.assertEquals("Queue size", 1L, queue.size());
            Serializable peeked = queue.peek();
            // peek must leave the element in the queue.
            Assert.assertEquals("Queue size", 1L, queue.size());
            Assert.assertEquals("Queue content", "String1", peeked);

            Serializable polled = queue.poll(1000L);
            Assert.assertEquals("Queue size", 0L, queue.size());
            Assert.assertEquals("Queue content", "String1", polled);
        } finally {
            // Single cleanup path for both the success and the failure case.
            manager.stop();
        }
    }

    /**
     * Exercises {@code offer(value, timeout)} with the default queue configuration set to
     * {@code DefaultQueueConfiguration(1, false)}: the first offer is expected to succeed, a
     * second offer to fail while the queue still holds one element, and a further offer to
     * succeed once another thread takes the element.
     */
    @Test
    public void testOffer() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        manager.setDefaultQueueConfiguration(new DefaultQueueConfiguration(1, false));
        try {
            manager.start();
            Queue queue = manager.getQueueSession().getQueue("queue1");
            Assert.assertThat("Queue size", queue.size(), CoreMatchers.is(0));

            Assert.assertThat(queue.offer("String1", 0L), CoreMatchers.is(true));
            Assert.assertThat("Queue size", queue.size(), CoreMatchers.is(1));

            Assert.assertThat(queue.offer("String2", 1000L), CoreMatchers.is(false));
            Assert.assertThat("Queue size", queue.size(), CoreMatchers.is(1));

            Latch consumerReady = new Latch();
            Thread consumer = new Thread(() -> {
                try {
                    consumerReady.release();
                    Assert.assertThat("Queue content", manager.getQueueSession().getQueue("queue1").take(), CoreMatchers.is("String1"));
                } catch (Exception e) {
                    this.logger.warn("Error using queue session", e);
                }
            });
            consumer.start();
            if (!consumerReady.await(2000L, TimeUnit.MILLISECONDS)) {
                // The helper thread performs a take, not a put.
                Assert.fail("Thread executing take over queue was not executed");
            }

            Assert.assertThat(queue.offer("String2", 1000L), CoreMatchers.is(true));
            consumer.join(2000L);
            Assert.assertThat("Queue size", queue.size(), CoreMatchers.is(1));
        } finally {
            // Single cleanup path for both the success and the failure case.
            manager.stop();
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v19, types: [byte[], java.io.Serializable] */
    @Test
    public void testRecoverWarmRestart() throws Exception {
        AbstractQueueManager createQueueManager = createQueueManager();
        createQueueManager.start();
        Queue queue = createQueueManager.getQueueSession().getQueue("warmRecoverQueue");
        Random random = new Random();
        for (int i = 0; i < 50; i++) {
            ?? r0 = new byte[2048];
            random.nextBytes(r0);
            queue.put((Serializable) r0);
        }
        Assert.assertEquals(queue.size(), 50);
        createQueueManager.stop();
        createQueueManager.start();
        Assert.assertEquals(50, queue.size());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v27, types: [byte[], java.io.Serializable] */
    @Test
    public void testRecoverColdRestart() throws Exception {
        AbstractQueueManager createQueueManager = createQueueManager();
        Queue queue = createQueueManager.getQueueSession().getQueue("warmRecoverQueue");
        createQueueManager.start();
        Random random = new Random();
        for (int i = 0; i < 50; i++) {
            ?? r0 = new byte[2048];
            random.nextBytes(r0);
            queue.put((Serializable) r0);
        }
        Assert.assertEquals(50, queue.size());
        createQueueManager.stop();
        AbstractQueueManager createQueueManager2 = createQueueManager();
        Queue queue2 = createQueueManager2.getQueueSession().getQueue("warmRecoverQueue");
        createQueueManager2.start();
        if (isPersistent()) {
            Assert.assertEquals(50, queue2.size());
        } else {
            Assert.assertEquals(0L, queue2.size());
        }
    }

    /**
     * Delegates to {@code disposeTest.testDisposal} with a newly created queue manager and the
     * flag {@code false} (presumably "not in a transaction" — confirm against QueueTestComponent).
     */
    @Test
    public void testDisposeQueueWithoutTransaction() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        this.disposeTest.testDisposal(manager, false);
    }

    /**
     * Delegates to {@code disposeTest.testDisposal} with a newly created queue manager and the
     * flag {@code true} (presumably "in a transaction" — confirm against QueueTestComponent).
     */
    @Test
    public void testDisposeQueueInTransaction() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        this.disposeTest.testDisposal(manager, true);
    }

    /**
     * Delegates to {@code disposeTest.testDisposal} with a newly created queue manager and the
     * flag {@code true}.
     *
     * NOTE(review): this performs the same call as {@code testDisposeQueueInTransaction};
     * nothing here disposes a queue "by name" — confirm whether a different helper was intended.
     */
    @Test
    public void testDisposeQueueByNameInTransaction() throws Exception {
        AbstractQueueManager manager = createQueueManager();
        this.disposeTest.testDisposal(manager, true);
    }
}
