package org.mule.runtime.core.internal.routing;

import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Story;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ItemSequenceInfo;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.streaming.CursorProvider;
import org.mule.runtime.api.streaming.bytes.CursorStreamProvider;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.expression.ExpressionRuntimeException;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.message.InternalMessage;
import org.mule.runtime.core.internal.streaming.CursorUtils;
import org.mule.runtime.core.internal.streaming.ManagedCursorProvider;
import org.mule.runtime.core.internal.util.StringHashCodeCollisionGenerator;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.processor.InternalProcessor;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.tck.SensingNullMessageProcessor;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;
import org.mule.tck.junit4.matcher.DataTypeCompatibilityMatcher;
import org.mule.tck.processor.ContextPropagationChecker;
import org.mule.tck.testmodels.mule.TestMessageProcessor;
import org.mule.tck.util.MuleContextUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Story("Foreach")
@Feature("Scope")
/* loaded from: input_file:org/mule/runtime/core/internal/routing/ForeachTestCase.class */
public class ForeachTestCase extends AbstractReactiveProcessorTestCase {
    private static final Logger LOGGER = LoggerFactory.getLogger(ForeachTestCase.class);
    private static final String ERR_NUMBER_MESSAGES = "Not a correct number of messages processed";
    private static final String ERR_PAYLOAD_TYPE = "Type error on processed payloads";
    private static final String ERR_OUTPUT = "Messages processed incorrectly";
    private static final String ERR_INVALID_ITEM_SEQUENCE = "Null ItemSequence received";
    private static final String ERR_SEQUENCE_OVERRIDDEN = "Sequence should't be overridden after foreach";
    private static final int CONCURRENCY = 1000;
    private static final int CONCURRENCY_TIMEOUT_SECONDS = 20;
    private Foreach foreach;
    private Foreach chainedForeach;
    private Foreach simpleForeach;
    private Foreach nestedForeach;
    private List<CoreEvent> processedEvents;
    private Map<String, TypedValue<?>> variables;
    private ExecutorService executorService;

    @Rule
    public ExpectedException expectedException;

    /* loaded from: input_file:org/mule/runtime/core/internal/routing/ForeachTestCase$DummyNestedIterableClass.class */
    private class DummyNestedIterableClass implements Iterable<DummySimpleIterableClass> {
        private final List<DummySimpleIterableClass> iterables = new ArrayList();

        DummyNestedIterableClass() {
            DummySimpleIterableClass dummySimpleIterableClass = new DummySimpleIterableClass();
            dummySimpleIterableClass.strings = new ArrayList();
            dummySimpleIterableClass.strings.add("a1");
            dummySimpleIterableClass.strings.add("a2");
            DummySimpleIterableClass dummySimpleIterableClass2 = new DummySimpleIterableClass();
            dummySimpleIterableClass2.strings = new ArrayList();
            dummySimpleIterableClass2.strings.add("a3");
            dummySimpleIterableClass2.strings.add("b1");
            dummySimpleIterableClass2.strings.add("b2");
            dummySimpleIterableClass2.strings.add("c1");
            this.iterables.add(dummySimpleIterableClass);
            this.iterables.add(dummySimpleIterableClass2);
        }

        @Override // java.lang.Iterable
        public Iterator<DummySimpleIterableClass> iterator() {
            return this.iterables.iterator();
        }
    }

    /* loaded from: input_file:org/mule/runtime/core/internal/routing/ForeachTestCase$DummySimpleIterableClass.class */
    public class DummySimpleIterableClass implements Iterable<String> {
        List<String> strings = new ArrayList();

        DummySimpleIterableClass() {
            this.strings.add("bar");
            this.strings.add("zip");
        }

        @Override // java.lang.Iterable
        public Iterator<String> iterator() {
            return this.strings.iterator();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/ForeachTestCase$FailingProcessorMatcher.class */
    public static class FailingProcessorMatcher extends BaseMatcher<MessagingException> {
        private final Processor expectedFailingProcessor;

        FailingProcessorMatcher(Processor processor) {
            this.expectedFailingProcessor = processor;
        }

        public boolean matches(Object obj) {
            return (obj instanceof MessagingException) && ((MessagingException) obj).getFailingComponent() == this.expectedFailingProcessor;
        }

        public void describeTo(Description description) {
            description.appendText("Exception is not a MessagingException or failing processor does not match.");
        }
    }

    @FunctionalInterface
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/ForeachTestCase$InternalTestProcessor.class */
    private interface InternalTestProcessor extends Processor, InternalProcessor {
    }

    public ForeachTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
        this.expectedException = ExpectedException.none();
    }

