/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.module.extension.internal.runtime;

import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.MockSettings;
import org.mockito.Mockito;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.event.EventContext;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.meta.model.ComponentModel;
import org.mule.runtime.api.meta.model.EnrichableModel;
import org.mule.runtime.api.meta.model.ExtensionModel;
import org.mule.runtime.api.meta.model.XmlDslModel;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.expression.ExpressionRuntimeException;
import org.mule.runtime.core.api.extension.ExtensionManager;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.metadata.cache.MetadataCacheIdGeneratorFactory;
import org.mule.runtime.core.internal.policy.DefaultPolicyManager;
import org.mule.runtime.core.internal.policy.OperationParametersProcessor;
import org.mule.runtime.core.internal.policy.PolicyManager;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.extension.api.runtime.config.ConfigurationProvider;
import org.mule.runtime.module.extension.api.loader.java.property.CompletableComponentExecutorModelProperty;
import org.mule.runtime.module.extension.internal.runtime.IdentityExecutor;
import org.mule.runtime.module.extension.internal.runtime.InfiniteEmitter;
import org.mule.runtime.module.extension.internal.runtime.ItemsConsumer;
import org.mule.runtime.module.extension.internal.runtime.TestComponentMessageProcessor;
import org.mule.runtime.module.extension.internal.runtime.operation.ComponentMessageProcessor;
import org.mule.runtime.module.extension.internal.runtime.resolver.ResolverSet;
import org.mule.runtime.module.extension.internal.runtime.resolver.ResolverSetResult;
import org.mule.runtime.module.extension.internal.runtime.resolver.ValueResolvingContext;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Feature(value="Execution Engine")
@RunWith(value=Parameterized.class)
public class ComponentMessageProcessorTestCase
extends AbstractMuleContextTestCase {
    private static final Logger LOGGER = LoggerFactory.getLogger(ComponentMessageProcessorTestCase.class);
    protected ComponentMessageProcessor<ComponentModel> processor;
    protected ExtensionModel extensionModel;
    protected ComponentModel componentModel;
    protected ResolverSet resolverSet;
    protected ExtensionManager extensionManager;
    protected PolicyManager mockPolicyManager;
    private Flow testFlow;
    private final boolean isWithinProcessToApply;

    @Parameterized.Parameters(name="Is within process to apply: {0}")
    public static List<Object[]> parameters() {
        return Arrays.asList({false}, {true});
    }

    public ComponentMessageProcessorTestCase(boolean isWithinProcessToApply) {
        this.isWithinProcessToApply = isWithinProcessToApply;
    }

    @Before
    public void before() throws MuleException {
        this.extensionModel = (ExtensionModel)Mockito.mock(ExtensionModel.class);
        Mockito.when((Object)this.extensionModel.getXmlDslModel()).thenReturn((Object)XmlDslModel.builder().setPrefix("mock").build());
        this.componentModel = (ComponentModel)Mockito.mock(ComponentModel.class, (MockSettings)Mockito.withSettings().extraInterfaces(new Class[]{EnrichableModel.class}));
        Mockito.when((Object)this.componentModel.getModelProperty(CompletableComponentExecutorModelProperty.class)).thenReturn(Optional.of(new CompletableComponentExecutorModelProperty(IdentityExecutor::create)));
        this.resolverSet = (ResolverSet)Mockito.mock(ResolverSet.class);
        this.extensionManager = (ExtensionManager)Mockito.mock(ExtensionManager.class);
        this.mockPolicyManager = (PolicyManager)Mockito.mock(PolicyManager.class);
        Mockito.when((Object)this.mockPolicyManager.createOperationPolicy((Component)ArgumentMatchers.any(), (CoreEvent)ArgumentMatchers.any(), (OperationParametersProcessor)ArgumentMatchers.any())).thenReturn((Object)DefaultPolicyManager.noPolicyOperation());
        this.testFlow = ComponentMessageProcessorTestCase.getTestFlow((MuleContext)muleContext);
        LifecycleUtils.initialiseIfNeeded((Object)this.testFlow, (MuleContext)muleContext);
        this.processor = this.createProcessor();
        this.processor.setAnnotations(ComponentMessageProcessorTestCase.getAppleFlowComponentLocationAnnotations());
        this.processor.setComponentLocator(this.componentLocator);
        this.processor.setCacheIdGeneratorFactory((MetadataCacheIdGeneratorFactory)Mockito.mock(MetadataCacheIdGeneratorFactory.class));
        LifecycleUtils.initialiseIfNeeded(this.processor, (MuleContext)muleContext);
        LifecycleUtils.startIfNeeded(this.processor);
    }

    @After
    public void after() throws MuleException {
        LifecycleUtils.stopIfNeeded(this.processor);
        LifecycleUtils.disposeIfNeeded(this.processor, (Logger)LOGGER);
        LifecycleUtils.stopIfNeeded((Object)this.testFlow);
        LifecycleUtils.disposeIfNeeded((Object)this.testFlow, (Logger)LOGGER);
    }

    protected ComponentMessageProcessor<ComponentModel> createProcessor() {
        return new TestComponentMessageProcessor(this.extensionModel, this.componentModel, null, null, null, this.resolverSet, null, null, null, null, this.extensionManager, this.mockPolicyManager, null, null, muleContext.getConfiguration().getShutdownTimeout()){

            protected void validateOperationConfiguration(ConfigurationProvider configurationProvider) {
            }

            public ReactiveProcessor.ProcessingType getInnerProcessingType() {
                return ReactiveProcessor.ProcessingType.CPU_LITE;
            }
        };
    }

    protected CoreEvent.Builder getEventBuilder() {
        return InternalEvent.builder((EventContext)EventContextFactory.create((FlowConstruct)this.testFlow, (ComponentLocation)TEST_CONNECTOR_LOCATION));
    }

    @Test
    public void happyPath() throws MuleException {
        ResolverSetResult resolverSetResult = (ResolverSetResult)Mockito.mock(ResolverSetResult.class);
        Mockito.when((Object)this.resolverSet.resolve((ValueResolvingContext)ArgumentMatchers.any(ValueResolvingContext.class))).thenReturn((Object)resolverSetResult);
        Assert.assertNotNull((Object)Mono.from((Publisher)this.processor.apply((Publisher)Mono.just((Object)this.testEvent()))).subscriberContext(ctx -> ctx.put((Object)"messageProcessors.withinProcessToApply", (Object)this.isWithinProcessToApply)).block());
    }

    @Test
    public void muleRuntimeExceptionInResolutionResult() throws MuleException {
        ExpressionRuntimeException thrown = new ExpressionRuntimeException(I18nMessageFactory.createStaticMessage((String)"Expected"));
        Mockito.when((Object)this.resolverSet.resolve((ValueResolvingContext)ArgumentMatchers.any(ValueResolvingContext.class))).thenThrow(new Throwable[]{thrown});
        this.assertMessagingExceptionCausedBy((Exception)thrown, this.isWithinProcessToApply);
    }

    @Test
    public void muleExceptionInResolutionResult() throws MuleException {
        DefaultMuleException thrown = new DefaultMuleException(I18nMessageFactory.createStaticMessage((String)"Expected"));
        Mockito.when((Object)this.resolverSet.resolve((ValueResolvingContext)ArgumentMatchers.any(ValueResolvingContext.class))).thenThrow(new Throwable[]{thrown});
        this.assertMessagingExceptionCausedBy((Exception)thrown, this.isWithinProcessToApply);
    }

    @Test
    public void runtimeExceptionInResolutionResult() throws MuleException {
        NullPointerException thrown = new NullPointerException("Expected");
        Mockito.when((Object)this.resolverSet.resolve((ValueResolvingContext)ArgumentMatchers.any(ValueResolvingContext.class))).thenThrow(new Throwable[]{thrown});
        this.assertMessagingExceptionCausedBy(thrown, this.isWithinProcessToApply);
    }

    private void assertMessagingExceptionCausedBy(final Exception thrown, boolean isWithinProcessToApply) throws MuleException {
        final ArrayList processingErrors = new ArrayList();
        FluxSinkRecorder emitter = new FluxSinkRecorder();
        emitter.next((Object)this.testEvent());
        emitter.complete();
        Flux processorFlux = Flux.create((Consumer)emitter).transform(objectFlux -> this.processor.apply((Publisher)objectFlux)).onErrorContinue((throwable, o) -> processingErrors.add(throwable));
        BaseSubscriber<CoreEvent> assertingSubscriber = new BaseSubscriber<CoreEvent>(){

            protected void hookOnComplete() {
                super.hookOnComplete();
                MatcherAssert.assertThat((String)"Only one error should be returned has part of the event processing", (Object)processingErrors, (Matcher)Matchers.hasSize((int)1));
                MatcherAssert.assertThat((String)"Error should be wrapper in a MessagingException", (Object)processingErrors, (Matcher)Matchers.contains((Matcher)Matchers.is((Matcher)Matchers.instanceOf(MessagingException.class))));
                MatcherAssert.assertThat((String)"Error is not the expected one", (Object)processingErrors, (Matcher)Matchers.contains((Matcher)Matchers.hasProperty((String)"cause", (Matcher)Matchers.equalTo((Object)thrown))));
            }
        };
        processorFlux.subscriberContext(ctx -> ctx.put((Object)"messageProcessors.withinProcessToApply", (Object)isWithinProcessToApply)).subscribeWith((Subscriber)assertingSubscriber);
    }

    @Test
    public void happyPathFluxPublisher() throws MuleException, InterruptedException {
        ResolverSetResult resolverSetResult = (ResolverSetResult)Mockito.mock(ResolverSetResult.class);
        Mockito.when((Object)this.resolverSet.resolve((ValueResolvingContext)ArgumentMatchers.any(ValueResolvingContext.class))).thenReturn((Object)resolverSetResult);
        this.subscribeToParallelPublisherAndAwait(3);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void multipleUpstreamPublishers() throws MuleException, InterruptedException {
        ResolverSetResult resolverSetResult = (ResolverSetResult)Mockito.mock(ResolverSetResult.class);
        Mockito.when((Object)this.resolverSet.resolve((ValueResolvingContext)ArgumentMatchers.any(ValueResolvingContext.class))).thenReturn((Object)resolverSetResult);
        InfiniteEmitter<CoreEvent> eventsEmitter = new InfiniteEmitter<CoreEvent>(() -> ((ComponentMessageProcessorTestCase)this).newEvent());
        InfiniteEmitter<CoreEvent> eventsEmitter2 = new InfiniteEmitter<CoreEvent>(() -> ((ComponentMessageProcessorTestCase)this).newEvent());
        ItemsConsumer eventsConsumer = new ItemsConsumer(10);
        ItemsConsumer eventsConsumer2 = new ItemsConsumer(3);
        Flux.create(eventsEmitter).transform(this.processor).subscriberContext(ctx -> ctx.put((Object)"messageProcessors.withinProcessToApply", (Object)this.isWithinProcessToApply)).subscribe(eventsConsumer);
        Flux.create(eventsEmitter2).transform(this.processor).subscriberContext(ctx -> ctx.put((Object)"messageProcessors.withinProcessToApply", (Object)this.isWithinProcessToApply)).subscribe(eventsConsumer2);
        eventsEmitter.start();
        eventsEmitter2.start();
        try {
            MatcherAssert.assertThat((Object)eventsConsumer.await(5000L, TimeUnit.MILLISECONDS), (Matcher)Matchers.is((Object)true));
            MatcherAssert.assertThat((Object)eventsConsumer2.await(5000L, TimeUnit.MILLISECONDS), (Matcher)Matchers.is((Object)true));
        }
        finally {
            eventsEmitter.stop();
            eventsEmitter2.stop();
        }
    }

    @Test
    @Issue(value="W-13563214")
    public void newSubscriptionAfterPreviousPublisherTermination() throws MuleException, InterruptedException {
        ResolverSetResult resolverSetResult = (ResolverSetResult)Mockito.mock(ResolverSetResult.class);
        Mockito.when((Object)this.resolverSet.resolve((ValueResolvingContext)ArgumentMatchers.any(ValueResolvingContext.class))).thenReturn((Object)resolverSetResult);
        this.subscribeToParallelPublisherAndAwait(5);
        this.subscribeToParallelPublisherAndAwait(4);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void subscribeToParallelPublisherAndAwait(int numEvents) throws InterruptedException {
        InfiniteEmitter<CoreEvent> eventsEmitter = new InfiniteEmitter<CoreEvent>(() -> ((ComponentMessageProcessorTestCase)this).newEvent());
        ItemsConsumer eventsConsumer = new ItemsConsumer(numEvents);
        Flux.create(eventsEmitter).transform(this.processor).doOnNext(Assert::assertNotNull).subscriberContext(ctx -> ctx.put((Object)"messageProcessors.withinProcessToApply", (Object)this.isWithinProcessToApply)).subscribe(eventsConsumer);
        eventsEmitter.start();
        try {
            MatcherAssert.assertThat((Object)eventsConsumer.await(5000L, TimeUnit.MILLISECONDS), (Matcher)Matchers.is((Object)true));
        }
        finally {
            eventsEmitter.stop();
        }
    }
}

