package org.mule.runtime.core.internal.routing;

import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.message.ItemSequenceInfo;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.DataType;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.streaming.StreamingManager;
import org.mule.runtime.core.internal.routing.outbound.EventBuilderConfigurer;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/mule/runtime/core/internal/routing/ForeachRouter.class */
public class ForeachRouter {
    private static final Logger LOGGER = LoggerFactory.getLogger(ForeachRouter.class);
    static final String MAP_NOT_SUPPORTED_MESSAGE = "Foreach does not support 'java.util.Map' with no collection expression. To iterate over Map entries use '#[dw::core::Objects::entrySet(payload)]'";
    private final Foreach owner;
    private final StreamingManager streamingManager;
    private Flux<CoreEvent> upstreamFlux;
    private Flux<CoreEvent> innerFlux;
    private final FluxSinkRecorder<CoreEvent> innerRecorder = new FluxSinkRecorder<>();
    private final FluxSinkRecorder<Either<Throwable, CoreEvent>> downstreamRecorder = new FluxSinkRecorder<>();
    private final AtomicReference<ContextView> downstreamCtxReference = new AtomicReference<>(Context.empty());
    private final AtomicInteger inflightEvents = new AtomicInteger(0);
    private final AtomicBoolean completeDeferred = new AtomicBoolean(false);
    private Flux<CoreEvent> downstreamFlux = Flux.create(fluxSink -> {
        this.downstreamRecorder.accept((FluxSink<Either<Throwable, CoreEvent>>) fluxSink);
        subscribeUpstreamChains(this.downstreamCtxReference.get());
    }).doOnNext(either -> {
        this.inflightEvents.decrementAndGet();
    }).map(either2 -> {
        if (either2.isLeft()) {
            throw Exceptions.propagate((Throwable) either2.getLeft());
        }
        return createResponseEvent((CoreEvent) either2.getRight());
    });

    /* JADX INFO: Access modifiers changed from: package-private */
    public ForeachRouter(Foreach foreach, StreamingManager streamingManager, Publisher<CoreEvent> publisher, String str, int i, MessageProcessorChain messageProcessorChain, boolean z) {
        this.owner = foreach;
        this.streamingManager = streamingManager;
        this.upstreamFlux = Flux.from(publisher).doOnNext(coreEvent -> {
            if (z && foreach.isMapExpression(coreEvent)) {
                this.downstreamRecorder.next(Either.left(new MessagingException(coreEvent, new IllegalArgumentException(MAP_NOT_SUPPORTED_MESSAGE), foreach)));
            }
            this.inflightEvents.getAndIncrement();
            CoreEvent prepareEvent = prepareEvent(coreEvent, str);
            if (prepareEvent != null) {
                this.innerRecorder.next(prepareEvent);
            }
        }).doOnComplete(() -> {
            if (this.inflightEvents.get() == 0) {
                completeRouter();
            } else {
                this.completeDeferred.set(true);
            }
        });
        this.innerFlux = Flux.create(this.innerRecorder).map(coreEvent2 -> {
            ForeachContext context = ForeachInternalContextManager.getContext(coreEvent2);
            if (!context.getIterator().hasNext() && context.getElementNumber().get() == 0) {
                this.downstreamRecorder.next(Either.right(Throwable.class, coreEvent2));
                completeRouterIfNecessary();
            }
            return createTypedValuePartToProcess(foreach, coreEvent2, context, foreach.setCurrentValue(i, context, coreEvent2));
        }).transform(flux -> {
            return MessageProcessors.applyWithChildContext(flux, messageProcessorChain, Optional.of(foreach.getLocation()));
        }).doOnNext(coreEvent3 -> {
            try {
                ForeachContext context = ForeachInternalContextManager.getContext(coreEvent3);
                if (context.getOnComplete().isPresent()) {
                    context.getOnComplete().get().run();
                }
                if (context.getIterator().hasNext()) {
                    this.innerRecorder.next(coreEvent3);
                } else {
                    this.downstreamRecorder.next(Either.right(coreEvent3));
                    completeRouterIfNecessary();
                }
            } catch (Exception e) {
                LOGGER.error("Exception in foreach after iteration", e);
                eventWithCurrentContextDeleted(coreEvent3);
                this.downstreamRecorder.next(Either.left(new MessagingException(coreEvent3, e, foreach)));
                completeRouterIfNecessary();
            }
        }).onErrorContinue(MessagingException.class, (th, obj) -> {
            ((MessagingException) th).setProcessedEvent(eventWithCurrentContextDeleted(restoreSequenceInfo(((MessagingException) th).getEvent())));
            this.downstreamRecorder.next(Either.left(th));
            completeRouterIfNecessary();
        });
    }

    private CoreEvent restoreSequenceInfo(CoreEvent coreEvent) {
        return CoreEvent.builder(coreEvent).itemSequenceInfo(ForeachInternalContextManager.getContext(coreEvent).getItemSequenceInfo()).build();
    }

