package org.mule.runtime.core.internal.routing;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import javax.inject.Inject;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.config.MuleRuntimeFeature;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.api.streaming.StreamingManager;
import org.mule.runtime.core.api.util.StreamingUtils;
import org.mule.runtime.core.internal.routing.ForkJoinStrategy;
import org.mule.runtime.core.internal.routing.forkjoin.CollectListForkJoinStrategyFactory;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/mule/runtime/core/internal/routing/ParallelForEach.class */
/**
 * Fork-join router that splits the incoming event with a collection expression and pairs every
 * resulting part with the same nested processor chain.
 *
 * <p>The split is performed by an {@link ExpressionSplittingStrategy} built from the configured
 * collection expression; each part becomes the message of a new event derived from the original one.
 */
public class ParallelForEach extends AbstractForkJoinRouter {

    @Inject
    protected ExpressionManager expressionManager;

    @Inject
    protected StreamingManager streamingManager;

    @Inject
    protected FeatureFlaggingService featureFlaggingService;

    /** Expression evaluated to obtain the parts; starts as the splitting strategy's default expression. */
    private String collectionExpression = ExpressionSplittingStrategy.DEFAULT_SPLIT_EXPRESSION;

    /** Created in {@link #initialise()} from the expression manager and the collection expression. */
    private SplittingStrategy<CoreEvent, Iterator<TypedValue<?>>> splittingStrategy;

    /** Processors that make up the nested chain; supplied through {@link #setMessageProcessors(List)}. */
    private List<Processor> messageProcessors;

    /** Chain every part is routed to; built in {@link #initialise()}. */
    private MessageProcessorChain nestedChain;

    /**
     * Builds the nested chain and the splitting strategy, then delegates to the parent initialisation.
     *
     * @throws InitialisationException if the parent initialisation fails
     */
    @Override
    public void initialise() throws InitialisationException {
        // The chain and the strategy are assigned before super.initialise() runs, in that order.
        nestedChain = MessageProcessors.buildNewChainWithListOfProcessors(Optional.of(resolveProcessingStrategy()), messageProcessors);
        splittingStrategy = new ExpressionSplittingStrategy(expressionManager, collectionExpression);
        super.initialise();
    }

    /**
     * Produces one routing pair per part obtained from splitting the given event.
     *
     * @param coreEvent the event to split
     * @return a publisher of routing pairs, each combining a per-part event with the nested chain
     */
    @Override
    protected Publisher<ForkJoinStrategy.RoutingPair> getRoutingPairs(CoreEvent coreEvent) {
        // The Iterable lambda defers the split until an iterator is actually requested.
        return Flux.fromIterable(() -> splittingStrategy.split(coreEvent))
                .map(part -> CoreEvent.builder(coreEvent).message(createMessage(part, coreEvent)).build())
                .map(partEvent -> ForkJoinStrategy.RoutingPair.of(partEvent, nestedChain));
    }

    /**
     * Converts the typed value of a part into the payload used for its message.
     *
     * <p>Delegates to {@code ForeachUtils.manageTypedValueForStreaming} when
     * {@link MuleRuntimeFeature#PARALLEL_FOREACH_FLATTEN_MESSAGE} is enabled, and to
     * {@code StreamingUtils.updateTypedValueForStreaming} otherwise.
     *
     * @param typedValue the value of the part
     * @param coreEvent the event the part was split from
     * @return the typed value returned by the selected helper
     */
    protected TypedValue manageTypedValuePayload(TypedValue typedValue, CoreEvent coreEvent) {
        if (featureFlaggingService.isEnabled(MuleRuntimeFeature.PARALLEL_FOREACH_FLATTEN_MESSAGE)) {
            return ForeachUtils.manageTypedValueForStreaming(typedValue, coreEvent, streamingManager);
        }
        return StreamingUtils.updateTypedValueForStreaming(typedValue, coreEvent, streamingManager);
    }

    /**
     * @return a single-element list holding the nested chain
     */
    @Override
    protected List<MessageProcessorChain> getOwnedObjects() {
        return Collections.singletonList(nestedChain);
    }

    /**
     * Sets the processors the nested chain is built from during {@link #initialise()}.
     *
     * @param list the processors for the nested chain
     */
    public void setMessageProcessors(List<Processor> list) {
        messageProcessors = list;
    }

    /**
     * @return always {@code true}
     */
    @Override
    protected boolean isDelayErrors() {
        return true;
    }

    /**
     * @return {@link AsyncProcessingStrategyFactory#DEFAULT_MAX_CONCURRENCY}
     */
    @Override
    protected int getDefaultMaxConcurrency() {
        return AsyncProcessingStrategyFactory.DEFAULT_MAX_CONCURRENCY;
    }

    /**
     * @return a new {@link CollectListForkJoinStrategyFactory} constructed with {@code false}
     */
    @Override
    protected ForkJoinStrategyFactory getDefaultForkJoinStrategyFactory() {
        // NOTE(review): the meaning of the boolean constructor argument is not visible here — confirm.
        return new CollectListForkJoinStrategyFactory(false);
    }

    /**
     * Sets the expression handed to the splitting strategy during {@link #initialise()}.
     *
     * @param str the collection expression
     */
    public void setCollectionExpression(String str) {
        collectionExpression = str;
    }

    /**
     * Builds the message for a single part.
     *
     * <p>When {@link MuleRuntimeFeature#PARALLEL_FOREACH_FLATTEN_MESSAGE} is enabled and the part's
     * value is itself a {@link Message}, the new message is built from that message with its payload
     * replaced; in every other case a fresh message is built carrying only the payload.
     *
     * @param typedValue the value of the part
     * @param coreEvent the event the part was split from
     * @return the message for the part
     */
    private Message createMessage(TypedValue<?> typedValue, CoreEvent coreEvent) {
        // The flag is checked first so the value is only inspected when flattening is enabled.
        if (featureFlaggingService.isEnabled(MuleRuntimeFeature.PARALLEL_FOREACH_FLATTEN_MESSAGE)
                && typedValue.getValue() instanceof Message) {
            return Message.builder((Message) typedValue.getValue())
                    .payload(manageTypedValuePayload(typedValue, coreEvent))
                    .build();
        }
        return Message.builder()
                .payload(manageTypedValuePayload(typedValue, coreEvent))
                .build();
    }
}
