package org.mule.runtime.core.internal.routing;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.management.DescriptorKey;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.streaming.CursorProvider;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.expression.ExpressionRuntimeException;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.internal.routing.forkjoin.CollectListForkJoinStrategyFactory;
import org.mule.runtime.core.internal.streaming.CursorUtils;
import org.mule.runtime.core.internal.streaming.ManagedCursorProvider;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.tck.SensingNullMessageProcessor;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.tck.processor.ContextPropagationChecker;
import org.mule.tck.util.MuleContextUtils;
import reactor.core.publisher.Flux;

/**
 * Unit tests for the {@link ParallelForEach} router.
 *
 * <p>The suite is parameterized on a single boolean that is published, for the duration of each test, as the
 * {@code mule.detailedCompositeRoutingExceptionLog} system property. Every test works on a fresh {@link ParallelForEach}
 * instance that is disposed in {@link #tearDown()}.
 */
@Story("Parallel For Each")
@Feature("Scope")
@RunWith(Parameterized.class)
public class ParallelForEachTestCase extends AbstractMuleContextTestCase {

    /** Sets {@code mule.detailedCompositeRoutingExceptionLog} to the current parameter value while a test runs. */
    @Rule
    public final SystemProperty detailedCompositeRoutingExceptionLog;

    @Rule
    public final ExpectedException expectedException = ExpectedException.none();

    private final ParallelForEach router = new ParallelForEach();
    private final ForkJoinStrategyFactory mockForkJoinStrategyFactory = Mockito.mock(ForkJoinStrategyFactory.class);

    /**
     * @param detailedLog value given to the {@code mule.detailedCompositeRoutingExceptionLog} system property for this run
     */
    public ParallelForEachTestCase(boolean detailedLog) {
        this.detailedCompositeRoutingExceptionLog =
            new SystemProperty("mule.detailedCompositeRoutingExceptionLog", Boolean.toString(detailedLog));
    }

    /**
     * @return the parameter sets for the suite: one run with the detailed log enabled and one with it disabled
     */
    @Parameterized.Parameters(name = "Detailed log: {0}")
    public static List<Object[]> parameters() {
        return Arrays.asList(new Object[] {true}, new Object[] {false});
    }

    /**
     * Registers the component locator after stubbing it so that looking up the global {@code appleFlow} location
     * yields a mocked {@link Flow}.
     */
    @Override
    protected Map<String, Object> getStartUpRegistryObjects() {
        Location appleFlow = Location.builder().globalName("appleFlow").build();
        Mockito.when(this.componentLocator.find(appleFlow)).thenReturn(Optional.of(Mockito.mock(Flow.class)));
        return Collections.singletonMap("_muleConfigurationComponentLocator", this.componentLocator);
    }

    @After
    public void tearDown() {
        this.router.dispose();
    }

    @Test
    @Description("RoutingPairs are created for each route configured. Each RoutingPair has the same input event.")
    public void routingPairs() throws Exception {
        CoreEvent event = createListEvent();
        this.router.setMessageProcessors(Collections.singletonList(Mockito.mock(MessageProcessorChain.class)));
        initialiseRouter();

        List<?> pairs = Flux.from(this.router.getRoutingPairs(event)).collectList().block();
        List<?> inputItems = (List<?>) event.getMessage().getPayload().getValue();

        Assert.assertThat(pairs, Matchers.hasSize(2));
        // Each routing pair must carry the collection item found at the same index of the input payload.
        for (int i = 0; i < 2; i++) {
            ForkJoinStrategy.RoutingPair pair = (ForkJoinStrategy.RoutingPair) pairs.get(i);
            Assert.assertThat(pair.getEvent().getMessage().getPayload().getValue(), CoreMatchers.equalTo(inputItems.get(i)));
        }
    }

    @Test
    @Description("By default the router result populates the outgoing message payload.")
    public void defaultTarget() throws Exception {
        CoreEvent event = createListEvent();
        this.router.setMessageProcessors(Collections.singletonList(newIdentityChain()));
        initialiseRouter();

        CoreEvent result = this.router.process(event);

        Object payload = result.getMessage().getPayload().getValue();
        Assert.assertThat(payload, CoreMatchers.instanceOf(List.class));
        Assert.assertThat((List<?>) payload, Matchers.hasSize(2));
    }

    @Test
    @Description("When a custom target is configured the router result is set in a variable and the input event is output.")
    public void customTargetMessage() throws Exception {
        CoreEvent event = createListEvent();
        this.router.setMessageProcessors(Collections.singletonList(newIdentityChain()));
        this.router.setTarget("foo");
        this.router.setTargetValue("#[message]");
        initialiseRouter();

        CoreEvent result = this.router.process(event);

        Assert.assertThat(result.getMessage(), CoreMatchers.equalTo(event.getMessage()));
        // With targetValue "#[message]" the variable holds a whole Message whose payload is the aggregated list.
        Message stored = (Message) ((TypedValue) result.getVariables().get("foo")).getValue();
        Assert.assertThat(stored.getPayload().getValue(), CoreMatchers.instanceOf(List.class));
        Assert.assertThat((List<?>) stored.getPayload().getValue(), Matchers.hasSize(2));
    }

