package org.mule.runtime.core.internal.processor;

import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.el.BindingContext;
import org.mule.runtime.api.el.CompiledExpression;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.serialization.ObjectSerializer;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.api.store.ObjectStoreSettings;
import org.mule.runtime.api.store.TemplateObjectStore;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.core.api.el.ExpressionManagerSession;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.expression.ExpressionRuntimeException;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.lock.MuleLockFactory;
import org.mule.runtime.core.internal.lock.SingleServerLockProvider;
import org.mule.runtime.core.internal.message.InternalMessage;
import org.mule.runtime.core.internal.processor.IdempotentRedeliveryPolicy;
import org.mule.tck.SerializationTestUtils;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.reactivestreams.Publisher;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

@Story("Redelivery")
@Feature("Sources")
/* loaded from: input_file:org/mule/runtime/core/internal/processor/IdempotentRedeliveryPolicyTestCase.class */
public class IdempotentRedeliveryPolicyTestCase extends AbstractMuleContextTestCase {
    /** String payload returned by the mocked message in the redelivery and thread-safety tests. */
    public static final String STRING_MESSAGE = "message";
    /** Redelivery limit used by these tests. */
    public static final int MAX_REDELIVERY_COUNT = 5;
    // Assigned in setUpTest(); static so the static nested SerializationObjectStore can reach it.
    private static ObjectSerializer serializer;
    // Spy around testEvent(), rebuilt before every test; its getMessage() is stubbed to return `message`.
    private CoreEvent event;
    // Spy around the mule context's expression manager, rebuilt before every test.
    private ExpressionManager expressionManager;
    // Deep-stub mock handed to the policy; setUpTest() makes it return an InMemoryObjectStore.
    private final ObjectStoreManager mockObjectStoreManager = (ObjectStoreManager) Mockito.mock(ObjectStoreManager.class, Answers.RETURNS_DEEP_STUBS.get());
    // Processor stubbed in setUpTest() to answer every apply(...) with a Mono that errors and bumps `count`.
    private final Processor mockFailingMessageProcessor = (Processor) Mockito.mock(Processor.class, Answers.RETURNS_DEEP_STUBS.get());
    // Processor stubbed in setUpTest() to count down the execution latch, wait on `waitLatch`, then delegate to the failing processor.
    private final Processor mockWaitingMessageProcessor = (Processor) Mockito.mock(Processor.class, Answers.RETURNS_DEEP_STUBS.get());
    // Message returned by the spied event; individual tests stub its getPayload().
    private final InternalMessage message = (InternalMessage) Mockito.mock(InternalMessage.class, Answers.RETURNS_DEEP_STUBS.get());
    // Awaited (up to 2000 ms) inside the waiting processor; released by testThreadSafeObjectStoreUsage().
    private final Latch waitLatch = new Latch();
    // Counted down once per event reaching the waiting processor; the thread-safety test awaits it for up to 5000 ms.
    private final CountDownLatch waitingMessageProcessorExecutionLatch = new CountDownLatch(2);
    // The policy under test.
    private final IdempotentRedeliveryPolicy irp = new IdempotentRedeliveryPolicy();
    // Number of errors emitted by the failing processor (incremented in its doOnError callback).
    private final AtomicInteger count = new AtomicInteger();
    // Plain mock used by the tests that configure an explicit object store on the policy.
    private final ObjectStore mockObjectStore = (ObjectStore) Mockito.mock(ObjectStore.class);

    // JUnit rule used by multipleObjectStoreConfigurationShouldRaiseException() to declare the expected exception type.
    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/IdempotentRedeliveryPolicyTestCase$ExecuteIrpThread.class */
    /**
     * Thread that sends the enclosing test's event through the policy under test exactly once.
     * Any exception raised by {@code process} is kept in {@link #exception} instead of escaping the thread.
     */
    public class ExecuteIrpThread extends Thread {
        /** Exception thrown by the policy during {@link #run()}; stays {@code null} when none was thrown. */
        public Exception exception;

