package org.mule.runtime.core.internal.routing;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import io.qameta.allure.Story;
import java.io.StringBufferInputStream;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.runtime.api.component.ConfigurationProperties;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.internal.routing.forkjoin.CollectMapForkJoinStrategyFactory;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.junit4.rule.SystemProperty;
import org.mule.tck.processor.ContextPropagationChecker;
import reactor.core.publisher.Flux;

@Story("Scatter Gather")
@Feature("Routers")
@RunWith(Parameterized.class)
/* loaded from: input_file:org/mule/runtime/core/internal/routing/ScatterGatherRouterTestCase.class */
public class ScatterGatherRouterTestCase extends AbstractMuleContextTestCase {

    @Rule
    public SystemProperty detailedCompositeRoutingExceptionLog;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private final ScatterGatherRouter router = new ScatterGatherRouter();
    private final ForkJoinStrategyFactory mockForkJoinStrategyFactory = (ForkJoinStrategyFactory) Mockito.mock(ForkJoinStrategyFactory.class);
    private final ConfigurationProperties configurationProperties = (ConfigurationProperties) Mockito.mock(ConfigurationProperties.class);

    public ScatterGatherRouterTestCase(boolean z) {
        this.detailedCompositeRoutingExceptionLog = new SystemProperty("mule.detailedCompositeRoutingExceptionLog", Boolean.toString(z));
    }

    @Parameterized.Parameters(name = "Detailed log: {0}")
    public static List<Object[]> parameters() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    protected Map<String, Object> getStartUpRegistryObjects() {
        Mockito.when(this.componentLocator.find(Location.builder().globalName("appleFlow").build())).thenReturn(Optional.of(Mockito.mock(Flow.class)));
        HashMap hashMap = new HashMap();
        hashMap.put("_muleConfigurationComponentLocator", this.componentLocator);
        hashMap.put("_configurationProperties", this.configurationProperties);
        return hashMap;
    }

    @After
    public void tearDown() throws Exception {
        this.router.dispose();
    }

    @Test
    @Description("RoutingPairs are created for each route configured. Each RoutingPair has the same input event.")
    public void routingPairs() throws Exception {
        CoreEvent coreEvent = (CoreEvent) Mockito.mock(CoreEvent.class);
        MessageProcessorChain messageProcessorChain = (MessageProcessorChain) Mockito.mock(MessageProcessorChain.class);
        MessageProcessorChain messageProcessorChain2 = (MessageProcessorChain) Mockito.mock(MessageProcessorChain.class);
        MessageProcessorChain messageProcessorChain3 = (MessageProcessorChain) Mockito.mock(MessageProcessorChain.class);
        muleContext.getInjector().inject(this.router);
        this.router.setRoutes(Arrays.asList(messageProcessorChain, messageProcessorChain2, messageProcessorChain3));
        List list = (List) Flux.from(this.router.getRoutingPairs(coreEvent)).collectList().block();
        Assert.assertThat(list, Matchers.hasSize(3));
        Assert.assertThat(list.get(0), CoreMatchers.equalTo(ForkJoinStrategy.RoutingPair.of(coreEvent, messageProcessorChain)));
        Assert.assertThat(list.get(1), CoreMatchers.equalTo(ForkJoinStrategy.RoutingPair.of(coreEvent, messageProcessorChain2)));
        Assert.assertThat(list.get(2), CoreMatchers.equalTo(ForkJoinStrategy.RoutingPair.of(coreEvent, messageProcessorChain3)));
    }

    @Test
    @Description("By default the router result populates the outgoing message payload.")
    public void defaultTarget() throws Exception {
        CoreEvent testEvent = testEvent();
        this.router.setRoutes(Arrays.asList(MessageProcessors.newChain(Optional.empty(), new Processor[]{coreEvent -> {
            return coreEvent;
        }}), MessageProcessors.newChain(Optional.empty(), new Processor[]{coreEvent2 -> {
            return coreEvent2;
        }})));
        muleContext.getInjector().inject(this.router);
        this.router.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.router.initialise();
        CoreEvent process = this.router.process(testEvent);
        Assert.assertThat(process.getMessage().getPayload().getValue(), CoreMatchers.instanceOf(Map.class));
        Assert.assertThat(((Map) process.getMessage().getPayload().getValue()).values(), Matchers.hasSize(2));
    }

    @Test
    @Description("When a custom target is configured the router result is set in a variable and the input event is output.")
    public void customTargetMessage() throws Exception {
        CoreEvent testEvent = testEvent();
        MessageProcessorChain newChain = MessageProcessors.newChain(Optional.empty(), new Processor[]{coreEvent -> {
            return coreEvent;
        }});
        MessageProcessorChain newChain2 = MessageProcessors.newChain(Optional.empty(), new Processor[]{coreEvent2 -> {
            return coreEvent2;
        }});
        muleContext.getInjector().inject(this.router);
        this.router.setRoutes(Arrays.asList(newChain, newChain2));
        this.router.setTarget("foo");
        this.router.setTargetValue("#[message]");
        this.router.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.router.initialise();
        CoreEvent process = this.router.process(testEvent);
        Assert.assertThat(process.getMessage(), CoreMatchers.equalTo(testEvent.getMessage()));
        Assert.assertThat(((Message) ((TypedValue) process.getVariables().get("foo")).getValue()).getPayload().getValue(), CoreMatchers.instanceOf(Map.class));
        Assert.assertThat(((Map) ((Message) ((TypedValue) process.getVariables().get("foo")).getValue()).getPayload().getValue()).values(), Matchers.hasSize(2));
    }

