package org.mule.runtime.core.internal.routing.forkjoin;

import io.qameta.allure.Description;
import io.qameta.allure.Feature;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.error.Errors;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.util.func.CheckedConsumer;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.privileged.processor.InternalProcessor;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.runtime.core.privileged.routing.CompositeRoutingException;
import org.mule.runtime.core.privileged.routing.RoutingResult;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.mule.tck.testmodels.fruit.Apple;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Feature("Fork/Join Strategies used by scatter-gather and parallel-foreach routers")
/* loaded from: input_file:org/mule/runtime/core/internal/routing/forkjoin/AbstractForkJoinStrategyTestCase.class */
public abstract class AbstractForkJoinStrategyTestCase extends AbstractMuleContextTestCase {
    protected ForkJoinStrategy strategy;
    protected ProcessingStrategy processingStrategy;
    protected Scheduler actualScheduler;
    protected Scheduler scheduler;
    protected ErrorType timeoutErrorType;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private final List<MessageProcessorChain> chains = new ArrayList();

    @FunctionalInterface
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/forkjoin/AbstractForkJoinStrategyTestCase$InternalTestProcessor.class */
    private interface InternalTestProcessor extends Processor, InternalProcessor {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/forkjoin/AbstractForkJoinStrategyTestCase$SleepingProcessor.class */
    public static class SleepingProcessor extends AbstractComponent implements Processor {
        long sleep;
        Message result;

