package org.mule.runtime.core.internal.routing;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Stack;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.functional.Either;
import org.mule.runtime.api.message.Error;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.internal.event.EventInternalContextResolver;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.util.context.Context;

/* loaded from: input_file:org/mule/runtime/core/internal/routing/FirstSuccessfulRouter.class */
class FirstSuccessfulRouter {
    private static final String FIRST_SUCCESSFUL_START_EVENT = "_firstSuccessfulStartEvent";
    private final Component owner;
    private final Flux<CoreEvent> upstreamFlux;
    private final List<FluxSinkRecorder<CoreEvent>> innerRecorders;
    private final Flux<CoreEvent> downstreamFlux;
    private final Logger LOGGER = LoggerFactory.getLogger(FirstSuccessfulRouter.class);
    private final List<Flux<CoreEvent>> innerFluxes = new ArrayList();
    private final FluxSinkRecorder<Either<Throwable, CoreEvent>> downstreamRecorder = new FluxSinkRecorder<>();
    private final EventInternalContextResolver<Stack<CoreEvent>> nextExecutionContextResolver = new EventInternalContextResolver<>(FIRST_SUCCESSFUL_START_EVENT, Stack::new);
    private final AtomicReference<Context> downstreamContextReference = new AtomicReference<>(Context.empty());
    private final AtomicInteger inflightEvents = new AtomicInteger(0);
    private final AtomicBoolean completeDeferred = new AtomicBoolean(false);

    private boolean isOriginalError(Error error, Optional<Error> optional) {
        return ((Boolean) optional.map(error2 -> {
            return Boolean.valueOf(error2.equals(error));
        }).orElse(false)).booleanValue();
    }

    public FirstSuccessfulRouter(Component component, Publisher<CoreEvent> publisher, List<ProcessorRoute> list) {
        this.owner = component;
        this.innerRecorders = (List) list.stream().map(processorRoute -> {
            return new FluxSinkRecorder();
        }).collect(Collectors.toList());
        this.upstreamFlux = Flux.from(publisher).doOnNext(coreEvent -> {
            this.inflightEvents.getAndIncrement();
            this.innerRecorders.get(0).next(startEvent(coreEvent));
        }).doOnComplete(() -> {
            if (this.inflightEvents.get() == 0) {
                completeRouter();
            } else {
                this.completeDeferred.set(true);
            }
        });
        int i = 0;
        while (i < list.size()) {
            this.innerFluxes.add(createMidFlux(list.get(i), this.innerRecorders.get(i), Optional.ofNullable(i < list.size() - 1 ? this.innerRecorders.get(i + 1) : null)));
            i++;
        }
        this.downstreamFlux = Flux.create(fluxSink -> {
            this.downstreamRecorder.accept((FluxSink<Either<Throwable, CoreEvent>>) fluxSink);
            subscribeUpstreamChains(this.downstreamContextReference.get());
        }).doOnNext(either -> {
            this.inflightEvents.decrementAndGet();
        }).map(getScopeResultMapper());
    }

    private Function<Either<Throwable, CoreEvent>, CoreEvent> getScopeResultMapper() {
        return either -> {
            if (either.isLeft()) {
                throw Exceptions.propagate((Throwable) either.getLeft());
            }
            return (CoreEvent) either.getRight();
        };
    }

    private void completeRouter() {
        Iterator<FluxSinkRecorder<CoreEvent>> it = this.innerRecorders.iterator();
        while (it.hasNext()) {
            it.next().complete();
        }
        this.downstreamRecorder.complete();
    }

    private void completeRouterIfNecessary() {
        if (this.completeDeferred.get() && this.inflightEvents.get() == 0) {
            completeRouter();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Publisher<CoreEvent> getDownstreamPublisher() {
        return this.downstreamFlux.compose(flux -> {
            return Mono.subscriberContext().flatMapMany(context -> {
                return flux.doOnSubscribe(subscription -> {
                    this.downstreamContextReference.set(context);
                });
            });
        });
    }

    private void subscribeUpstreamChains(Context context) {
        Iterator<Flux<CoreEvent>> it = this.innerFluxes.iterator();
        while (it.hasNext()) {
            it.next().subscriberContext(context).subscribe();
        }
        this.upstreamFlux.subscriberContext(context).subscribe();
    }

    private CoreEvent startEvent(CoreEvent coreEvent) {
        Stack<CoreEvent> currentContextFromEvent = this.nextExecutionContextResolver.getCurrentContextFromEvent(coreEvent);
        currentContextFromEvent.push(coreEvent);
        return this.nextExecutionContextResolver.eventWithContext(coreEvent, currentContextFromEvent);
    }

    private void executeNext(Optional<FluxSinkRecorder<CoreEvent>> optional, CoreEvent coreEvent, Throwable th) {
        CoreEvent pop = this.nextExecutionContextResolver.getCurrentContextFromEvent(coreEvent).pop();
        if (optional.isPresent()) {
            optional.get().next(startEvent(pop));
        } else {
            this.downstreamRecorder.next(Either.left(th, CoreEvent.class));
        }
    }

    private Flux<CoreEvent> createMidFlux(ProcessorRoute processorRoute, FluxSinkRecorder<CoreEvent> fluxSinkRecorder, Optional<FluxSinkRecorder<CoreEvent>> optional) {
        return fluxSinkRecorder.flux().transform(flux -> {
            return MessageProcessors.applyWithChildContext(flux, processorRoute.getProcessor(), Optional.of(this.owner.getLocation()));
        }).doOnNext(coreEvent -> {
            if (coreEvent.getError().isPresent()) {
                if (!isOriginalError((Error) coreEvent.getError().get(), this.nextExecutionContextResolver.getCurrentContextFromEvent(coreEvent).peek().getError())) {
                    executeNext(optional, coreEvent, ((Error) coreEvent.getError().get()).getCause());
                    return;
                }
            }
            Stack<CoreEvent> currentContextFromEvent = this.nextExecutionContextResolver.getCurrentContextFromEvent(coreEvent);
            currentContextFromEvent.pop();
            this.downstreamRecorder.next(Either.right(Throwable.class, this.nextExecutionContextResolver.eventWithContext(coreEvent, currentContextFromEvent)));
            completeRouterIfNecessary();
        }).onErrorContinue((th, obj) -> {
            if (obj instanceof CoreEvent) {
                executeNext(optional, (CoreEvent) obj, th);
            } else if (th instanceof MessagingException) {
                executeNext(optional, ((MessagingException) th).getEvent(), th);
            }
        });
    }
}
