package org.mule.runtime.core.routing;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.inject.Inject;
import org.apache.commons.collections.CollectionUtils;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.MessageRouter;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.routing.AggregationContext;
import org.mule.runtime.core.api.routing.RoutePathNotFoundException;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.internal.util.ProcessingStrategyUtils;
import org.mule.runtime.core.processor.AbstractMessageProcessorOwner;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/routing/ScatterGatherRouter.class */
public class ScatterGatherRouter extends AbstractMessageProcessorOwner implements MessageRouter {
    private static final Logger logger = LoggerFactory.getLogger(ScatterGatherRouter.class);

    @Inject
    private SchedulerService schedulerService;
    private boolean parallel = true;
    private long timeout = 0;
    private List<Processor> routes = new ArrayList();
    private boolean initialised = false;
    private List<Processor> routeChains = Collections.emptyList();
    private AggregationStrategy aggregationStrategy;
    private Scheduler scheduler;
    private reactor.core.scheduler.Scheduler reactorScheduler;

    @Override // org.mule.runtime.core.api.processor.Processor
    public Event process(Event event) throws MuleException {
        return MessageProcessors.processToApply(event, this);
    }

    private void assertMorethanOneRoute() throws RoutePathNotFoundException {
        if (CollectionUtils.isEmpty(this.routes)) {
            throw new RoutePathNotFoundException(CoreMessages.noEndpointsForRouter(), (Processor) null);
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.mule.runtime.core.api.processor.Processor, java.util.function.Function
    public Publisher<Event> apply(Publisher<Event> publisher) {
        return Flux.from(publisher).doOnNext(Exceptions.checkedConsumer(event -> {
            assertMorethanOneRoute();
            AbstractRoutingStrategy.validateMessageIsNotConsumable(event, event.getMessage());
        })).concatMap(event2 -> {
            return Flux.from(Flux.fromIterable(this.routeChains).concatMap(processor -> {
                return Flux.just(event2).transform(scheduleRoute(processor));
            })).collectList().map(Exceptions.checkedFunction(list -> {
                return this.aggregationStrategy.aggregate(new AggregationContext(event2, list));
            }));
        });
    }

    private ReactiveProcessor scheduleRoute(Processor processor) {
        return (ProcessingStrategyUtils.isSynchronousProcessing(this.flowConstruct) || !(this.flowConstruct instanceof Pipeline)) ? publisher -> {
            return Flux.from(publisher).transform(processor).subscribeOn(this.reactorScheduler);
        } : publisher2 -> {
            return Flux.from(publisher2).transform(((Pipeline) this.flowConstruct).getProcessingStrategy().onPipeline(processor));
        };
    }

    @Override // org.mule.runtime.core.processor.AbstractMuleObjectOwner
    public void initialise() throws InitialisationException {
        try {
            buildRouteChains();
            if (this.aggregationStrategy == null) {
                this.aggregationStrategy = new CollectAllAggregationStrategy();
            }
            if (this.timeout <= 0) {
                this.timeout = Long.MAX_VALUE;
            }
            super.initialise();
            this.initialised = true;
        } catch (Exception e) {
            throw new InitialisationException(e, this);
        }
    }

    @Override // org.mule.runtime.core.processor.AbstractMuleObjectOwner
    public void start() throws MuleException {
        this.scheduler = this.schedulerService.ioScheduler(getLocation() != null ? this.muleContext.getSchedulerBaseConfig().withName(getLocation().getLocation()) : this.muleContext.getSchedulerBaseConfig());
        this.reactorScheduler = Schedulers.fromExecutorService(this.scheduler);
        super.start();
    }

    @Override // org.mule.runtime.core.processor.AbstractMuleObjectOwner
    public void stop() throws MuleException {
        super.stop();
        if (this.scheduler != null) {
            this.scheduler.stop();
            this.scheduler = null;
        }
        if (this.reactorScheduler != null) {
            this.reactorScheduler.dispose();
            this.reactorScheduler = null;
        }
    }

    @Override // org.mule.runtime.core.api.processor.MessageRouter
    public void addRoute(Processor processor) throws MuleException {
        checkNotInitialised();
        this.routes.add(processor);
    }

    @Override // org.mule.runtime.core.api.processor.MessageRouter
    public void removeRoute(Processor processor) throws MuleException {
        checkNotInitialised();
        this.routes.remove(processor);
    }

    private void buildRouteChains() {
        Preconditions.checkState(this.routes.size() > 1, "At least 2 routes are required for ScatterGather");
        this.routeChains = (List) this.routes.stream().map(processor -> {
            return MessageProcessors.newChain(MessageProcessors.newExplicitChain(processor));
        }).collect(Collectors.toList());
    }

    private void checkNotInitialised() {
        Preconditions.checkState(!this.initialised, "<scatter-gather> router is not dynamic. Cannot modify routes after initialisation");
    }

    @Override // org.mule.runtime.core.processor.AbstractMessageProcessorOwner
    protected List<Processor> getOwnedMessageProcessors() {
        return this.routeChains;
    }

    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
        this.aggregationStrategy = aggregationStrategy;
    }

    public void setParallel(boolean z) {
        this.parallel = z;
    }

    public void setTimeout(long j) {
        this.timeout = j;
    }

    public void setRoutes(List<Processor> list) {
        this.routes = list;
    }
}
