package org.mule.runtime.core.internal.routing;

import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.io.ByteArrayInputStream;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.collection.IsMapContaining;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.expression.ExpressionRuntimeException;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.privileged.processor.InternalProcessor;
import org.mule.tck.MuleTestUtils;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.processor.ContextPropagationChecker;
import org.mule.tck.util.MuleContextUtils;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

@Story("Until Successful")
@Feature("Scope")
@RunWith(Parameterized.class)
/* loaded from: input_file:org/mule/runtime/core/internal/routing/UntilSuccessfulTestCase.class */
public class UntilSuccessfulTestCase extends AbstractMuleContextTestCase {
    private static final String MILLIS_BETWEEN_RETRIES = "100";
    private static final String RETRY_CTX_INTERNAL_PARAMETER_KEY = "untilSuccessful.router.retryContext";

    @Rule
    public ExpectedException expected = ExpectedException.none();
    private Flow flow;
    private UntilSuccessful untilSuccessful;
    private ConfigurableMessageProcessor targetMessageProcessor;
    private final boolean tx;

    /* loaded from: input_file:org/mule/runtime/core/internal/routing/UntilSuccessfulTestCase$ConfigurableMessageProcessor.class */
    public static class ConfigurableMessageProcessor implements Processor, InternalProcessor {
        private volatile int eventCount;
        private volatile CoreEvent event;
        private volatile int numberOfFailuresToSimulate;

        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            this.eventCount++;
            int i = this.numberOfFailuresToSimulate;
            this.numberOfFailuresToSimulate = i - 1;
            if (i > 0) {
                throw new RuntimeException("simulated problem");
            }
            this.event = coreEvent;
            return coreEvent;
        }

        public CoreEvent getEventReceived() {
            return this.event;
        }

        public int getEventCount() {
            return this.eventCount;
        }