        public SleepingProcessor(Message message, long j) {
            this.result = message;
            this.sleep = j;
        }

        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            try {
                Thread.sleep(this.sleep);
                return CoreEvent.builder(coreEvent).message(this.result).build();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Before
    public void setup() {
        this.processingStrategy = (ProcessingStrategy) Mockito.mock(ProcessingStrategy.class);
        Mockito.when(this.processingStrategy.onPipeline((ReactiveProcessor) ArgumentMatchers.any(ReactiveProcessor.class))).thenAnswer(invocationOnMock -> {
            return invocationOnMock.getArgument(0);
        });
        this.actualScheduler = muleContext.getSchedulerService().ioScheduler();
        this.scheduler = (Scheduler) Mockito.spy(this.actualScheduler);
        this.timeoutErrorType = (ErrorType) muleContext.getErrorTypeRepository().getErrorType(Errors.ComponentIdentifiers.Handleable.TIMEOUT).get();
        setupConcurrentProcessingStrategy();
        this.strategy = createStrategy(this.processingStrategy, Integer.MAX_VALUE, true, 2147483647L);
    }

    @After
    public void tearDown() {
        this.actualScheduler.stop();
        if (this.chains.isEmpty()) {
            return;
        }
        LifecycleUtils.disposeIfNeeded(this.chains, LoggerFactory.getLogger(getClass()));
        this.chains.clear();
    }

    protected abstract ForkJoinStrategy createStrategy(ProcessingStrategy processingStrategy, int i, boolean z, long j);

    @Test
    @Description("When a route timeout occurs a CompositeRoutingException is thrown with details of timeout error in RoutingResult.")
    public void timeout() throws Throwable {
        this.strategy = createStrategy(this.processingStrategy, 1, true, 50L);
        this.expectedException.expect(CoreMatchers.instanceOf(CompositeRoutingException.class));
        invokeStrategyBlocking(this.strategy, testEvent(), Arrays.asList(createRoutingPairWithSleep(Message.of(1), 250L)), th -> {
            Assert.assertThat(((Error) assertRoutingResult(assertCompositeRoutingException(th, 1), 0, 1).getFailures().get("0")).getCause(), CoreMatchers.instanceOf(TimeoutException.class));
        });
    }

    @Test
    @Description("When a route timeout occurs all routes are still executed and a CompositeRoutingException is thrown with details of timeout error and successful routes in RoutingResult.")
    public void timeoutDelayed() throws Throwable {
        this.strategy = createStrategy(this.processingStrategy, 1, true, 50L);
        Processor createProcessorSpy = createProcessorSpy(Message.of(2));
        this.expectedException.expect(CoreMatchers.instanceOf(CompositeRoutingException.class));
        invokeStrategyBlocking(this.strategy, testEvent(), Arrays.asList(createRoutingPairWithSleep(Message.of(1), 250L), ForkJoinStrategy.RoutingPair.of(testEvent(), createChain(createProcessorSpy))), th -> {
            ((Processor) Mockito.verify(createProcessorSpy, Mockito.times(1))).process((CoreEvent) ArgumentMatchers.any(CoreEvent.class));
            Iterator it = assertRoutingResult(assertCompositeRoutingException(th, 1), 1, 1).getFailures().values().iterator();
            while (it.hasNext()) {
                Assert.assertThat(((Error) it.next()).getCause(), CoreMatchers.instanceOf(TimeoutException.class));
            }
        });
    }

    @Test
    @Description("When configured with delayErrors='false' the first timeout causes strategy to throw a TimeoutException.")
    public void timeoutEager() throws Throwable {
        this.strategy = createStrategy(this.processingStrategy, 1, false, 50L);
        Processor createProcessorSpy = createProcessorSpy(Message.of(2));
        ForkJoinStrategy.RoutingPair of = ForkJoinStrategy.RoutingPair.of(testEvent(), createChain(createProcessorSpy));
        this.expectedException.expect(CoreMatchers.instanceOf(DefaultMuleException.class));
        this.expectedException.expectCause(CoreMatchers.instanceOf(TimeoutException.class));
        invokeStrategyBlocking(this.strategy, testEvent(), Arrays.asList(createRoutingPairWithSleep(Message.of(1), 250L), of), th -> {
            ((Processor) Mockito.verify(createProcessorSpy, Mockito.never())).process((CoreEvent) ArgumentMatchers.any(CoreEvent.class));
        });
    }

    @Test
    @Description("Errors are thrown via CompositeRoutingException with RoutingResult containing details of failures.")
    public void error() throws Throwable {
        IllegalStateException illegalStateException = new IllegalStateException();
        ForkJoinStrategy.RoutingPair of = ForkJoinStrategy.RoutingPair.of(testEvent(), createFailingRoutingPair(illegalStateException));
        this.expectedException.expect(CoreMatchers.instanceOf(CompositeRoutingException.class));
        invokeStrategyBlocking(this.strategy, testEvent(), Arrays.asList(of), th -> {
            Assert.assertThat(((Error) assertRoutingResult(assertCompositeRoutingException(th, 1), 0, 1).getFailures().get("0")).getCause(), CoreMatchers.is(illegalStateException));
        });
    }

    @Test
    @Description("When an error occurs all routes are executed regardless and a CompositeRoutingException is thrown containing a RoutingResult with details of both failures and successes.")
    public void errorDelayed() throws Throwable {
        Processor createProcessorSpy = createProcessorSpy(testEvent().getMessage());
        IllegalStateException illegalStateException = new IllegalStateException();
        ForkJoinStrategy.RoutingPair of = ForkJoinStrategy.RoutingPair.of(testEvent(), createFailingRoutingPair(illegalStateException));
        UnsupportedOperationException unsupportedOperationException = new UnsupportedOperationException();
        ForkJoinStrategy.RoutingPair of2 = ForkJoinStrategy.RoutingPair.of(testEvent(), createFailingRoutingPair(unsupportedOperationException));
        IndexOutOfBoundsException indexOutOfBoundsException = new IndexOutOfBoundsException();
        ForkJoinStrategy.RoutingPair of3 = ForkJoinStrategy.RoutingPair.of(testEvent(), createFailingRoutingPair(indexOutOfBoundsException));
        ForkJoinStrategy.RoutingPair of4 = ForkJoinStrategy.RoutingPair.of(testEvent(), createChain(createProcessorSpy));
        this.expectedException.expect(CoreMatchers.instanceOf(CompositeRoutingException.class));
        invokeStrategyBlocking(this.strategy, testEvent(), Arrays.asList(of, of2, of3, of4), th -> {
            ((Processor) Mockito.verify(createProcessorSpy, Mockito.times(1))).process((CoreEvent) ArgumentMatchers.any(CoreEvent.class));
            RoutingResult assertRoutingResult = assertRoutingResult(assertCompositeRoutingException(th, 3), 1, 3);
            Assert.assertThat(((Error) assertRoutingResult.getFailures().get("0")).getCause(), CoreMatchers.is(illegalStateException));
            Assert.assertThat(((Error) assertRoutingResult.getFailures().get("1")).getCause(), CoreMatchers.is(unsupportedOperationException));
            Assert.assertThat(((Error) assertRoutingResult.getFailures().get("2")).getCause(), CoreMatchers.is(indexOutOfBoundsException));
            Assert.assertThat(assertRoutingResult.getFailures().get("3"), CoreMatchers.is(CoreMatchers.nullValue()));
        });
    }

    @Test
    @Description("When configured with delayErrors='false' the first errors causes strategy to throw this exception.")
    public void errorEager() throws Throwable {
        this.strategy = createStrategy(this.processingStrategy, 1, false, 2147483647L);
        Processor createProcessorSpy = createProcessorSpy(Message.of(1));
        IllegalStateException illegalStateException = new IllegalStateException();
        ForkJoinStrategy.RoutingPair of = ForkJoinStrategy.RoutingPair.of(testEvent(), createFailingRoutingPair(illegalStateException));
        ForkJoinStrategy.RoutingPair of2 = ForkJoinStrategy.RoutingPair.of(testEvent(), createChain(createProcessorSpy));
        this.expectedException.expect(CoreMatchers.instanceOf(MessagingException.class));
        this.expectedException.expectCause(CoreMatchers.is(illegalStateException));
        invokeStrategyBlocking(this.strategy, testEvent(), Arrays.asList(of, of2), th -> {
            ((Processor) Mockito.verify(createProcessorSpy, Mockito.never())).process((CoreEvent) ArgumentMatchers.any(CoreEvent.class));
        });
    }

    @Test
    @Description("When configured with delayErrors='false' the first errors causes strategy to throw this exception. Other routes may or may not be executed depending on concurrency.")
    public void errorEagerConcurrent() throws Throwable {
        this.strategy = createStrategy(this.processingStrategy, 4, false, 2147483647L);
        Processor createProcessorSpy = createProcessorSpy(Message.of(1));
        Processor createProcessorSpy2 = createProcessorSpy(Message.of(2));
        Processor createProcessorSpy3 = createProcessorSpy(Message.of(3));
        CoreEvent testEvent = testEvent();
        IllegalStateException illegalStateException = new IllegalStateException();
        ForkJoinStrategy.RoutingPair of = ForkJoinStrategy.RoutingPair.of(testEvent, createFailingRoutingPair(illegalStateException));
        ForkJoinStrategy.RoutingPair of2 = ForkJoinStrategy.RoutingPair.of(testEvent, createChain(createProcessorSpy));
        ForkJoinStrategy.RoutingPair of3 = ForkJoinStrategy.RoutingPair.of(testEvent, createChain(createProcessorSpy2));
        ForkJoinStrategy.RoutingPair of4 = ForkJoinStrategy.RoutingPair.of(testEvent, createChain(createProcessorSpy3));
        this.expectedException.expect(CoreMatchers.instanceOf(MessagingException.class));
        this.expectedException.expectCause(CoreMatchers.is(illegalStateException));
        invokeStrategyBlocking(this.strategy, testEvent(), Arrays.asList(of, of2, of3, of4), th -> {
            ((Processor) Mockito.verify(createProcessorSpy, Mockito.atMost(1))).process((CoreEvent) ArgumentMatchers.any(CoreEvent.class));
            ((Processor) Mockito.verify(createProcessorSpy2, Mockito.atMost(1))).process((CoreEvent) ArgumentMatchers.any(CoreEvent.class));
            ((Processor) Mockito.verify(createProcessorSpy3, Mockito.atMost(1))).process((CoreEvent) ArgumentMatchers.any(CoreEvent.class));
        });
    }

    @Test
    @Description("After successful completion of all routes the variables from each route are merged into the result.")
    public void flowVarsMerged() throws Throwable {
        Apple apple = new Apple();
        CoreEvent build = CoreEvent.builder(newEvent()).addVariable("before", "beforeValue").addVariable("before2", "before2Value").build();
        CoreEvent invokeStrategyBlocking = invokeStrategyBlocking(this.strategy, build, Arrays.asList(ForkJoinStrategy.RoutingPair.of(build, createChain(coreEvent -> {
            return CoreEvent.builder(coreEvent).addVariable("before2", "before2NewValue").addVariable("foo", "fooValue1").addVariable("foo2", "foo2Value1").addVariable("foo3", "foo3Value1").build();
        })), ForkJoinStrategy.RoutingPair.of(build, createChain(coreEvent2 -> {
            return CoreEvent.builder(coreEvent2).addVariable("foo2", "foo2Value2").addVariable("foo3", apple).build();
        }))));
        Assert.assertThat(invokeStrategyBlocking.getVariables().keySet(), Matchers.hasSize(5));
        Assert.assertThat(invokeStrategyBlocking.getVariables().keySet(), Matchers.hasItems(new String[]{"before", "before2", "foo", "foo", "foo2", "foo3"}));
        Assert.assertThat(((TypedValue) invokeStrategyBlocking.getVariables().get("before")).getValue(), CoreMatchers.equalTo("beforeValue"));
        Assert.assertThat(((TypedValue) invokeStrategyBlocking.getVariables().get("before2")).getValue(), CoreMatchers.equalTo("before2NewValue"));
        Assert.assertThat(((TypedValue) invokeStrategyBlocking.getVariables().get("foo")).getValue(), CoreMatchers.equalTo("fooValue1"));
        TypedValue typedValue = (TypedValue) invokeStrategyBlocking.getVariables().get("foo2");
        Assert.assertThat(typedValue.getDataType(), CoreMatchers.equalTo(DataType.builder().collectionType(List.class).itemType(String.class).build()));
        Assert.assertThat((List) typedValue.getValue(), Matchers.hasItems(new String[]{"foo2Value1", "foo2Value2"}));
        TypedValue typedValue2 = (TypedValue) invokeStrategyBlocking.getVariables().get("foo3");
        Assert.assertThat(typedValue2.getDataType(), CoreMatchers.equalTo(DataType.builder().collectionType(List.class).itemType(Object.class).build()));
        Assert.assertThat((List) typedValue2.getValue(), Matchers.hasItems(new Object[]{"foo3Value1", apple}));
    }

    @Test
    @Description("When the strategy uses a processing strategy that supports concurrent execution the total processing time is less that sequential processing.")
    public void concurrent() throws Throwable {
        invokeStrategyBlocking(this.strategy, testEvent(), createRoutingPairs(10, 50));
        ((Scheduler) Mockito.verify(this.scheduler, Mockito.times(10))).submit((Callable) ArgumentMatchers.any(Callable.class));
    }

    @Test
    @Description("When executing concurrently the strategy will throw a RejectedExceptionException if the scheduler being used throws a RejectedExceptionException.")
    public void concurrentRejectedExecution() throws Throwable {
        Mockito.when(this.scheduler.submit((Callable) ArgumentMatchers.any(Callable.class))).thenThrow(new Throwable[]{new RejectedExecutionException()});
        setupConcurrentProcessingStrategy();
        this.strategy = createStrategy(this.processingStrategy, 4, true, 2147483647L);
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expectCause(CoreMatchers.instanceOf(RejectedExecutionException.class));
        invokeStrategyBlocking(this.strategy, testEvent(), createRoutingPairs(1));
    }

    @Test
    @Description("When concurrency is limited to '1' routes execute sequentially and the total processing time is the sum of processing each route, regardless of if the processing strategy supports concurrency.")
    public void sequential() throws Throwable {
        setupConcurrentProcessingStrategy();
        this.strategy = createStrategy(this.processingStrategy, 1, true, 2147483647L);
        invokeStrategyBlocking(this.strategy, testEvent(), createRoutingPairs(10, 50));
        ((Scheduler) Mockito.verify(this.scheduler, Mockito.never())).submit((Runnable) ArgumentMatchers.any(Runnable.class));
    }

    private void setupConcurrentProcessingStrategy() {
        Function function = reactiveProcessor -> {
            return publisher -> {
                return Mono.from(publisher).publishOn(Schedulers.fromExecutorService(this.scheduler)).transform(reactiveProcessor);
            };
        };
        Mockito.when(this.processingStrategy.onPipeline((ReactiveProcessor) ArgumentMatchers.any(ReactiveProcessor.class))).thenAnswer(invocationOnMock -> {
            return (ReactiveProcessor) function.apply(invocationOnMock.getArgument(0));
        });
    }

    private CompositeRoutingException assertCompositeRoutingException(Throwable th, int i) {
        Assert.assertThat(th, CoreMatchers.instanceOf(CompositeRoutingException.class));
        CompositeRoutingException compositeRoutingException = (CompositeRoutingException) th;
        Assert.assertThat(Integer.valueOf(compositeRoutingException.getErrors().size()), Matchers.greaterThanOrEqualTo(Integer.valueOf(i)));
        return compositeRoutingException;
    }

    private RoutingResult assertRoutingResult(CompositeRoutingException compositeRoutingException, int i, int i2) {
        Assert.assertThat(compositeRoutingException.getErrorMessage().getPayload().getValue(), CoreMatchers.instanceOf(RoutingResult.class));
        RoutingResult routingResult = (RoutingResult) compositeRoutingException.getErrorMessage().getPayload().getValue();
        Assert.assertThat(Integer.valueOf(routingResult.getResults().size()), Matchers.lessThanOrEqualTo(Integer.valueOf(i)));
        Assert.assertThat(Integer.valueOf(routingResult.getFailures().size()), Matchers.greaterThanOrEqualTo(Integer.valueOf(i2)));
        Assert.assertThat(Integer.valueOf(routingResult.getFailures().size() + routingResult.getResults().size()), CoreMatchers.is(Integer.valueOf(i + i2)));
        return routingResult;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CoreEvent invokeStrategyBlocking(ForkJoinStrategy forkJoinStrategy, CoreEvent coreEvent, List<ForkJoinStrategy.RoutingPair> list) throws Throwable {
        return invokeStrategyBlocking(forkJoinStrategy, coreEvent, list, th -> {
        });
    }

    protected CoreEvent invokeStrategyBlocking(ForkJoinStrategy forkJoinStrategy, CoreEvent coreEvent, List<ForkJoinStrategy.RoutingPair> list, CheckedConsumer<Throwable> checkedConsumer) throws Throwable {
        try {
            return (CoreEvent) Mono.from(forkJoinStrategy.forkJoin(coreEvent, Flux.fromIterable(list))).block();
        } catch (Throwable th) {
            MuleException rxExceptionToMuleException = Exceptions.rxExceptionToMuleException(th);
            checkedConsumer.accept(rxExceptionToMuleException);
            throw rxExceptionToMuleException;
        }
    }

    private MessageProcessorChain createFailingRoutingPair(RuntimeException runtimeException) throws MuleException {
        return createChain(coreEvent -> {
            throw runtimeException;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Processor createProcessorSpy(final Message message) throws MuleException {
        return (Processor) Mockito.spy(new InternalTestProcessor() { // from class: org.mule.runtime.core.internal.routing.forkjoin.AbstractForkJoinStrategyTestCase.1
            public CoreEvent process(CoreEvent coreEvent) throws MuleException {
                return CoreEvent.builder(coreEvent).message(message).build();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ForkJoinStrategy.RoutingPair createRoutingPair(Processor processor) throws MuleException {
        return ForkJoinStrategy.RoutingPair.of(testEvent(), createChain(processor));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ForkJoinStrategy.RoutingPair createRoutingPair(Message message) throws MuleException {
        return createRoutingPairWithSleep(message, 0L);
    }

    private ForkJoinStrategy.RoutingPair createRoutingPairWithSleep(Message message, long j) throws MuleException {
        return ForkJoinStrategy.RoutingPair.of(testEvent(), createChain(new SleepingProcessor(message, j)));
    }

    private List<ForkJoinStrategy.RoutingPair> createRoutingPairs(int i) {
        return createRoutingPairs(i, 0);
    }

    private List<ForkJoinStrategy.RoutingPair> createRoutingPairs(int i, int i2) {
        return (List) IntStream.range(0, i).mapToObj(i3 -> {
            try {
                return createRoutingPairWithSleep(Message.of(1), i2);
            } catch (MuleException e) {
                throw new RuntimeException((Throwable) e);
            }
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public MessageProcessorChain createChain(Processor processor) throws MuleException {
        MessageProcessorChain newChain = MessageProcessors.newChain(Optional.empty(), new Processor[]{processor});
        this.chains.add(newChain);
        LifecycleUtils.initialiseIfNeeded(newChain, muleContext);
        return newChain;
    }
}