    @Test
    @Description("When a custom target is configured the router result is set in a variable and the input event is output.")
    public void customTargetDefaultPayload() throws Exception {
        CoreEvent event = createListEvent();
        this.router.setMessageProcessors(Collections.singletonList(newIdentityChain()));
        this.router.setTarget("foo");
        initialiseRouter();

        CoreEvent result = this.router.process(event);

        Assert.assertThat(result.getMessage(), CoreMatchers.equalTo(event.getMessage()));
        // No targetValue is set here: the variable is expected to hold the list itself, typed as a List.
        TypedValue stored = (TypedValue) result.getVariables().get("foo");
        Assert.assertThat(stored.getValue(), CoreMatchers.instanceOf(List.class));
        Assert.assertThat(List.class.isAssignableFrom(stored.getDataType().getType()), CoreMatchers.is(true));
        Assert.assertThat((List<?>) stored.getValue(), Matchers.hasSize(2));
    }

    @Test
    @Description("Cursor provider should be managed by cursor manager inside parallel-foreach.")
    @Issue("MULE-18573")
    public void cursorStreamShouldBeManagedByCursorManager() throws Exception {
        CursorProvider cursorProvider = Mockito.mock(CursorStreamProvider.class);
        CoreEvent event = getEventBuilder().message(Message.of(Collections.singletonList(cursorProvider))).build();
        this.router.setMessageProcessors(Collections.singletonList(newIdentityChain()));
        initialiseRouter();

        CoreEvent result = this.router.process(event);

        Assert.assertThat(result.getMessage().getPayload().getValue(), CoreMatchers.instanceOf(List.class));
        List<?> messages = (List<?>) result.getMessage().getPayload().getValue();
        Assert.assertThat(messages, Matchers.hasSize(1));
        // The item must come out wrapped in a ManagedCursorProvider that still delegates to the original provider.
        Object itemPayload = ((Message) messages.get(0)).getPayload().getValue();
        Assert.assertThat(itemPayload, CoreMatchers.is(CoreMatchers.instanceOf(ManagedCursorProvider.class)));
        Assert.assertThat(CursorUtils.unwrap((ManagedCursorProvider) itemPayload),
                          CoreMatchers.is(CoreMatchers.sameInstance(cursorProvider)));
    }

    @Test
    @Description("The router uses a fork-join strategy with concurrency and timeout configured via the router and delayErrors true.")
    public void forkJoinStrategyConfiguration() throws Exception {
        this.router.setMaxConcurrency(3);
        this.router.setTimeout(123L);
        this.router.setMessageProcessors(Collections.singletonList(Mockito.mock(MessageProcessorChain.class)));
        this.router.setForkJoinStrategyFactory(this.mockForkJoinStrategyFactory);
        initialiseRouter();

        // The last argument must mirror the system property value this parameterized run was started with.
        boolean detailedLog = Boolean.parseBoolean(this.detailedCompositeRoutingExceptionLog.getValue());
        Mockito.verify(this.mockForkJoinStrategyFactory).createForkJoinStrategy(ArgumentMatchers.any(ProcessingStrategy.class),
                                                                                ArgumentMatchers.eq(3),
                                                                                ArgumentMatchers.eq(true),
                                                                                ArgumentMatchers.eq(123L),
                                                                                ArgumentMatchers.any(Scheduler.class),
                                                                                ArgumentMatchers.any(ErrorType.class),
                                                                                ArgumentMatchers.eq(detailedLog));
    }

    @Test
    @Description("By default CollectListForkJoinStrategyFactory is used which aggregates routes into a message with a List<Message> payload.")
    public void defaultForkJoinStrategyFactory() {
        Assert.assertThat(this.router.getDefaultForkJoinStrategyFactory(),
                          CoreMatchers.instanceOf(CollectListForkJoinStrategyFactory.class));
        Assert.assertThat(this.router.getDefaultForkJoinStrategyFactory().getResultDataType(),
                          CoreMatchers.equalTo(DataType.MULE_MESSAGE_LIST));
    }

    @Test
    @Description("Delay errors is always true for parallel-foreach currently.")
    public void defaultDelayErrors() {
        Assert.assertThat(this.router.isDelayErrors(), CoreMatchers.equalTo(true));
    }

    @Test
    @Description("An invalid collection expression result in a ExpressionRuntimeException")
    public void failingExpression() throws Exception {
        this.router.setMessageProcessors(Collections.singletonList(new SensingNullMessageProcessor()));
        this.router.setCollectionExpression("!@INVALID");
        initialiseRouter();

        // Expectations are registered before the call that is meant to throw.
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(ExpressionRuntimeException.class));
        this.router.process(testEvent());
    }

    @Test
    public void subscriberContextPropagation() throws MuleException {
        ContextPropagationChecker checker = new ContextPropagationChecker();
        this.router.setMessageProcessors(Collections.singletonList(checker));
        initialiseRouter();

        CoreEvent event = MuleContextUtils.eventBuilder(muleContext).message(Message.of(Arrays.asList("1", "2", "3"))).build();
        ContextPropagationChecker.assertContextPropagation(event, this.router, checker);
    }

    /** Injects dependencies into the router, gives it the apple-flow location annotations and initialises it. */
    private void initialiseRouter() throws MuleException {
        muleContext.getInjector().inject(this.router);
        this.router.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.router.initialise();
    }

    /** Builds a chain with a single processor that returns its input event unchanged, bound to the test Mule context. */
    private MessageProcessorChain newIdentityChain() {
        MessageProcessorChain chain = MessageProcessors.newChain(Optional.empty(), event -> event);
        chain.setMuleContext(muleContext);
        return chain;
    }

    /** Creates an event whose payload is a mutable two-element list: {@code "bar"}, {@code "zip"}. */
    private CoreEvent createListEvent() throws MuleException {
        List<String> items = new ArrayList<>();
        items.add("bar");
        items.add("zip");
        return getEventBuilder().message(Message.of(items)).build();
    }
}