        @Override
        public void run() {
            final IdempotentRedeliveryPolicyTestCase testCase = IdempotentRedeliveryPolicyTestCase.this;
            try {
                testCase.irp.process(testCase.event);
            } catch (Exception failure) {
                // Record rather than propagate, so the thread always terminates normally.
                exception = failure;
            }
        }
    }

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/IdempotentRedeliveryPolicyTestCase$InMemoryObjectStore.class */
    /**
     * Non-persistent {@link TemplateObjectStore} that keeps redelivery counters in a plain {@link HashMap}.
     * No synchronization is performed by this class itself.
     *
     * <p>The {@code doContains}/{@code doStore}/{@code doRetrieve}/{@code doRemove} methods are the hooks the
     * template base class delegates to, so their names must match the base class exactly.
     */
    public static class InMemoryObjectStore extends TemplateObjectStore<IdempotentRedeliveryPolicy.RedeliveryCounter> {
        private final Map<String, IdempotentRedeliveryPolicy.RedeliveryCounter> store = new HashMap<>();

        /** @return {@code true} when a counter is stored under {@code key}. */
        protected boolean doContains(String key) throws ObjectStoreException {
            return this.store.containsKey(key);
        }

        /** Stores {@code counter} under {@code key}, replacing any previous mapping. */
        public void doStore(String key, IdempotentRedeliveryPolicy.RedeliveryCounter counter) throws ObjectStoreException {
            this.store.put(key, counter);
        }

        /** @return the counter stored under {@code key}, or {@code null} when the backing map has none. */
        public IdempotentRedeliveryPolicy.RedeliveryCounter doRetrieve(String key) throws ObjectStoreException {
            return this.store.get(key);
        }

        /** Removes the mapping for {@code key}. @return the removed counter, or {@code null} when there was none. */
        public IdempotentRedeliveryPolicy.RedeliveryCounter doRemove(String key) throws ObjectStoreException {
            return this.store.remove(key);
        }

        /** Drops every stored counter. */
        public void clear() throws ObjectStoreException {
            this.store.clear();
        }

        /** @return always {@code false}; contents live only in memory. */
        public boolean isPersistent() {
            return false;
        }

        /** No-op: there is nothing to open for an in-memory map. */
        public void open() throws ObjectStoreException {
        }

        /** No-op: there is nothing to release for an in-memory map. */
        public void close() throws ObjectStoreException {
        }

        /** @return a fresh list holding a snapshot of the current keys. */
        public List<String> allKeys() throws ObjectStoreException {
            return new ArrayList<>(this.store.keySet());
        }

        /** @return a read-only view of the backing map (not a copy). */
        public Map<String, IdempotentRedeliveryPolicy.RedeliveryCounter> retrieveAll() throws ObjectStoreException {
            return Collections.unmodifiableMap(this.store);
        }
    }

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/IdempotentRedeliveryPolicyTestCase$SerializationObjectStore.class */
    public static class SerializationObjectStore extends TemplateObjectStore<IdempotentRedeliveryPolicy.RedeliveryCounter> {
        private final Map<String, Serializable> store = new HashMap();

