package org.mule.runtime.core.internal.construct;

import com.google.common.base.Functions;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.deployment.management.ComponentInitialStateManager;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.notification.EnrichedNotificationInfo;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.notification.PipelineMessageNotification;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.connector.ConnectException;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.core.api.exception.FlowExceptionHandler;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.management.stats.FlowConstructStatistics;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.context.MuleContextWithRegistries;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.processor.MessageProcessorBuilder;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.chain.DefaultMessageProcessorChainBuilder;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChainBuilder;
import org.mule.runtime.core.privileged.registry.RegistrationException;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/core/internal/construct/AbstractPipeline.class */
public abstract class AbstractPipeline extends AbstractFlowConstruct implements Pipeline {
    private final NotificationDispatcher notificationFirer;
    private final MessageSource source;
    private final List<Processor> processors;
    private MessageProcessorChain pipeline;
    private final ErrorType overloadErrorType;
    private final ProcessingStrategyFactory processingStrategyFactory;
    private final ProcessingStrategy processingStrategy;
    private volatile boolean canProcessMessage;
    private Sink sink;
    private final int maxConcurrency;
    private final ComponentInitialStateManager componentInitialStateManager;

    public AbstractPipeline(String str, MuleContext muleContext, MessageSource messageSource, List<Processor> list, Optional<FlowExceptionHandler> optional, Optional<ProcessingStrategyFactory> optional2, String str2, Integer num, FlowConstructStatistics flowConstructStatistics, ComponentInitialStateManager componentInitialStateManager) {
        super(str, muleContext, optional, str2, flowConstructStatistics);
        this.canProcessMessage = false;
        try {
            this.notificationFirer = (NotificationDispatcher) ((MuleContextWithRegistries) muleContext).getRegistry().lookupObject(NotificationDispatcher.class);
            this.source = messageSource;
            this.componentInitialStateManager = componentInitialStateManager;
            this.processors = Collections.unmodifiableList(list);
            this.maxConcurrency = num != null ? num.intValue() : AsyncProcessingStrategyFactory.DEFAULT_MAX_CONCURRENCY;
            this.processingStrategyFactory = optional2.orElseGet(() -> {
                return defaultProcessingStrategy();
            });
            if (this.processingStrategyFactory instanceof AsyncProcessingStrategyFactory) {
                ((AsyncProcessingStrategyFactory) this.processingStrategyFactory).setMaxConcurrency(this.maxConcurrency);
            } else if (num != null) {
                LOGGER.warn("{} does not support 'maxConcurrency'. Ignoring the value.", this.processingStrategyFactory.getClass().getSimpleName());
            }
            this.processingStrategy = this.processingStrategyFactory.create(muleContext, getName());
            this.overloadErrorType = muleContext.getErrorTypeRepository().getErrorType(Errors.ComponentIdentifiers.Unhandleable.OVERLOAD).orElse(null);
        } catch (RegistrationException e) {
            throw new MuleRuntimeException(e);
        }
    }

    protected MessageProcessorChain createPipeline() throws MuleException {
        DefaultMessageProcessorChainBuilder defaultMessageProcessorChainBuilder = new DefaultMessageProcessorChainBuilder();
        defaultMessageProcessorChainBuilder.setName("'" + getName() + "' processor chain");
        if (this.processingStrategy != null) {
            defaultMessageProcessorChainBuilder.setProcessingStrategy(this.processingStrategy);
        }
        configureMessageProcessors(defaultMessageProcessorChainBuilder);
        return defaultMessageProcessorChainBuilder.build();
    }

    protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
        return new DirectProcessingStrategyFactory();
    }

    private ProcessingStrategyFactory defaultProcessingStrategy() {
        ProcessingStrategyFactory defaultProcessingStrategyFactory = getMuleContext().getConfiguration().getDefaultProcessingStrategyFactory();
        return defaultProcessingStrategyFactory == null ? createDefaultProcessingStrategyFactory() : defaultProcessingStrategyFactory;
    }

    @Override // org.mule.runtime.core.api.construct.Pipeline
    public List<Processor> getProcessors() {
        return this.processors;
    }

    @Override // org.mule.runtime.core.api.construct.Pipeline
    public MessageSource getSource() {
        return this.source;
    }

    protected MessageProcessorChain getPipeline() {
        return this.pipeline;
    }

    @Override // org.mule.runtime.core.api.processor.ProcessingDescriptor
    public boolean isSynchronous() {
        return this.processingStrategy.isSynchronous();
    }

    @Override // org.mule.runtime.core.api.construct.FlowConstruct
    public ProcessingStrategy getProcessingStrategy() {
        return this.processingStrategy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doInitialise() throws MuleException {
        super.doInitialise();
        this.pipeline = createPipeline();
        if (this.source != null) {
            this.source.setListener(new Processor() { // from class: org.mule.runtime.core.internal.construct.AbstractPipeline.1
                @Override // org.mule.runtime.core.api.processor.Processor
                public CoreEvent process(CoreEvent coreEvent) throws MuleException {
                    return MessageProcessors.processToApply(coreEvent, this);
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.mule.runtime.core.api.processor.Processor, java.util.function.Function
                public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
                    return Flux.from(publisher).transform(AbstractPipeline.this.dispatchToFlow());
                }
            });
        }
        LifecycleUtils.initialiseIfNeeded(this.source, this.muleContext);
        LifecycleUtils.initialiseIfNeeded(this.pipeline, this.muleContext);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ReactiveProcessor dispatchToFlow() {
        return publisher -> {
            return Mono.from(publisher).doOnNext(assertStarted()).flatMap(this.source.getBackPressureStrategy() == MessageSource.BackPressureStrategy.WAIT ? flowWaitMapper(Functions.identity(), (coreEvent, coreEvent2) -> {
                return coreEvent;
            }) : flowFailDropMapper(Functions.identity(), (coreEvent3, coreEvent4) -> {
                return coreEvent3;
            }, this.overloadErrorType));
        };
    }

    protected abstract Function<? super CoreEvent, Mono<? extends CoreEvent>> flowWaitMapper(Function<CoreEvent, CoreEvent> function, BiFunction<CoreEvent, CoreEvent, CoreEvent> biFunction);

    protected abstract Function<? super CoreEvent, Mono<? extends CoreEvent>> flowFailDropMapper(Function<CoreEvent, CoreEvent> function, BiFunction<CoreEvent, CoreEvent, CoreEvent> biFunction, ErrorType errorType);

    protected ReactiveProcessor processFlowFunction() {
        return publisher -> {
            return Flux.from(publisher).doOnNext(beforeProcessors()).transform(this.processingStrategy.onPipeline(this.pipeline)).doOnNext(afterProcessors()).doOnError(th -> {
                if (isCompleteSignalRejectedExecutionException(th)) {
                    LOGGER.debug("Scheduler busy when propagating 'complete' signal due to graceful shutdown timeout being exceeded.", th);
                } else {
                    LOGGER.error("Unhandled exception in Flow ", th);
                }
            });
        };
    }

    boolean isCompleteSignalRejectedExecutionException(Throwable th) {
        if (!(th instanceof RejectedExecutionException)) {
            return false;
        }
        for (StackTraceElement stackTraceElement : th.getStackTrace()) {
            if (stackTraceElement.getMethodName().contains("onComplete") && stackTraceElement.getClassName().startsWith("reactor.core.publisher.FluxPublishOn")) {
                return true;
            }
        }
        return false;
    }

    private Consumer<CoreEvent> beforeProcessors() {
        return coreEvent -> {
            if (getStatistics().isEnabled()) {
                getStatistics().incReceivedEvents();
            }
            this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(coreEvent, null, this), getName(), PipelineMessageNotification.PROCESS_START));
            long currentTimeMillis = System.currentTimeMillis();
            BaseEventContext baseEventContext = (BaseEventContext) coreEvent.getContext();
            baseEventContext.onComplete((coreEvent, th) -> {
                MessagingException messagingException = null;
                if (th != null) {
                    messagingException = th instanceof MessagingException ? (MessagingException) th : new MessagingException(coreEvent, th, this);
                }
                fireCompleteNotification(coreEvent, messagingException);
                baseEventContext.getProcessingTime().ifPresent(processingTime -> {
                    processingTime.addFlowExecutionBranchTime(currentTimeMillis);
                });
            });
        };
    }

    private void fireCompleteNotification(CoreEvent coreEvent, MessagingException messagingException) {
        this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(coreEvent, messagingException, this), getName(), PipelineMessageNotification.PROCESS_COMPLETE));
    }

    private Consumer<CoreEvent> afterProcessors() {
        return coreEvent -> {
            this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(coreEvent, null, this), getName(), PipelineMessageNotification.PROCESS_END));
            ((BaseEventContext) coreEvent.getContext()).success(coreEvent);
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void configureMessageProcessors(MessageProcessorChainBuilder messageProcessorChainBuilder) throws MuleException {
        for (Processor processor : getProcessors()) {
            if (processor instanceof Processor) {
                messageProcessorChainBuilder.chain(processor);
            } else {
                if (!(processor instanceof MessageProcessorBuilder)) {
                    throw new IllegalArgumentException("MessageProcessorBuilder should only have MessageProcessor's or MessageProcessorBuilder's configured");
                }
                messageProcessorChainBuilder.chain((MessageProcessorBuilder) processor);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doStartProcessingStrategy() throws MuleException {
        super.doStartProcessingStrategy();
        startIfStartable(this.processingStrategy);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doStart() throws MuleException {
        super.doStart();
        this.sink = this.processingStrategy.createSink(this, processFlowFunction());
        try {
            startIfStartable(this.pipeline);
            this.canProcessMessage = true;
            if (getMuleContext().isStarted()) {
                try {
                    if (this.componentInitialStateManager.mustStartMessageSource(this.source)) {
                        startIfStartable(this.source);
                    }
                } catch (ConnectException e) {
                    throw e;
                } catch (MuleException e2) {
                    doStop();
                    doStopProcessingStrategy();
                    throw e2;
                }
            }
        } catch (MuleException e3) {
            doStop();
            throw e3;
        }
    }

    public Consumer<CoreEvent> assertStarted() {
        return coreEvent -> {
            if (!this.canProcessMessage) {
                throw Exceptions.propagate(new MessagingException(coreEvent, new LifecycleException(CoreMessages.isStopped(getName()), coreEvent.getMessage())));
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doStop() throws MuleException {
        try {
            stopIfStoppable(this.source);
            disposeIfDisposable(this.sink);
            this.sink = null;
            stopIfStoppable(this.pipeline);
            super.doStop();
        } finally {
            this.canProcessMessage = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doStopProcessingStrategy() throws MuleException {
        stopIfStoppable(this.processingStrategy);
        super.doStopProcessingStrategy();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doDispose() {
        disposeIfDisposable(this.pipeline);
        disposeIfDisposable(this.source);
        super.doDispose();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Sink getSink() {
        return this.sink;
    }

    @Override // org.mule.runtime.core.api.construct.Pipeline
    public int getMaxConcurrency() {
        return this.maxConcurrency;
    }

    @Override // org.mule.runtime.core.api.construct.Pipeline
    public ProcessingStrategyFactory getProcessingStrategyFactory() {
        return this.processingStrategyFactory;
    }
}