    @Test
    @Description("When a custom target is configured the router result is set in a variable and the input event is output.")
    public void customTargetDefaultPayload() throws Exception {
        CoreEvent testEvent = testEvent();
        this.router.setRoutes(Arrays.asList(MessageProcessors.newChain(Optional.empty(), new Processor[]{coreEvent -> {
            return coreEvent;
        }}), MessageProcessors.newChain(Optional.empty(), new Processor[]{coreEvent2 -> {
            return coreEvent2;
        }})));
        this.router.setTarget("foo");
        muleContext.getInjector().inject(this.router);
        this.router.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.router.initialise();
        CoreEvent process = this.router.process(testEvent);
        Assert.assertThat(process.getMessage(), CoreMatchers.equalTo(testEvent.getMessage()));
        TypedValue typedValue = (TypedValue) process.getVariables().get("foo");
        Assert.assertThat(typedValue.getValue(), CoreMatchers.instanceOf(Map.class));
        Assert.assertThat(Boolean.valueOf(Map.class.isAssignableFrom(typedValue.getDataType().getType())), CoreMatchers.is(true));
        Assert.assertThat(((Map) typedValue.getValue()).values(), Matchers.hasSize(2));
    }

    @Test
    @Description("The router uses a fork-join strategy with concurrency and timeout configured via the router and delayErrors true.")
    public void forkJoinStrategyConfiguration() throws Exception {
        this.router.setMaxConcurrency(3);
        this.router.setTimeout(123L);
        this.router.setRoutes((List) IntStream.range(0, 21).mapToObj(i -> {
            return (MessageProcessorChain) Mockito.mock(MessageProcessorChain.class);
        }).collect(Collectors.toList()));
        this.router.setForkJoinStrategyFactory(this.mockForkJoinStrategyFactory);
        muleContext.getInjector().inject(this.router);
        this.router.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.router.initialise();
        ((ForkJoinStrategyFactory) Mockito.verify(this.mockForkJoinStrategyFactory)).createForkJoinStrategy((ProcessingStrategy) ArgumentMatchers.any(ProcessingStrategy.class), ArgumentMatchers.eq(3), ArgumentMatchers.eq(true), ArgumentMatchers.eq(123L), (Scheduler) ArgumentMatchers.any(Scheduler.class), (ErrorType) ArgumentMatchers.any(ErrorType.class), ArgumentMatchers.eq(Boolean.parseBoolean(this.detailedCompositeRoutingExceptionLog.getValue())));
    }

    @Test
    @Description("By default CollectMapForkJoinStrategyFactory is used which aggregates routes into a message with a Map<Message> payload.")
    public void defaultForkJoinStrategyFactory() {
        Assert.assertThat(this.router.getDefaultForkJoinStrategyFactory(), CoreMatchers.instanceOf(CollectMapForkJoinStrategyFactory.class));
        Assert.assertThat(this.router.getDefaultForkJoinStrategyFactory().getResultDataType(), CoreMatchers.equalTo(DataType.MULE_MESSAGE_MAP));
    }

    @Test
    @Description("Consumable payloads are not supported.")
    public void consumablePayload() throws Exception {
        MessageProcessorChain newChain = MessageProcessors.newChain(Optional.empty(), new Processor[]{coreEvent -> {
            return coreEvent;
        }});
        MessageProcessorChain newChain2 = MessageProcessors.newChain(Optional.empty(), new Processor[]{coreEvent2 -> {
            return coreEvent2;
        }});
        muleContext.getInjector().inject(this.router);
        this.router.setRoutes(Arrays.asList(newChain, newChain2));
        this.router.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.router.initialise();
        this.expectedException.expect(CoreMatchers.instanceOf(MuleRuntimeException.class));
        this.router.process(CoreEvent.builder(testEvent()).message(Message.of(new StringBufferInputStream("test"))).build());
    }

    @Test
    @Description("Delay errors is always true for scatter-gather currently.")
    public void defaultDelayErrors() {
        Assert.assertThat(Boolean.valueOf(this.router.isDelayErrors()), CoreMatchers.equalTo(true));
    }

    @Test
    public void subscriberContextPropagation() throws MuleException {
        Processor contextPropagationChecker = new ContextPropagationChecker();
        Processor contextPropagationChecker2 = new ContextPropagationChecker();
        muleContext.getInjector().inject(this.router);
        this.router.setRoutes(Arrays.asList(MessageProcessors.newChain(Optional.empty(), new Processor[]{contextPropagationChecker}), MessageProcessors.newChain(Optional.empty(), new Processor[]{contextPropagationChecker2})));
        this.router.setAnnotations(getAppleFlowComponentLocationAnnotations());
        this.router.initialise();
        ContextPropagationChecker.assertContextPropagation(testEvent(), this.router, contextPropagationChecker);
        ContextPropagationChecker.assertContextPropagation(testEvent(), this.router, contextPropagationChecker2);
    }
}