        public void setNumberOfFailuresToSimulate(int i) {
            this.numberOfFailuresToSimulate = i;
        }
    }

    @Parameterized.Parameters(name = "tx: {0}")
    public static Collection<Boolean> modeParameters() {
        return Arrays.asList(Boolean.TRUE, Boolean.FALSE);
    }

    public UntilSuccessfulTestCase(boolean z) {
        this.tx = z;
    }

    protected Map<String, Object> getStartUpRegistryObjects() {
        return Collections.singletonMap("_muleConfigurationComponentLocator", this.componentLocator);
    }

    protected void doSetUp() throws Exception {
        super.doSetUp();
        this.flow = MuleTestUtils.createAndRegisterFlow(muleContext, "appleFlow", this.componentLocator);
        this.untilSuccessful = buildUntilSuccessful(MILLIS_BETWEEN_RETRIES);
        if (this.tx) {
            TransactionCoordination.getInstance().bindTransaction((Transaction) Mockito.mock(Transaction.class));
        }
    }

    @After
    public void doTeardown() throws Exception {
        this.untilSuccessful.dispose();
        super.doTearDown();
    }

    private UntilSuccessful buildUntilSuccessful(String str) throws Exception {
        this.targetMessageProcessor = new ConfigurableMessageProcessor();
        return buildUntilSuccessfulWithProcessors(str, "2", this.targetMessageProcessor);
    }

    private UntilSuccessful buildUntilSuccessfulWithProcessors(String str, String str2, Processor... processorArr) throws Exception {
        UntilSuccessful untilSuccessful = new UntilSuccessful();
        untilSuccessful.setMaxRetries(str2);
        untilSuccessful.setAnnotations(getAppleFlowComponentLocationAnnotations());
        if (str != null) {
            untilSuccessful.setMillisBetweenRetries(str);
        }
        untilSuccessful.setMessageProcessors(Arrays.asList(processorArr));
        muleContext.getInjector().inject(untilSuccessful);
        return untilSuccessful;
    }

    private UntilSuccessful buildNestedUntilSuccessful() throws Exception {
        UntilSuccessful untilSuccessful = new UntilSuccessful();
        untilSuccessful.setMaxRetries("1");
        untilSuccessful.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.targetMessageProcessor = new ConfigurableMessageProcessor();
        untilSuccessful.setMessageProcessors(Collections.singletonList(buildUntilSuccessfulWithProcessors(MILLIS_BETWEEN_RETRIES, "1", this.targetMessageProcessor)));
        muleContext.getInjector().inject(untilSuccessful);
        return untilSuccessful;
    }

    protected void doTearDown() throws Exception {
        this.untilSuccessful.stop();
    }

    @Test
    public void testSuccessfulDelivery() throws Exception {
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        assertLogicallyEqualEvents(testEvent(), this.untilSuccessful.process(testEvent()));
        assertTargetEventReceived(testEvent());
    }

    @Test
    public void testSuccessfulDeliveryStreamPayload() throws Exception {
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of(new ByteArrayInputStream("test_data".getBytes()))).build();
        Assert.assertSame(build.getMessage(), this.untilSuccessful.process(build).getMessage());
        assertTargetEventReceived(build);
    }

    @Test
    public void testPermanentDeliveryFailure() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(Integer.MAX_VALUE);
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of("ERROR")).build();
        this.expected.expect(MessagingException.class);
        this.expected.expectCause(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class));
        try {
            this.untilSuccessful.process(build);
            Assert.assertEquals(1 + Integer.parseInt(this.untilSuccessful.getMaxRetries()), this.targetMessageProcessor.getEventCount());
        } catch (Throwable th) {
            Assert.assertEquals(1 + Integer.parseInt(this.untilSuccessful.getMaxRetries()), this.targetMessageProcessor.getEventCount());
            throw th;
        }
    }

    @Test
    public void testTemporaryDeliveryFailure() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(Integer.parseInt(this.untilSuccessful.getMaxRetries()));
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of("ERROR")).build();
        Assert.assertSame(build.getMessage(), this.untilSuccessful.process(build).getMessage());
        assertTargetEventReceived(build);
        Assert.assertEquals(this.targetMessageProcessor.getEventCount(), Integer.parseInt(this.untilSuccessful.getMaxRetries()) + 1);
    }

    @Test
    public void testProcessingStrategyUsage() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(Integer.parseInt(this.untilSuccessful.getMaxRetries()));
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        this.untilSuccessful.process(MuleContextUtils.eventBuilder(muleContext).message(Message.of("ERROR")).build()).getMessage();
        ((ProcessingStrategy) Mockito.verify(this.flow.getProcessingStrategy(), Mockito.never())).onPipeline((ReactiveProcessor) ArgumentMatchers.any(ReactiveProcessor.class));
    }

    @Test
    public void testDefaultMillisWait() throws Exception {
        this.untilSuccessful = buildUntilSuccessful(null);
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        Assert.assertEquals(60000L, Integer.parseInt(this.untilSuccessful.getMillisBetweenRetries()));
    }

    @Test
    public void testWithExpressionRetries() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(4);
        this.untilSuccessful.setMaxRetries("#[2 + 2]");
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of("ERROR")).build();
        Assert.assertSame(build.getMessage(), this.untilSuccessful.process(build).getMessage());
        assertTargetEventReceived(build);
        Assert.assertEquals(this.targetMessageProcessor.getEventCount(), 5L);
    }

    @Test
    public void testWithExpressionRetriesMultipleExecutions() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(2);
        this.untilSuccessful.setMaxRetries("#[payload + 2]");
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of(4)).build();
        Assert.assertSame(build.getMessage(), this.untilSuccessful.process(build).getMessage());
        Assert.assertSame(build.getMessage(), this.untilSuccessful.process(build).getMessage());
        Assert.assertEquals(this.targetMessageProcessor.getEventCount(), 4L);
    }

    @Test
    public void testWithExpressionRetriesUsingPayload() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(10);
        this.untilSuccessful.setMaxRetries("#[payload + 2]");
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of(1)).build();
        this.expected.expect(MessagingException.class);
        this.expected.expectCause(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class));
        try {
            this.untilSuccessful.process(build);
        } finally {
            Assert.assertEquals(this.targetMessageProcessor.getEventCount(), 4L);
        }
    }

    @Test
    public void testWithWrongExpressionRetry() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(10);
        this.untilSuccessful.setMaxRetries("#[payload + 2]");
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of("queso")).build();
        this.expected.expectCause(CoreMatchers.instanceOf(ExpressionRuntimeException.class));
        this.expected.expectMessage(CoreMatchers.containsString("You called the function '+' with these arguments"));
        this.untilSuccessful.process(build);
    }

    @Test
    public void testRetryContextIsClearedAfterSuccessfulScopeExecution() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(1);
        this.untilSuccessful.setMaxRetries("1");
        this.untilSuccessful.setMillisBetweenRetries(MILLIS_BETWEEN_RETRIES);
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        InternalEvent process = this.untilSuccessful.process(MuleContextUtils.eventBuilder(muleContext).message(Message.of("queso")).build());
        MatcherAssert.assertThat(getPayloadAsString(process.getMessage()), Matchers.is("queso"));
        MatcherAssert.assertThat(Boolean.valueOf(((Map) process.getInternalParameter(RETRY_CTX_INTERNAL_PARAMETER_KEY)).isEmpty()), Matchers.is(true));
    }

    @Test
    public void testRetryContextIsClearedAfterNestedSuccessfulScopeExecution() throws Exception {
        this.untilSuccessful = buildNestedUntilSuccessful();
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(1);
        this.untilSuccessful.setMaxRetries("1");
        this.untilSuccessful.setMillisBetweenRetries(MILLIS_BETWEEN_RETRIES);
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        InternalEvent process = this.untilSuccessful.process(MuleContextUtils.eventBuilder(muleContext).message(Message.of("queso")).build());
        MatcherAssert.assertThat(getPayloadAsString(process.getMessage()), Matchers.is("queso"));
        MatcherAssert.assertThat(Boolean.valueOf(((Map) process.getInternalParameter(RETRY_CTX_INTERNAL_PARAMETER_KEY)).isEmpty()), Matchers.is(true));
    }

    @Test
    public void testRetryContextIsClearedAfterExhaustedScopeExecution() throws Exception {
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(2);
        this.untilSuccessful.setMaxRetries("1");
        this.untilSuccessful.setMillisBetweenRetries(MILLIS_BETWEEN_RETRIES);
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        assertNoRetryContextAfterScopeExecutions(2);
    }

    @Test
    public void testRetryContextIsClearedAfterNestedExhaustedScopeExecution() throws Exception {
        this.untilSuccessful = buildNestedUntilSuccessful();
        this.targetMessageProcessor.setNumberOfFailuresToSimulate(4);
        this.untilSuccessful.setMaxRetries("1");
        this.untilSuccessful.setMillisBetweenRetries(MILLIS_BETWEEN_RETRIES);
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        assertNoRetryContextAfterScopeExecutions(4);
    }

    @Test
    public void subscriberContextPropagation() throws MuleException {
        ContextPropagationChecker contextPropagationChecker = new ContextPropagationChecker();
        this.untilSuccessful = new UntilSuccessful();
        this.untilSuccessful.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.untilSuccessful.setMessageProcessors(Collections.singletonList(contextPropagationChecker));
        muleContext.getInjector().inject(this.untilSuccessful);
        this.untilSuccessful.initialise();
        this.untilSuccessful.start();
        ContextPropagationChecker.assertContextPropagation(MuleContextUtils.eventBuilder(muleContext).message(Message.of("1")).build(), this.untilSuccessful, contextPropagationChecker);
    }

    @Test
    public void routerFluxesLifecycle() throws MuleException {
        this.untilSuccessful.initialise();
        FluxSinkRecorder fluxSinkRecorder = new FluxSinkRecorder();
        ProcessingStrategy processingStrategy = (ProcessingStrategy) Mockito.mock(ProcessingStrategy.class);
        AtomicReference atomicReference = new AtomicReference();
        Mockito.when(processingStrategy.configureInternalPublisher((Publisher) ArgumentMatchers.any())).thenAnswer(invocationOnMock -> {
            Flux flux = (Flux) invocationOnMock.getArgument(0);
            atomicReference.getClass();
            return flux.doOnNext((v1) -> {
                r1.set(v1);
            });
        });
        UntilSuccessfulRouter untilSuccessfulRouter = new UntilSuccessfulRouter(this.flow, fluxSinkRecorder.flux(), coreEvent -> {
            return coreEvent;
        }, processingStrategy, muleContext.getExpressionManager(), (Predicate) null, (Scheduler) null, "1", MILLIS_BETWEEN_RETRIES, true);
        ((ProcessingStrategy) Mockito.verify(processingStrategy)).configureInternalPublisher((Publisher) ArgumentMatchers.any());
        Flux from = Flux.from(untilSuccessfulRouter.getDownstreamPublisher());
        Runnable runnable = (Runnable) Mockito.mock(Runnable.class);
        from.subscribe((Consumer) null, (Consumer) null, runnable);
        fluxSinkRecorder.next(testEvent());
        MatcherAssert.assertThat("Event peeked in the innerFlux does not have the retry context.", ((InternalEvent) atomicReference.get()).getInternalParameters(), IsMapContaining.hasKey(RETRY_CTX_INTERNAL_PARAMETER_KEY));
        fluxSinkRecorder.complete();
        ((Runnable) Mockito.verify(runnable)).run();
    }

    protected void assertNoRetryContextAfterScopeExecutions(int i) throws MuleException {
        try {
            this.untilSuccessful.process(MuleContextUtils.eventBuilder(muleContext).message(Message.of("queso")).build());
            Assert.fail("An exhaustion error was expected from an until successful scope");
        } catch (Exception e) {
            MatcherAssert.assertThat(Boolean.valueOf(((Map) e.getEvent().getInternalParameter(RETRY_CTX_INTERNAL_PARAMETER_KEY)).isEmpty()), Matchers.is(true));
            MatcherAssert.assertThat(Integer.valueOf(this.targetMessageProcessor.eventCount), Matchers.is(Integer.valueOf(i)));
        }
    }

    private void assertTargetEventReceived(CoreEvent coreEvent) throws MuleException {
        MatcherAssert.assertThat(this.targetMessageProcessor.getEventReceived(), CoreMatchers.not(CoreMatchers.nullValue()));
        assertLogicallyEqualEvents(coreEvent, this.targetMessageProcessor.getEventReceived());
    }

    private void assertLogicallyEqualEvents(CoreEvent coreEvent, CoreEvent coreEvent2) throws MuleException {
        Assert.assertEquals(coreEvent.getCorrelationId(), coreEvent2.getCorrelationId());
        Assert.assertEquals(coreEvent.getMessage(), coreEvent2.getMessage());
    }
}