    @Before
    public void initialise() throws MuleException {
        this.processedEvents = new ArrayList();
        this.simpleForeach = createForeach(getSimpleMessageProcessors(new TestMessageProcessor("zas")));
        this.nestedForeach = createForeach(getNestedMessageProcessors());
    }

    @After
    public void after() {
        LifecycleUtils.disposeIfNeeded(this.nestedForeach, LOGGER);
        LifecycleUtils.disposeIfNeeded(this.simpleForeach, LOGGER);
        LifecycleUtils.disposeIfNeeded(this.foreach, LOGGER);
        LifecycleUtils.disposeIfNeeded(this.chainedForeach, LOGGER);
        if (this.executorService != null) {
            this.executorService.shutdownNow();
        }
    }

    private List<Processor> getSimpleMessageProcessors(Processor processor) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(coreEvent -> {
            return CoreEvent.builder(coreEvent).message(InternalMessage.builder(coreEvent.getMessage()).value((coreEvent.getMessage().getPayload().getValue() instanceof List ? ((List) ((List) coreEvent.getMessage().getPayload().getValue()).stream().map((v0) -> {
                return v0.getValue();
            }).collect(Collectors.toList())).toString() : coreEvent.getMessage().getPayload().getValue().toString()) + ":foo").build()).build();
        });
        arrayList.add(processor);
        arrayList.add(coreEvent2 -> {
            this.variables = coreEvent2.getVariables();
            this.processedEvents.add(coreEvent2);
            return coreEvent2;
        });
        return arrayList;
    }

    private List<Processor> getNestedMessageProcessors() {
        ArrayList arrayList = new ArrayList();
        Foreach createForeach = createForeach();
        createForeach.setMessageProcessors(getSimpleMessageProcessors(new TestMessageProcessor("zas")));
        arrayList.add(createForeach);
        return arrayList;
    }

    private Foreach createForeach(List<Processor> list) throws MuleException {
        Foreach createForeach = createForeach();
        createForeach.setMessageProcessors(list);
        LifecycleUtils.initialiseIfNeeded(createForeach, muleContext);
        return createForeach;
    }