        protected boolean doContains(String str) throws ObjectStoreException {
            return this.store.containsKey(str);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public void doStore(String str, IdempotentRedeliveryPolicy.RedeliveryCounter redeliveryCounter) throws ObjectStoreException {
            this.store.put(str, IdempotentRedeliveryPolicyTestCase.serializer.getExternalProtocol().serialize(redeliveryCounter));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doRetrieve, reason: merged with bridge method [inline-methods] */
        public IdempotentRedeliveryPolicy.RedeliveryCounter m3doRetrieve(String str) throws ObjectStoreException {
            return (IdempotentRedeliveryPolicy.RedeliveryCounter) IdempotentRedeliveryPolicyTestCase.serializer.getExternalProtocol().deserialize((byte[]) ((Serializable) this.store.get(str)));
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: doRemove, reason: merged with bridge method [inline-methods] */
        public IdempotentRedeliveryPolicy.RedeliveryCounter m2doRemove(String str) throws ObjectStoreException {
            return (IdempotentRedeliveryPolicy.RedeliveryCounter) IdempotentRedeliveryPolicyTestCase.serializer.getExternalProtocol().deserialize((byte[]) ((Serializable) this.store.remove(str)));
        }

        public boolean isPersistent() {
            return false;
        }

        public void clear() throws ObjectStoreException {
            this.store.clear();
        }

        public void open() throws ObjectStoreException {
        }

        public void close() throws ObjectStoreException {
        }

        public List<String> allKeys() throws ObjectStoreException {
            return new ArrayList(this.store.keySet());
        }

        public Map<String, IdempotentRedeliveryPolicy.RedeliveryCounter> retrieveAll() throws ObjectStoreException {
            return (Map) this.store.entrySet().stream().collect(Collectors.toMap(entry -> {
                return (String) entry.getKey();
            }, entry2 -> {
                return (IdempotentRedeliveryPolicy.RedeliveryCounter) IdempotentRedeliveryPolicyTestCase.serializer.getExternalProtocol().deserialize((byte[]) entry2.getValue());
            }));
        }
    }

    /**
     * Supplies the objects registered at start-up for this test: a single entry that exposes the
     * test's {@code componentLocator} under the configuration component locator key.
     *
     * @return an immutable single-entry map
     */
    protected Map<String, Object> getStartUpRegistryObjects() {
        final String locatorKey = "_muleConfigurationComponentLocator";
        return Collections.singletonMap(locatorKey, this.componentLocator);
    }

    /**
     * Builds the fixture shared by every test: spies for the event and expression manager, the two stubbed
     * processors, an in-memory object store served by the mocked store manager, and a fully wired policy
     * ({@code irp}) limited to {@link #MAX_REDELIVERY_COUNT} redeliveries.
     *
     * @throws MuleException if creating the test event or initialising the lock factory fails
     */
    @Before
    public void setUpTest() throws MuleException {
        this.event = Mockito.spy(testEvent());
        this.expressionManager = Mockito.spy(muleContext.getExpressionManager());

        // Failing processor: each apply(...) yields a Mono that errors with a mocked MessagingException
        // and counts that error in `count`.
        Mockito.when(this.mockFailingMessageProcessor.apply((Publisher) ArgumentMatchers.any(Publisher.class))).thenAnswer(invocation -> {
            MessagingException failure = Mockito.mock(MessagingException.class, Answers.RETURNS_DEEP_STUBS.get());
            CoreEvent failedEvent = Mockito.mock(CoreEvent.class);
            Mockito.when(failedEvent.getError()).thenReturn(Optional.of(Mockito.mock(Error.class)));
            Mockito.when(failure.getEvent()).thenReturn(failedEvent);
            return Mono.error(failure).doOnError(error -> this.count.getAndIncrement());
        });

        // Waiting processor: signals that an event arrived, parks on `waitLatch` for at most 2 seconds,
        // then hands over to the failing processor.
        Mockito.when(this.mockWaitingMessageProcessor.apply((Publisher) ArgumentMatchers.any(Publisher.class))).thenAnswer(invocation -> {
            return Mono.from((Publisher) invocation.getArgument(0)).doOnNext(Exceptions.checkedConsumer(arrived -> {
                this.waitingMessageProcessorExecutionLatch.countDown();
                this.waitLatch.await(2000L, TimeUnit.MILLISECONDS);
            })).transform(this.mockFailingMessageProcessor);
        });

        MuleLockFactory lockFactory = new MuleLockFactory();
        lockFactory.setLockProvider(new SingleServerLockProvider());
        lockFactory.initialise();

        // The same store instance is returned whether the policy looks a store up or creates one.
        InMemoryObjectStore counterStore = new InMemoryObjectStore();
        Mockito.when(this.mockObjectStoreManager.getObjectStore(ArgumentMatchers.anyString())).thenReturn(counterStore);
        Mockito.when(this.mockObjectStoreManager.createObjectStore((String) ArgumentMatchers.any(), (ObjectStoreSettings) ArgumentMatchers.any())).thenReturn(counterStore);
        Mockito.when(this.event.getMessage()).thenReturn(this.message);
        serializer = SerializationTestUtils.getJavaSerializerWithMockContext();

        muleContext.getInjector().inject(this.irp);
        this.irp.setExpressionManager(this.expressionManager);
        // Use the shared constant so the limit and the tests' expectations cannot drift apart.
        this.irp.setMaxRedeliveryCount(MAX_REDELIVERY_COUNT);
        this.irp.setUseSecureHash(true);
        this.irp.setMuleContext(muleContext);
        this.irp.setAnnotations(Collections.singletonMap(AbstractComponent.LOCATION_KEY, TEST_CONNECTOR_LOCATION));
        this.irp.setMessageProcessors(Collections.singletonList(this.mockFailingMessageProcessor));
        this.irp.setLockFactory(lockFactory);
        this.irp.setObjectStoreManager(this.mockObjectStoreManager);
    }

    /** Disposes the policy under test after each test, logging through this class's logger. */
    @After
    public void after() {
        final org.slf4j.Logger testLogger = LoggerFactory.getLogger(getClass());
        LifecycleUtils.disposeIfNeeded(this.irp, testLogger);
    }

    /**
     * With the spied expression manager stubbed to throw when a session is opened, the test expects an
     * {@link ExpressionRuntimeException} to surface from initialising and running the policy.
     */
    @Test(expected = ExpressionRuntimeException.class)
    public void messageDigestFailure() throws Exception {
        Mockito.when(this.expressionManager.openSession((BindingContext) ArgumentMatchers.any()))
                .thenThrow(new ExpressionRuntimeException(I18nMessageFactory.createStaticMessage("mock")));

        final TypedValue<Object> opaquePayload = new TypedValue<>(new Object(), DataType.OBJECT);
        Mockito.when(this.message.getPayload()).thenReturn(opaquePayload);

        this.irp.initialise();
        this.irp.process(this.event);
    }

    /**
     * Repeatedly processes the same string message against the default in-memory store and checks that the
     * failing processor emitted exactly {@code MAX_REDELIVERY_COUNT + 1} errors.
     */
    @Test
    public void testMessageRedeliveryUsingMemory() throws Exception {
        mockSha256();
        Mockito.when(this.message.getPayload()).thenReturn(new TypedValue<>(STRING_MESSAGE, DataType.STRING));
        this.irp.initialise();

        processUntilFailure();

        // Expressed relative to the configured limit instead of a bare 6.
        // NOTE(review): presumably the remaining attempts are rejected by the policy before reaching the processor — confirm.
        Assert.assertThat(this.count.get(), CoreMatchers.equalTo(MAX_REDELIVERY_COUNT + 1));
    }

    /**
     * Same scenario as the in-memory redelivery test, but the store manager is reset and re-stubbed so the
     * policy is given a {@link SerializationObjectStore}, forcing counters through serialization.
     * Expects exactly {@code MAX_REDELIVERY_COUNT + 1} errors from the failing processor.
     */
    @Test
    public void testMessageRedeliveryUsingSerializationStore() throws Exception {
        mockSha256();
        Mockito.when(this.message.getPayload()).thenReturn(new TypedValue<>(STRING_MESSAGE, DataType.STRING));

        // Discard the stubbing from setUpTest() so only the serializing store is served.
        Mockito.reset(this.mockObjectStoreManager);
        Mockito.when(this.mockObjectStoreManager.createObjectStore((String) ArgumentMatchers.any(), (ObjectStoreSettings) ArgumentMatchers.any())).thenReturn(new SerializationObjectStore());
        this.irp.initialise();

        processUntilFailure();

        // Expressed relative to the configured limit instead of a bare 6.
        Assert.assertThat(this.count.get(), CoreMatchers.equalTo(MAX_REDELIVERY_COUNT + 1));
    }

    /**
     * Runs the policy from two threads at once with the waiting processor installed, releases the wait latch
     * once both events have arrived (or 5 seconds have elapsed), and checks that exactly two errors were counted.
     */
    @Test
    public void testThreadSafeObjectStoreUsage() throws Exception {
        mockSha256();
        final TypedValue<String> payload = new TypedValue<>(STRING_MESSAGE, DataType.STRING);
        Mockito.when(this.message.getPayload()).thenReturn(payload);
        this.irp.setMessageProcessors(Collections.singletonList(this.mockWaitingMessageProcessor));
        this.irp.initialise();

        final ExecuteIrpThread firstWorker = new ExecuteIrpThread();
        firstWorker.start();
        final ExecuteIrpThread secondWorker = new ExecuteIrpThread();
        secondWorker.start();

        // Bounded wait: the result of await(...) is intentionally not inspected here.
        this.waitingMessageProcessorExecutionLatch.await(5000L, TimeUnit.MILLISECONDS);
        this.waitLatch.release();

        firstWorker.join();
        secondWorker.join();

        final int errorsSeen = this.count.get();
        Assert.assertThat(errorsSeen, CoreMatchers.equalTo(2));
    }

    /**
     * Configuring both an object store and a private object store on the policy is expected to make
     * {@code initialise()} fail with an {@link InitialisationException}.
     */
    @Test
    public void multipleObjectStoreConfigurationShouldRaiseException() throws Exception {
        // Declare the expectation up front; it only takes effect once the test body throws.
        this.expectedException.expect(InitialisationException.class);

        this.irp.setObjectStore(this.mockObjectStore);
        this.irp.setPrivateObjectStore(this.mockObjectStore);

        this.irp.initialise();
    }

    /** Disposing the policy is expected to call {@code close()} on the object store it was configured with. */
    @Test
    public void objectStoreIsClosed() throws Exception {
        this.irp.setObjectStore(this.mockObjectStore);

        this.irp.dispose();

        Mockito.verify(this.mockObjectStore).close();
    }

    /**
     * Uses an id expression that reads a {@code hash} event variable holding the hash code of a mocked
     * payload object, processes the event repeatedly, and then verifies {@code hashCode()} on that mock.
     */
    @Test
    public void javaObject() throws MuleException {
        final Object payload = Mockito.mock(Object.class);
        final CoreEvent eventWithHash = CoreEvent.builder(testEvent())
                .addVariable("hash", Integer.valueOf(payload.hashCode()))
                .build();
        this.event = Mockito.spy(eventWithHash);

        this.irp.setIdExpression("#[vars.hash]");
        this.irp.initialise();
        Mockito.when(this.message.getPayload()).thenReturn(new TypedValue<>(payload, DataType.OBJECT));

        processUntilFailure();

        Mockito.verify(payload).hashCode();
    }

    /**
     * Disposing the policy is expected to ask the store manager to dispose the store named
     * {@code <root container name>.<IdempotentRedeliveryPolicy class name>}.
     */
    @Test
    public void objectStoreIsRemovedWhenDisposed() throws Exception {
        this.irp.setObjectStore(this.mockObjectStore);

        this.irp.dispose();

        final String expectedStoreName =
                TEST_CONNECTOR_LOCATION.getRootContainerName() + "." + IdempotentRedeliveryPolicy.class.getName();
        Mockito.verify(this.mockObjectStoreManager).disposeStore(expectedStoreName);
    }

    /**
     * Sends the shared event through the policy {@code MAX_REDELIVERY_COUNT + 2} times, discarding whatever
     * exception each attempt throws. Callers assert afterwards on {@code count} or on mock interactions.
     */
    private void processUntilFailure() {
        // Derived from the configured limit rather than a bare 7.
        final int attempts = MAX_REDELIVERY_COUNT + 2;
        for (int attempt = 0; attempt < attempts; attempt++) {
            try {
                this.irp.process(this.event);
            } catch (Exception ignored) {
                // Deliberately ignored: the fixture's processor always fails, and the tests
                // only care about how many attempts reached it.
            }
        }
    }

    /**
     * Replaces the policy's expression manager with a mock whose compiled SHA-256 secure-hash expression
     * evaluates to the string form of the current event payload's {@code hashCode()}.
     */
    private void mockSha256() {
        final ExpressionManager hashingManager = Mockito.mock(ExpressionManager.class);
        final ExpressionManagerSession hashingSession = Mockito.mock(ExpressionManagerSession.class);
        final CompiledExpression hashExpression = Mockito.mock(CompiledExpression.class);
        final String sha256Expression = String.format(IdempotentRedeliveryPolicy.SECURE_HASH_EXPR_FORMAT, "SHA-256");

        Mockito.when(hashingManager.openSession((BindingContext) ArgumentMatchers.any())).thenReturn(hashingSession);
        Mockito.when(hashingManager.compile((String) ArgumentMatchers.eq(sha256Expression), (BindingContext) ArgumentMatchers.any())).thenReturn(hashExpression);
        this.irp.setExpressionManager(hashingManager);

        // Evaluated lazily on each call, so it follows whatever payload the message returns at that time.
        Mockito.when(hashingSession.evaluate(hashExpression, DataType.STRING)).thenAnswer(invocation -> {
            final int payloadHash = this.event.getMessage().getPayload().hashCode();
            return new TypedValue<>("" + payloadHash, DataType.STRING);
        });
    }
}