    private void completeRouterIfNecessary() {
        if (this.completeDeferred.get() && this.inflightEvents.get() == 0) {
            completeRouter();
        }
    }

    private void completeRouter() {
        this.innerRecorder.complete();
        this.downstreamRecorder.complete();
    }

    private CoreEvent eventWithCurrentContextDeleted(CoreEvent coreEvent) {
        ForeachInternalContextManager.removeContext(coreEvent);
        return coreEvent;
    }

    private CoreEvent prepareEvent(CoreEvent coreEvent, String str) {
        CoreEvent build = CoreEvent.builder(coreEvent).addVariable(this.owner.getRootMessageVariableName(), coreEvent.getMessage(), DataType.MULE_MESSAGE).build();
        try {
            Iterator<TypedValue<?>> splitRequest = this.owner.splitRequest(build, str);
            if (splitRequest.hasNext()) {
                ForeachInternalContextManager.addContext(build, createForeachContext(coreEvent, splitRequest));
                return build;
            }
            this.downstreamRecorder.next(Either.right(Throwable.class, coreEvent));
            completeRouterIfNecessary();
            return null;
        } catch (Exception e) {
            eventWithCurrentContextDeleted(build);
            this.downstreamRecorder.next(Either.left(new MessagingException(build, e, this.owner)));
            completeRouterIfNecessary();
            return null;
        }
    }

    private CoreEvent createTypedValuePartToProcess(Foreach foreach, CoreEvent coreEvent, ForeachContext foreachContext, TypedValue typedValue) {
        CoreEvent.Builder itemSequenceInfo = CoreEvent.builder(coreEvent).itemSequenceInfo(Optional.of(ItemSequenceInfo.of(foreachContext.getElementNumber().get())));
        TypedValue<?> manageTypedValueForStreaming = ForeachUtils.manageTypedValueForStreaming(typedValue, coreEvent, this.streamingManager);
        if (typedValue.getValue() instanceof EventBuilderConfigurer) {
            EventBuilderConfigurer eventBuilderConfigurer = (EventBuilderConfigurer) typedValue.getValue();
            eventBuilderConfigurer.configure(itemSequenceInfo);
            Objects.requireNonNull(eventBuilderConfigurer);
            foreachContext.setOnComplete(eventBuilderConfigurer::eventCompleted);
        } else if (typedValue.getValue() instanceof Message) {
            itemSequenceInfo.message(Message.builder((Message) typedValue.getValue()).payload(manageTypedValueForStreaming).build());
        } else {
            itemSequenceInfo.message(Message.builder().payload(manageTypedValueForStreaming).build());
        }
        return itemSequenceInfo.addVariable(foreach.getCounterVariableName(), Integer.valueOf(foreachContext.getElementNumber().incrementAndGet()), DataType.NUMBER).build();
    }

    private ForeachContext createForeachContext(CoreEvent coreEvent, Iterator<TypedValue<?>> it) {
        return new ForeachContext(coreEvent.getVariables().containsKey(this.owner.getCounterVariableName()) ? coreEvent.getVariables().get(this.owner.getCounterVariableName()).getValue() : null, coreEvent.getVariables().containsKey(this.owner.getRootMessageVariableName()) ? coreEvent.getVariables().get(this.owner.getRootMessageVariableName()).getValue() : null, coreEvent.getMessage(), coreEvent.getItemSequenceInfo(), it);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Publisher<CoreEvent> getDownstreamPublisher() {
        return this.downstreamFlux.transformDeferredContextual((flux, contextView) -> {
            return flux.doOnSubscribe(subscription -> {
                this.downstreamCtxReference.set(contextView);
            });
        });
    }

    private void subscribeUpstreamChains(ContextView contextView) {
        this.innerFlux.contextWrite(contextView).subscribe();
        this.upstreamFlux.contextWrite(contextView).subscribe();
    }

    private CoreEvent createResponseEvent(CoreEvent coreEvent) {
        ForeachContext context = ForeachInternalContextManager.getContext(coreEvent);
        if (context == null) {
            return coreEvent;
        }
        CoreEvent.Builder itemSequenceInfo = CoreEvent.builder(coreEvent).message(context.getOriginalMessage()).itemSequenceInfo(context.getItemSequenceInfo());
        restoreVariables(context.getPreviousCounter(), context.getPreviousRootMessage(), itemSequenceInfo);
        return eventWithCurrentContextDeleted(itemSequenceInfo.build());
    }

    private void restoreVariables(Object obj, Object obj2, CoreEvent.Builder builder) {
        if (obj != null) {
            builder.addVariable(this.owner.getCounterVariableName(), obj, DataType.NUMBER);
        } else {
            builder.removeVariable(this.owner.getCounterVariableName());
        }
        if (obj2 != null) {
            builder.addVariable(this.owner.getRootMessageVariableName(), obj2, DataType.MULE_MESSAGE);
        } else {
            builder.removeVariable(this.owner.getRootMessageVariableName());
        }
    }
}