    @Test
    public void arrayListPayload() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add("bar");
        arrayList.add("zip");
        process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(arrayList)).build());
        assertSimpleProcessedMessages();
    }

    @Test
    public void arrayPayload() throws Exception {
        process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new String[]{"bar", "zip"})).build());
        assertSimpleProcessedMessages();
    }

    @Test
    public void muleMessageCollectionPayload() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(Message.of("bar"));
        arrayList.add(Message.of("zip"));
        process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(arrayList)).build());
        assertSimpleProcessedMessages();
    }

    @Test
    public void iterablePayload() throws Exception {
        process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new DummySimpleIterableClass())).build());
        assertSimpleProcessedMessages();
    }

    @Test
    public void iteratorPayload() throws Exception {
        process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new DummySimpleIterableClass().iterator())).build());
        assertSimpleProcessedMessages();
    }

    @Test
    public void mapPayload() throws Exception {
        this.expectedException.expect(IllegalArgumentException.class);
        this.expectedException.expectMessage("Foreach does not support 'java.util.Map' with no collection expression. To iterate over Map entries use '#[dw::core::Objects::entrySet(payload)]'");
        process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(Collections.singletonMap("foo", "bar"))).build());
    }

    @Test
    public void mapEntrySetExpression() throws Exception {
        this.simpleForeach.setCollectionExpression("#[dw::core::Objects::entrySet(payload)]");
        assertForEachContextConsumption((InternalEvent) process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(Collections.singletonMap("foo", "bar"))).build()));
    }

    @Test
    public void nestedArrayListPayload() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        ArrayList arrayList4 = new ArrayList();
        arrayList2.add("a1");
        arrayList2.add("a2");
        arrayList2.add("a3");
        arrayList3.add("b1");
        arrayList3.add("b2");
        arrayList4.add("c1");
        arrayList.add(arrayList2);
        arrayList.add(arrayList3);
        arrayList.add(arrayList4);
        process(this.nestedForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(arrayList)).build());
        assertNestedProcessedMessages();
    }

    @Test
    public void nestedArrayPayload() throws Exception {
        String[][] strArr = new String[3][2];
        strArr[0][0] = "a1";
        strArr[0][1] = "a2";
        strArr[1][0] = "a3";
        strArr[1][1] = "b1";
        strArr[2][0] = "b2";
        strArr[2][1] = "c1";
        process(this.nestedForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(strArr)).build());
        assertNestedProcessedMessages();
    }

    @Test
    public void nestedMuleMessageCollectionPayload() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        ArrayList arrayList3 = new ArrayList();
        arrayList2.add(Message.of("a1"));
        arrayList2.add(Message.of("a2"));
        arrayList2.add(Message.of("a3"));
        arrayList3.add(Message.of("b1"));
        arrayList3.add(Message.of("b2"));
        arrayList3.add(Message.of("c1"));
        arrayList.add(Message.of(arrayList2));
        arrayList.add(Message.of(arrayList3));
        process(this.nestedForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(arrayList)).build());
        assertNestedProcessedMessages();
    }

    @Test
    public void nestedIterablePayload() throws Exception {
        process(this.nestedForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new DummyNestedIterableClass())).build());
        assertNestedProcessedMessages();
    }

    @Test
    public void nestedIteratorPayload() throws Exception {
        process(this.nestedForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new DummyNestedIterableClass())).build());
        assertNestedProcessedMessages();
    }

    @Test
    public void failingNestedProcessor() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        BufferOverflowException bufferOverflowException = new BufferOverflowException();
        this.foreach = createForeach();
        Processor sensingNullMessageProcessor = new SensingNullMessageProcessor();
        InternalTestProcessor internalTestProcessor = coreEvent -> {
            atomicReference.set(coreEvent);
            throw bufferOverflowException;
        };
        this.foreach.setMessageProcessors(Arrays.asList(sensingNullMessageProcessor, internalTestProcessor));
        LifecycleUtils.initialiseIfNeeded(this.foreach, muleContext);
        try {
            expectNestedProcessorException(bufferOverflowException, internalTestProcessor);
            process(this.foreach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new DummyNestedIterableClass().iterator())).build(), false);
            Assert.assertThat(Integer.valueOf(((SensingNullMessageProcessor) sensingNullMessageProcessor).invocations), Matchers.equalTo(1));
            assertForEachContextConsumption((InternalEvent) atomicReference.get());
        } catch (Throwable th) {
            Assert.assertThat(Integer.valueOf(((SensingNullMessageProcessor) sensingNullMessageProcessor).invocations), Matchers.equalTo(1));
            assertForEachContextConsumption((InternalEvent) atomicReference.get());
            throw th;
        }
    }

    private void expectNestedProcessorException(RuntimeException runtimeException, InternalTestProcessor internalTestProcessor) {
        this.expectedException.expect(MessagingException.class);
        this.expectedException.expect(new FailingProcessorMatcher(internalTestProcessor));
        this.expectedException.expectCause(CoreMatchers.is(runtimeException));
    }

    @Test
    public void failingNestedProcessorInChain() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        BufferOverflowException bufferOverflowException = new BufferOverflowException();
        this.foreach = createForeach();
        Processor sensingNullMessageProcessor = new SensingNullMessageProcessor();
        InternalTestProcessor internalTestProcessor = coreEvent -> {
            atomicReference.set(coreEvent);
            throw bufferOverflowException;
        };
        this.foreach.setMessageProcessors(Arrays.asList(sensingNullMessageProcessor, internalTestProcessor));
        MessageProcessorChain createChain = createChain(this.foreach);
        try {
            expectNestedProcessorException(bufferOverflowException, internalTestProcessor);
            processInChain(createChain, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new DummyNestedIterableClass().iterator())).build());
            Assert.assertThat(Integer.valueOf(((SensingNullMessageProcessor) sensingNullMessageProcessor).invocations), Matchers.equalTo(1));
            assertForEachContextConsumption((InternalEvent) atomicReference.get());
            LifecycleUtils.disposeIfNeeded(createChain, LOGGER);
        } catch (Throwable th) {
            Assert.assertThat(Integer.valueOf(((SensingNullMessageProcessor) sensingNullMessageProcessor).invocations), Matchers.equalTo(1));
            assertForEachContextConsumption((InternalEvent) atomicReference.get());
            LifecycleUtils.disposeIfNeeded(createChain, LOGGER);
            throw th;
        }
    }

    @Test
    public void nestedForeachWithFailingExpression() throws Exception {
        Foreach createForeach = createForeach();
        createForeach.setCollectionExpression("!@INVALID");
        SensingNullMessageProcessor sensingNullMessageProcessor = new SensingNullMessageProcessor();
        createForeach.setMessageProcessors(Collections.singletonList(sensingNullMessageProcessor));
        Foreach createForeach2 = createForeach();
        createForeach2.setMessageProcessors(Collections.singletonList(createForeach));
        LifecycleUtils.initialiseIfNeeded(createForeach2, muleContext);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 3; i++) {
            ArrayList arrayList2 = new ArrayList();
            for (int i2 = 0; i2 < 2; i2++) {
                arrayList2.add(i + "-" + i2);
            }
            arrayList.add(arrayList2);
        }
        try {
            expectExpressionException(createForeach);
            process(createForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(arrayList)).build(), false);
            Assert.assertThat(Integer.valueOf(sensingNullMessageProcessor.invocations), Matchers.equalTo(0));
            LifecycleUtils.disposeIfNeeded(createForeach2, LOGGER);
        } catch (Throwable th) {
            Assert.assertThat(Integer.valueOf(sensingNullMessageProcessor.invocations), Matchers.equalTo(0));
            LifecycleUtils.disposeIfNeeded(createForeach2, LOGGER);
            throw th;
        }
    }

    private Foreach createForeach() {
        Foreach foreach = new Foreach();
        foreach.setAnnotations(getAppleFlowComponentLocationAnnotations());
        return foreach;
    }

    @Test
    public void failingExpression() throws Exception {
        this.foreach = createForeach();
        this.foreach.setCollectionExpression("!@INVALID");
        SensingNullMessageProcessor sensingNullMessageProcessor = new SensingNullMessageProcessor();
        List<Processor> simpleMessageProcessors = getSimpleMessageProcessors(new TestMessageProcessor("zas"));
        simpleMessageProcessors.add(0, sensingNullMessageProcessor);
        this.foreach.setMessageProcessors(simpleMessageProcessors);
        LifecycleUtils.initialiseIfNeeded(this.foreach, muleContext);
        try {
            expectExpressionException(this.foreach);
            process(this.foreach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new DummyNestedIterableClass().iterator())).build(), false);
        } finally {
            Assert.assertThat(Integer.valueOf(sensingNullMessageProcessor.invocations), Matchers.equalTo(Integer.valueOf(0)));
        }
    }

    @Test
    public void failingExpressionInChain() throws Exception {
        this.foreach = createForeach();
        this.foreach.setCollectionExpression("!@INVALID");
        SensingNullMessageProcessor sensingNullMessageProcessor = new SensingNullMessageProcessor();
        List<Processor> simpleMessageProcessors = getSimpleMessageProcessors(new TestMessageProcessor("zas"));
        simpleMessageProcessors.add(0, sensingNullMessageProcessor);
        this.foreach.setMessageProcessors(simpleMessageProcessors);
        MessageProcessorChain createChain = createChain(this.foreach);
        LifecycleUtils.disposeIfNeeded(this.foreach, LOGGER);
        try {
            expectExpressionException(this.foreach);
            processInChain(createChain, MuleContextUtils.eventBuilder(muleContext).message(Message.of(new DummyNestedIterableClass().iterator())).build());
            Assert.assertThat(Integer.valueOf(sensingNullMessageProcessor.invocations), Matchers.equalTo(0));
            LifecycleUtils.disposeIfNeeded(createChain, LOGGER);
        } catch (Throwable th) {
            Assert.assertThat(Integer.valueOf(sensingNullMessageProcessor.invocations), Matchers.equalTo(0));
            LifecycleUtils.disposeIfNeeded(createChain, LOGGER);
            throw th;
        }
    }

    private void expectExpressionException(Foreach foreach) {
        this.expectedException.expect(CoreMatchers.instanceOf(MessagingException.class));
        this.expectedException.expect(new FailingProcessorMatcher(foreach));
        this.expectedException.expectCause(CoreMatchers.instanceOf(ExpressionRuntimeException.class));
    }

    @Test
    public void batchSize() throws Exception {
        this.foreach = createForeach();
        this.foreach.setMessageProcessors(getSimpleMessageProcessors(new TestMessageProcessor("zas")));
        this.foreach.setBatchSize(2);
        LifecycleUtils.initialiseIfNeeded(this.foreach, muleContext);
        this.foreach.process(MuleContextUtils.eventBuilder(muleContext).message(Message.of(Arrays.asList(1, 2, 3))).build());
        Assert.assertThat(this.processedEvents, Matchers.hasSize(2));
        Assert.assertThat(this.processedEvents.get(0).getMessageAsString(muleContext), CoreMatchers.is("[1, 2]:foo:zas"));
        Assert.assertThat(this.processedEvents.get(1).getMessageAsString(muleContext), CoreMatchers.is("[3]:foo:zas"));
        assertForEachContextConsumption((InternalEvent) this.processedEvents.get(0));
        assertForEachContextConsumption((InternalEvent) this.processedEvents.get(1));
    }

    @Test
    public void batchSizeWithCollectionAttributes() throws Exception {
        this.foreach = createForeach();
        this.foreach.setMessageProcessors(getSimpleMessageProcessors(new TestMessageProcessor("zas")));
        this.foreach.setBatchSize(2);
        this.foreach.setCollectionExpression("vars.collection");
        LifecycleUtils.initialiseIfNeeded(this.foreach, muleContext);
        this.foreach.process(MuleContextUtils.eventBuilder(muleContext).addVariable("collection", Arrays.asList(1, 2, 3)).message(Message.of((Object) null)).build());
        Assert.assertThat(this.processedEvents, Matchers.hasSize(2));
        Assert.assertThat(this.processedEvents.get(0).getMessageAsString(muleContext), CoreMatchers.is("[1, 2]:foo:zas"));
        Assert.assertThat(this.processedEvents.get(1).getMessageAsString(muleContext), CoreMatchers.is("[3]:foo:zas"));
        assertForEachContextConsumption((InternalEvent) this.processedEvents.get(0));
        assertForEachContextConsumption((InternalEvent) this.processedEvents.get(1));
    }

    @Test
    public void variables() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add("bar");
        arrayList.add("zip");
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of(arrayList)).build();
        process(this.simpleForeach, build);
        assertSimpleProcessedMessages();
        Assert.assertThat(this.variables.keySet(), Matchers.hasSize(2));
        Assert.assertThat(this.variables.keySet(), Matchers.hasItems(new String[]{"rootMessage", "counter"}));
        Assert.assertThat(this.variables.get("rootMessage").getDataType(), CoreMatchers.is(DataTypeCompatibilityMatcher.assignableTo(DataType.MULE_MESSAGE)));
        Assert.assertThat(this.variables.get("rootMessage").getValue(), Matchers.equalTo(build.getMessage()));
        Assert.assertThat(this.variables.get("counter").getDataType(), Matchers.equalTo(DataType.NUMBER));
        Assert.assertThat(this.variables.get("counter").getValue(), Matchers.equalTo(2));
    }

    @Test
    public void empty() throws Exception {
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of(Collections.emptyList())).build();
        Assert.assertThat(process(this.simpleForeach, build).getMessage(), Matchers.equalTo(build.getMessage()));
        Assert.assertThat(this.processedEvents, Matchers.hasSize(0));
    }

    @Test
    @Issue("MULE-18573")
    public void muleMessageContainingACursorStreamShouldBeManagedByCursorManager() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        this.foreach = createForeach();
        this.foreach.setMessageProcessors(Arrays.asList(coreEvent -> {
            atomicReference.set(coreEvent);
            return coreEvent;
        }));
        LifecycleUtils.initialiseIfNeeded(this.foreach, muleContext);
        CursorProvider cursorProvider = (CursorProvider) Mockito.mock(CursorStreamProvider.class);
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of(Collections.singletonList(Message.of(cursorProvider)))).build();
        Assert.assertThat(process(this.foreach, build).getMessage(), Matchers.equalTo(build.getMessage()));
        Assert.assertThat(((CoreEvent) atomicReference.get()).getMessage().getPayload().getValue(), CoreMatchers.is(CoreMatchers.instanceOf(ManagedCursorProvider.class)));
        Assert.assertThat(CursorUtils.unwrap((ManagedCursorProvider) ((CoreEvent) atomicReference.get()).getMessage().getPayload().getValue()), CoreMatchers.is(CoreMatchers.sameInstance(cursorProvider)));
    }

    @Test
    @Issue("MULE-18573")
    public void cursorStreamShouldBeManagedByCursorManager() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        this.foreach = createForeach();
        this.foreach.setMessageProcessors(Arrays.asList(coreEvent -> {
            atomicReference.set(coreEvent);
            return coreEvent;
        }));
        LifecycleUtils.initialiseIfNeeded(this.foreach, muleContext);
        CursorProvider cursorProvider = (CursorProvider) Mockito.mock(CursorStreamProvider.class);
        CoreEvent build = MuleContextUtils.eventBuilder(muleContext).message(Message.of(Collections.singletonList(cursorProvider))).build();
        Assert.assertThat(process(this.foreach, build).getMessage(), Matchers.equalTo(build.getMessage()));
        Assert.assertThat(((CoreEvent) atomicReference.get()).getMessage().getPayload().getValue(), CoreMatchers.is(CoreMatchers.instanceOf(ManagedCursorProvider.class)));
        Assert.assertThat(CursorUtils.unwrap((ManagedCursorProvider) ((CoreEvent) atomicReference.get()).getMessage().getPayload().getValue()), CoreMatchers.is(CoreMatchers.sameInstance(cursorProvider)));
    }

    @Test
    @io.qameta.allure.Description("ForEach should set itemSequenceInfo")
    @Issue("MULE-16764")
    public void itemSequences() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add("one");
        arrayList.add("two");
        arrayList.add("three");
        arrayList.add("four");
        process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).message(Message.of(arrayList)).build());
        Assert.assertThat(ERR_INVALID_ITEM_SEQUENCE, (List) this.processedEvents.stream().map(coreEvent -> {
            return (Integer) coreEvent.getItemSequenceInfo().map(itemSequenceInfo -> {
                return Integer.valueOf(itemSequenceInfo.getPosition());
            }).orElse(-1);
        }).collect(Collectors.toList()), CoreMatchers.is(Arrays.asList(0, 1, 2, 3)));
    }

    @Test
    @io.qameta.allure.Description("ForEach doesn't override any itemSequence after completion")
    @Issue("MULE-16764")
    public void notOverrideParentSequence() throws Exception {
        ArrayList arrayList = new ArrayList();
        Optional of = Optional.of(ItemSequenceInfo.of(666));
        arrayList.add("A");
        arrayList.add("B");
        arrayList.add("C");
        Assert.assertThat(ERR_SEQUENCE_OVERRIDDEN, process(this.simpleForeach, MuleContextUtils.eventBuilder(muleContext).itemSequenceInfo(of).message(Message.of(arrayList)).build()).getItemSequenceInfo(), CoreMatchers.is(of));
    }

    @Test
    public void subscriberContextPropagation() throws MuleException {
        ContextPropagationChecker contextPropagationChecker = new ContextPropagationChecker();
        LifecycleUtils.disposeIfNeeded(this.simpleForeach, LOGGER);
        this.simpleForeach = createForeach(Collections.singletonList(contextPropagationChecker));
        ContextPropagationChecker.assertContextPropagation(MuleContextUtils.eventBuilder(muleContext).message(Message.of(Arrays.asList("1", "2", "3"))).build(), this.simpleForeach, contextPropagationChecker);
    }

    @Test
    @Issue("MULE-19143")
    public void multiplesThreadsUsingSameForeach() throws Exception {
        this.foreach = createForeach();
        AtomicInteger atomicInteger = new AtomicInteger();
        this.foreach.setMessageProcessors(Arrays.asList(coreEvent -> {
            atomicInteger.incrementAndGet();
            return coreEvent;
        }));
        LifecycleUtils.initialiseIfNeeded(this.foreach, muleContext);
        AtomicInteger atomicInteger2 = new AtomicInteger();
        InternalTestProcessor internalTestProcessor = coreEvent2 -> {
            atomicInteger2.incrementAndGet();
            return coreEvent2;
        };
        this.chainedForeach = createForeach();
        this.chainedForeach.setMessageProcessors(Arrays.asList(internalTestProcessor));
        LifecycleUtils.initialiseIfNeeded(this.chainedForeach, muleContext);
        CoreEvent process = this.foreach.process(MuleContextUtils.eventBuilder(muleContext).message(Message.of(Arrays.asList(1, 2, 3))).build());
        Assert.assertThat(Integer.valueOf(atomicInteger.get()), CoreMatchers.is(3));
        this.executorService = Executors.newFixedThreadPool(CONCURRENCY);
        CountDownLatch countDownLatch = new CountDownLatch(CONCURRENCY);
        Latch latch = new Latch();
        List<String> stringsWithSameHashCode = StringHashCodeCollisionGenerator.stringsWithSameHashCode(CONCURRENCY);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 3; i++) {
            arrayList.add("" + i);
        }
        List synchronizedList = Collections.synchronizedList(new ArrayList());
        for (String str : stringsWithSameHashCode) {
            BaseEventContext baseEventContext = (BaseEventContext) Mockito.spy(MessageProcessors.newChildContext(process, Optional.empty()));
            Mockito.when(baseEventContext.getId()).thenReturn(str);
            CoreEvent build = CoreEvent.builder(baseEventContext, process).message(Message.of(arrayList)).build();
            this.executorService.submit(() -> {
                try {
                    countDownLatch.countDown();
                    latch.await();
                    this.chainedForeach.process(build);
                } catch (Throwable th) {
                    synchronizedList.add(th);
                    LOGGER.error("An unexpected error processing events", th);
                }
            });
        }
        countDownLatch.await();
        latch.release();
        this.executorService.awaitTermination(20L, TimeUnit.SECONDS);
        Assert.assertTrue(synchronizedList.isEmpty());
        Assert.assertThat(Integer.valueOf(atomicInteger2.get()), CoreMatchers.is(Integer.valueOf(CONCURRENCY * arrayList.size())));
    }

    private MessageProcessorChain createChain(Processor processor) throws InitialisationException {
        MessageProcessorChain newChain = MessageProcessors.newChain(Optional.empty(), new Processor[]{processor});
        LifecycleUtils.initialiseIfNeeded(newChain, muleContext);
        return newChain;
    }

    private CoreEvent processInChain(MessageProcessorChain messageProcessorChain, CoreEvent coreEvent) throws Exception {
        return process(messageProcessorChain, coreEvent, false);
    }

    private void assertSimpleProcessedMessages() {
        Assert.assertEquals(ERR_NUMBER_MESSAGES, 2L, this.processedEvents.size());
        Assert.assertTrue(ERR_PAYLOAD_TYPE, this.processedEvents.get(0).getMessage().getPayload().getValue() instanceof String);
        Assert.assertTrue(ERR_PAYLOAD_TYPE, this.processedEvents.get(1).getMessage().getPayload().getValue() instanceof String);
        Assert.assertEquals(ERR_OUTPUT, "bar:foo:zas", this.processedEvents.get(0).getMessage().getPayload().getValue());
        Assert.assertEquals(ERR_OUTPUT, "zip:foo:zas", this.processedEvents.get(1).getMessage().getPayload().getValue());
        assertForEachContextConsumption((InternalEvent) this.processedEvents.get(0));
        assertForEachContextConsumption((InternalEvent) this.processedEvents.get(1));
    }

    private void assertNestedProcessedMessages() {
        String[] strArr = {"a1:foo:zas", "a2:foo:zas", "a3:foo:zas", "b1:foo:zas", "b2:foo:zas", "c1:foo:zas"};
        Assert.assertEquals(ERR_NUMBER_MESSAGES, 6L, this.processedEvents.size());
        for (CoreEvent coreEvent : this.processedEvents) {
            Assert.assertTrue(ERR_PAYLOAD_TYPE, coreEvent.getMessage().getPayload().getValue() instanceof String);
            assertForEachContextConsumption((InternalEvent) coreEvent);
        }
        for (int i = 0; i < this.processedEvents.size(); i++) {
            Assert.assertEquals(ERR_OUTPUT, strArr[i], this.processedEvents.get(i).getMessage().getPayload().getValue());
            assertForEachContextConsumption((InternalEvent) this.processedEvents.get(i));
        }
    }

    private void assertForEachContextConsumption(InternalEvent internalEvent) {
        ForeachContext context = ForeachInternalContextManager.getContext(internalEvent);
        if (context != null) {
            Assert.assertThat(Boolean.valueOf(context.getIterator().hasNext()), CoreMatchers.is(false));
        }
    }
}
