package org.mule.runtime.core.construct;

import java.util.Collections;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.Predicate;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.GlobalNameableObject;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstructInvalidException;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.DefaultMessageProcessorPathElement;
import org.mule.runtime.core.api.processor.InternalMessageProcessor;
import org.mule.runtime.core.api.processor.MessageProcessorBuilder;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessorChainBuilder;
import org.mule.runtime.core.api.processor.MessageProcessorContainer;
import org.mule.runtime.core.api.processor.MessageProcessorPathElement;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.source.ClusterizableMessageSource;
import org.mule.runtime.core.api.source.CompositeMessageSource;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.source.NonBlockingMessageSource;
import org.mule.runtime.core.api.transport.LegacyInboundEndpoint;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.connector.ConnectException;
import org.mule.runtime.core.context.notification.PipelineMessageNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.processor.AbstractFilteringMessageProcessor;
import org.mule.runtime.core.processor.AbstractInterceptingMessageProcessor;
import org.mule.runtime.core.processor.AbstractRequestResponseMessageProcessor;
import org.mule.runtime.core.processor.IdempotentRedeliveryPolicy;
import org.mule.runtime.core.processor.chain.DefaultMessageProcessorChainBuilder;
import org.mule.runtime.core.processor.strategy.DefaultFlowProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.LegacyAsynchronousProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.LegacyNonBlockingProcessingStrategyFactory;
import org.mule.runtime.core.processor.strategy.SynchronousProcessingStrategyFactory;
import org.mule.runtime.core.source.ClusterizableMessageSourceWrapper;
import org.mule.runtime.core.transaction.TransactionCoordination;
import org.mule.runtime.core.util.NotificationUtils;
import org.mule.runtime.core.util.rx.Exceptions;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/core/construct/AbstractPipeline.class */
public abstract class AbstractPipeline extends AbstractFlowConstruct implements Pipeline {
    // Inbound source of this flow; assigned (and possibly wrapped) by setMessageSource and wired to the pipeline in doInitialise.
    protected MessageSource messageSource;
    // Processor chain produced by createPipeline() during doInitialise.
    protected MessageProcessorChain pipeline;
    // Scheduler service read from the MuleContext in the constructor.
    protected final SchedulerService schedulerService;
    // Processors configured on this flow; the constructor seeds it with an empty list.
    protected List<Processor> messageProcessors;
    // Path resolver built by createFlowMap(); getProcessorPath delegates to it.
    private NotificationUtils.PathResolver flowMap;
    // Factory used to create processingStrategy; resolved in initialiseProcessingStrategy when still null.
    protected ProcessingStrategyFactory processingStrategyFactory;
    // Strategy currently in effect; setProcessingStrategyFactory resets it to null so it is re-created on the next initialisation.
    protected ProcessingStrategy processingStrategy;
    // Set to true by doStart and back to false by doStop; read by ProcessIfPipelineStartedMessageProcessor.accept.
    private boolean canProcessMessage;
    /**
     * Predicate answering whether a message source can be used with an asynchronous processing strategy.
     * A {@link LegacyInboundEndpoint} answers for itself, a {@link CompositeMessageSource} is compatible
     * only when none of its nested sources is rejected by this same predicate, and any other object
     * (including {@code null}) is treated as compatible.
     */
    private static final Predicate sourceCompatibleWithAsync = new Predicate() {
        public boolean evaluate(Object candidate) {
            if (candidate instanceof LegacyInboundEndpoint) {
                LegacyInboundEndpoint endpoint = (LegacyInboundEndpoint) candidate;
                return endpoint.isCompatibleWithAsync();
            }
            if (!(candidate instanceof CompositeMessageSource)) {
                return true;
            }
            // Recurse into the composite: it is compatible only if no nested source is rejected.
            CompositeMessageSource composite = (CompositeMessageSource) candidate;
            return CollectionUtils.selectRejected(composite.getSources(), this).isEmpty();
        }
    };

    /**
     * Processor that fires a {@code PROCESS_END} {@link PipelineMessageNotification} for the enclosing
     * pipeline and then returns the event unchanged.
     * <p>
     * Decompiler note: the class was originally declared {@code private}.
     */
    public class ProcessEndProcessor implements Processor, InternalMessageProcessor {
        private ProcessEndProcessor() {
        }

        /**
         * Fires the end-of-processing notification for the given event.
         *
         * @param event the event that reached this processor
         * @return the same {@code event} instance
         */
        @Override // org.mule.runtime.core.api.processor.Processor
        public Event process(Event event) throws MuleException {
            PipelineMessageNotification endNotification =
                    new PipelineMessageNotification(AbstractPipeline.this, event, PipelineMessageNotification.PROCESS_END);
            AbstractPipeline.this.muleContext.getNotificationManager().fireNotification(endNotification);
            return event;
        }
    }

    /**
     * Filtering processor that accepts events only while the enclosing pipeline's
     * {@code canProcessMessage} flag is set (it is raised in {@code doStart} and cleared in {@code doStop}).
     * Unaccepted events are reported with a {@link LifecycleException}.
     */
    public class ProcessIfPipelineStartedMessageProcessor extends AbstractFilteringMessageProcessor implements InternalMessageProcessor {
        public ProcessIfPipelineStartedMessageProcessor() {
        }

        /**
         * @return always {@code true}
         */
        @Override // org.mule.runtime.core.processor.AbstractFilteringMessageProcessor
        public boolean isThrowOnUnaccepted() {
            return true;
        }

        /**
         * @return the current value of the enclosing pipeline's {@code canProcessMessage} flag
         */
        @Override // org.mule.runtime.core.processor.AbstractFilteringMessageProcessor
        protected boolean accept(Event event, Event.Builder builder) {
            boolean pipelineStarted = AbstractPipeline.this.canProcessMessage;
            return pipelineStarted;
        }

        /**
         * Builds the exception used for an unaccepted event.
         *
         * @param event the event that was not accepted
         * @return a {@link LifecycleException} built from the "is stopped" message for this flow's name and the event's message
         */
        @Override // org.mule.runtime.core.processor.AbstractFilteringMessageProcessor
        protected MuleException filterUnacceptedException(Event event) {
            String flowName = AbstractPipeline.this.getName();
            return new LifecycleException(CoreMessages.isStopped(flowName), event.mo7getMessage());
        }
    }

    /**
     * Intercepting processor placed in front of the flow's own processors. It hands events straight to the
     * next processor when the pipeline uses the shared synchronous strategy instance, and otherwise routes
     * them through the reactive stream produced by the pipeline's processing strategy.
     * <p>
     * Decompiler note: the class was originally declared {@code private}.
     */
    public class ProcessingStrategyInterceptingMessageProcessor extends AbstractInterceptingMessageProcessor implements InternalMessageProcessor {
        private ProcessingStrategyInterceptingMessageProcessor() {
        }

        /**
         * Processes one event, blocking until a result is available.
         *
         * @param event the event to process
         * @return the resulting event
         * @throws MuleException the converted failure when the reactive path throws
         */
        @Override // org.mule.runtime.core.api.processor.Processor
        public Event process(Event event) throws MuleException {
            boolean usesSharedSynchronousStrategy =
                    AbstractPipeline.this.processingStrategy == SynchronousProcessingStrategyFactory.SYNCHRONOUS_PROCESSING_STRATEGY_INSTANCE;
            if (!usesSharedSynchronousStrategy) {
                try {
                    return (Event) Mono.just(event).transform(this).block();
                } catch (Throwable failure) {
                    throw Exceptions.rxExceptionToMuleException(failure);
                }
            }
            return processNext(event);
        }

        /**
         * Applies the pipeline's processing strategy to the given stream, targeting the next processor.
         *
         * @param publisher the upstream event stream
         * @return the stream transformed by {@code processingStrategy.onPipeline}
         */
        @Override // org.mule.runtime.core.api.processor.Processor, java.util.function.Function
        public Publisher<Event> apply(Publisher<Event> publisher) {
            return Flux.from(publisher)
                    .transform(AbstractPipeline.this.processingStrategy.onPipeline(AbstractPipeline.this, this.next));
        }

        /**
         * @return always {@link ReactiveProcessor.ProcessingType#CPU_LITE}
         */
        @Override // org.mule.runtime.core.api.processor.ReactiveProcessor
        public ReactiveProcessor.ProcessingType getProccesingType() {
            return ReactiveProcessor.ProcessingType.CPU_LITE;
        }
    }

    /**
     * Request/response processor that brackets pipeline execution with notifications: it fires
     * {@code PROCESS_START} before delegating the request and {@code PROCESS_COMPLETE} from
     * {@code processFinally}.
     * <p>
     * Decompiler note: the class was originally declared {@code private}.
     */
    public class ProcessorStartCompleteProcessor extends AbstractRequestResponseMessageProcessor implements InternalMessageProcessor {
        private ProcessorStartCompleteProcessor() {
        }

        /**
         * Fires the start notification and then delegates to the superclass request handling.
         * <p>
         * Decompiler note: originally declared {@code protected}.
         *
         * @param event the incoming event
         * @return whatever the superclass returns for the request
         */
        @Override // org.mule.runtime.core.processor.AbstractRequestResponseMessageProcessor
        public Event processRequest(Event event) throws MuleException {
            PipelineMessageNotification started =
                    new PipelineMessageNotification(AbstractPipeline.this, event, PipelineMessageNotification.PROCESS_START);
            this.muleContext.getNotificationManager().fireNotification(started);
            return super.processRequest(event);
        }

        /**
         * Fires the completion notification, carrying the messaging exception when one is supplied.
         *
         * @param event the event associated with the completion
         * @param messagingException the exception passed on to the notification, as received
         */
        @Override // org.mule.runtime.core.processor.AbstractRequestResponseMessageProcessor
        protected void processFinally(Event event, MessagingException messagingException) {
            PipelineMessageNotification completed = new PipelineMessageNotification(
                    AbstractPipeline.this, event, PipelineMessageNotification.PROCESS_COMPLETE, messagingException);
            this.muleContext.getNotificationManager().fireNotification(completed);
        }
    }

    /**
     * Creates the pipeline with no processors, not yet able to process messages, and with a processing
     * strategy already resolved.
     * <p>
     * Note: {@code initialiseProcessingStrategy()} runs from this constructor and calls overridable methods
     * ({@code createDefaultProcessingStrategyFactory}, {@code getProcessingStrategyFactory},
     * {@code isRedeliveryPolicyConfigured}), so subclass overrides run before subclass fields are initialised.
     *
     * @param str the flow name, passed to the superclass
     * @param muleContext the context this flow belongs to; also supplies the scheduler service
     */
    public AbstractPipeline(String str, MuleContext muleContext) {
        super(str, muleContext);
        this.schedulerService = muleContext.getSchedulerService();
        this.canProcessMessage = false;
        this.messageProcessors = Collections.emptyList();
        // Must stay last: it reads messageProcessors through isRedeliveryPolicyConfigured().
        initialiseProcessingStrategy();
    }

    /**
     * Builds the processor chain for this flow. The chain is named after the flow and is populated, in
     * order, by {@code configurePreProcessors}, {@code configureMessageProcessors} and
     * {@code configurePostProcessors}.
     *
     * @return the built chain
     * @throws MuleException if any configuration step or the build fails
     */
    protected MessageProcessorChain createPipeline() throws MuleException {
        DefaultMessageProcessorChainBuilder chainBuilder = new DefaultMessageProcessorChainBuilder();
        String chainName = "'" + getName() + "' processor chain";
        chainBuilder.setName(chainName);

        configurePreProcessors(chainBuilder);
        configureMessageProcessors(chainBuilder);
        configurePostProcessors(chainBuilder);

        MessageProcessorChain builtChain = chainBuilder.build();
        return builtChain;
    }

    /**
     * Supplies the factory used when neither this flow nor the Mule configuration provides one
     * (see {@code initialiseProcessingStrategy}). Subclasses may override to change the fallback.
     *
     * @return a new {@link SynchronousProcessingStrategyFactory}
     */
    protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
        return new SynchronousProcessingStrategyFactory();
    }

    /**
     * Resolves the processing strategy for this flow.
     * <p>
     * When no strategy is set, the factory is taken from this flow, then from the Mule configuration's
     * default, then from {@code createDefaultProcessingStrategyFactory()}, and a strategy is created from it.
     * Afterwards, if the factory is a {@link DefaultFlowProcessingStrategyFactory} and a redelivery policy is
     * configured, the strategy is replaced with one created by a {@link SynchronousProcessingStrategyFactory}
     * and a warning is logged. The factory field itself is left untouched by that override.
     */
    private void initialiseProcessingStrategy() {
        if (this.processingStrategy == null) {
            if (this.processingStrategyFactory == null) {
                ProcessingStrategyFactory configuredDefault =
                        this.muleContext.getConfiguration().getDefaultProcessingStrategyFactory();
                this.processingStrategyFactory =
                        configuredDefault != null ? configuredDefault : createDefaultProcessingStrategyFactory();
            }
            this.processingStrategy = this.processingStrategyFactory.create(this.muleContext);
        }

        // Both checks are evaluated unconditionally, in this order.
        boolean usesDefaultFlowFactory = getProcessingStrategyFactory() instanceof DefaultFlowProcessingStrategyFactory;
        boolean redeliveryConfigured = isRedeliveryPolicyConfigured();

        if (usesDefaultFlowFactory && redeliveryConfigured) {
            this.processingStrategy = new SynchronousProcessingStrategyFactory().create(this.muleContext);
            if (LOGGER.isWarnEnabled()) {
                LOGGER.warn("Using message redelivery and on-error-propagate requires synchronous processing strategy. Processing strategy re-configured to synchronous");
            }
        }
    }

    /**
     * Adds the processors that run before the flow's own processors: a single
     * {@link ProcessorStartCompleteProcessor}.
     * <p>
     * Decompiler note: originally declared {@code protected}.
     *
     * @param messageProcessorChainBuilder the builder of the chain under construction
     * @throws MuleException declared for overriding implementations
     */
    public void configurePreProcessors(MessageProcessorChainBuilder messageProcessorChainBuilder) throws MuleException {
        ProcessorStartCompleteProcessor startCompleteProcessor = new ProcessorStartCompleteProcessor();
        messageProcessorChainBuilder.chain(startCompleteProcessor);
    }

    /**
     * Adds the processors that run after the flow's own processors: a single {@link ProcessEndProcessor}.
     * <p>
     * Decompiler note: originally declared {@code protected}.
     *
     * @param messageProcessorChainBuilder the builder of the chain under construction
     * @throws MuleException declared for overriding implementations
     */
    public void configurePostProcessors(MessageProcessorChainBuilder messageProcessorChainBuilder) throws MuleException {
        ProcessEndProcessor endProcessor = new ProcessEndProcessor();
        messageProcessorChainBuilder.chain(endProcessor);
    }

    /**
     * Replaces the list of processors configured on this flow. The list is stored as given, without
     * copying or null-checking.
     *
     * @param list the processors to use; {@code isRedeliveryPolicyConfigured} and
     *        {@code configureMessageProcessors} dereference it, so a {@code null} list will fail there
     */
    @Override // org.mule.runtime.core.api.construct.Pipeline
    public void setMessageProcessors(List<Processor> list) {
        this.messageProcessors = list;
    }

    /**
     * @return the processor list held by this flow (the same instance, not a copy)
     */
    @Override // org.mule.runtime.core.api.construct.Pipeline
    public List<Processor> getMessageProcessors() {
        return this.messageProcessors;
    }

    /**
     * @return the message source held by this flow; this is the wrapper, not the original object, when
     *         {@code setMessageSource} received a {@link ClusterizableMessageSource}. May be {@code null}.
     */
    @Override // org.mule.runtime.core.api.construct.Pipeline
    public MessageSource getMessageSource() {
        return this.messageSource;
    }

    /**
     * Sets the message source of this flow. A {@link ClusterizableMessageSource} is wrapped in a
     * {@link ClusterizableMessageSourceWrapper} bound to this flow and its Mule context; any other value
     * (including {@code null}) is stored as given.
     *
     * @param messageSource the source to use
     */
    @Override // org.mule.runtime.core.api.construct.Pipeline
    public void setMessageSource(MessageSource messageSource) {
        if (!(messageSource instanceof ClusterizableMessageSource)) {
            this.messageSource = messageSource;
            return;
        }
        ClusterizableMessageSource clusterizable = (ClusterizableMessageSource) messageSource;
        this.messageSource = new ClusterizableMessageSourceWrapper(this.muleContext, clusterizable, this);
    }

    /**
     * Reports whether the current processing strategy is synchronous.
     * <p>
     * Note: {@code setProcessingStrategyFactory} resets the strategy to {@code null}; calling this method
     * after that and before the strategy is re-created (in {@code doInitialise}) throws
     * {@link NullPointerException}.
     *
     * @return the value of {@code processingStrategy.isSynchronous()}
     */
    @Override // org.mule.runtime.core.api.processor.ProcessingDescriptor
    public boolean isSynchronous() {
        return this.processingStrategy.isSynchronous();
    }

    /**
     * @return the factory currently held by this flow, either set explicitly or resolved by
     *         {@code initialiseProcessingStrategy}
     */
    public ProcessingStrategyFactory getProcessingStrategyFactory() {
        return this.processingStrategyFactory;
    }

    /**
     * Sets the factory used to create this flow's processing strategy and discards the current strategy.
     *
     * @param processingStrategyFactory the factory to use; stored as given
     */
    @Override // org.mule.runtime.core.api.construct.Pipeline
    public void setProcessingStrategyFactory(ProcessingStrategyFactory processingStrategyFactory) {
        this.processingStrategyFactory = processingStrategyFactory;
        // Clear the strategy so initialiseProcessingStrategy() re-creates it from the new factory.
        this.processingStrategy = null;
    }

    /**
     * @return the processing strategy currently in effect; {@code null} between a call to
     *         {@code setProcessingStrategyFactory} and the next strategy initialisation
     */
    @Override // org.mule.runtime.core.api.construct.Pipeline
    public ProcessingStrategy getProcessingStrategy() {
        return this.processingStrategy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Initialises the flow: resolves the processing strategy, builds the processor chain, connects the
     * message source (when present) to that chain through an intercepting listener, passes source and chain
     * to the inherited inject/initialise helpers, and finally builds the processor path map.
     *
     * @throws MuleException if the superclass initialisation, chain creation or any helper fails
     */
    @Override // org.mule.runtime.core.construct.AbstractFlowConstruct
    public void doInitialise() throws MuleException {
        super.doInitialise();
        // Re-resolve the strategy: a factory set after construction will have cleared it.
        initialiseProcessingStrategy();
        this.pipeline = createPipeline();
        if (this.messageSource != null) {
            this.messageSource.setListener(new AbstractInterceptingMessageProcessor() { // from class: org.mule.runtime.core.construct.AbstractPipeline.2
                @Override // org.mule.runtime.core.api.processor.Processor
                public Event process(Event event) throws MuleException {
                    // With an active transaction the chain is invoked directly on the calling thread.
                    if (TransactionCoordination.isTransactionActive()) {
                        return AbstractPipeline.this.pipeline.process(event);
                    }
                    try {
                        // Otherwise go through the reactive path and block for the result.
                        return (Event) Mono.just(event).transform(this).block();
                    } catch (Exception e) {
                        throw Exceptions.rxExceptionToMuleException(e);
                    }
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.mule.runtime.core.api.processor.Processor, java.util.function.Function
                public Publisher<Event> apply(Publisher<Event> publisher) {
                    return Flux.from(publisher).transform(AbstractPipeline.this.pipeline);
                }
            });
        }
        // Source first, then chain, for both injection and initialisation.
        injectFlowConstructMuleContext(this.messageSource);
        injectExceptionHandler(this.messageSource);
        injectFlowConstructMuleContext(this.pipeline);
        injectExceptionHandler(this.pipeline);
        initialiseIfInitialisable(this.messageSource);
        initialiseIfInitialisable(this.pipeline);
        // Needs the built chain, so it runs last.
        createFlowMap();
    }

    /**
     * Adds the flow's own processors to the chain under construction, preceded by a
     * {@link ProcessingStrategyInterceptingMessageProcessor}.
     * <p>
     * Each configured element is chained as a {@link Processor} when it is one, otherwise as a
     * {@link MessageProcessorBuilder}.
     *
     * @param messageProcessorChainBuilder the builder of the chain under construction
     * @throws MuleException declared for overriding implementations
     * @throws IllegalArgumentException if an element is {@code null} or is neither a {@link Processor} nor a
     *         {@link MessageProcessorBuilder}
     */
    protected void configureMessageProcessors(MessageProcessorChainBuilder messageProcessorChainBuilder) throws MuleException {
        messageProcessorChainBuilder.chain(new ProcessingStrategyInterceptingMessageProcessor());
        // Iterate as Object: with a Processor-typed loop variable the instanceof test is true for every
        // non-null element, the builder branch is unreachable, and a builder stored through a raw list
        // reference would fail with ClassCastException at the implicit cast instead of being chained.
        for (Object element : getMessageProcessors()) {
            if (element instanceof Processor) {
                messageProcessorChainBuilder.chain((Processor) element);
            } else if (element instanceof MessageProcessorBuilder) {
                messageProcessorChainBuilder.chain((MessageProcessorBuilder) element);
            } else {
                throw new IllegalArgumentException("MessageProcessorBuilder should only have MessageProcessor's or MessageProcessorBuilder's configured");
            }
        }
    }

    /**
     * Validates that the configured message source is compatible with the configured processing strategy
     * factory, after running the superclass validation.
     *
     * @throws FlowConstructInvalidException if a legacy asynchronous factory is combined with a source that
     *         {@code sourceCompatibleWithAsync} rejects, or if a legacy non-blocking factory is combined with
     *         a source that is not a {@link NonBlockingMessageSource}
     */
    @Override // org.mule.runtime.core.construct.AbstractFlowConstruct
    public void validateConstruct() throws FlowConstructInvalidException {
        super.validateConstruct();

        boolean legacyAsyncFactory = getProcessingStrategyFactory() instanceof LegacyAsynchronousProcessingStrategyFactory;
        boolean explicitAsyncFactory =
                legacyAsyncFactory && !(getProcessingStrategyFactory() instanceof DefaultFlowProcessingStrategyFactory);
        // Evaluated unconditionally; the predicate treats a null source as compatible.
        boolean sourceSupportsAsync = sourceCompatibleWithAsync.evaluate(this.messageSource);

        if (explicitAsyncFactory && this.messageSource != null && !sourceSupportsAsync) {
            throw new FlowConstructInvalidException(CoreMessages.createStaticMessage("One of the message sources configured on this Flow is not compatible with an asynchronous processing strategy.  Either because it is request-response, has a transaction defined, or messaging redelivered is configured."), this);
        }

        boolean legacyNonBlockingFactory = getProcessingStrategyFactory() instanceof LegacyNonBlockingProcessingStrategyFactory;
        if (legacyNonBlockingFactory && this.messageSource != null && !(this.messageSource instanceof NonBlockingMessageSource)) {
            String detail = String.format("The non-blocking processing strategy (%s) currently only supports non-blocking messages sources (source is %s).", getProcessingStrategyFactory().toString(), this.messageSource.toString());
            throw new FlowConstructInvalidException(CoreMessages.createStaticMessage(detail), this);
        }
    }

    /**
     * Tells whether a redelivery policy is configured on this flow, which is the case when the first
     * configured processor is an {@link IdempotentRedeliveryPolicy}.
     *
     * @return {@code true} if the processor list is non-empty and its first element is an
     *         {@link IdempotentRedeliveryPolicy}; {@code false} otherwise
     */
    protected boolean isRedeliveryPolicyConfigured() {
        return !getMessageProcessors().isEmpty()
                && getMessageProcessors().get(0) instanceof IdempotentRedeliveryPolicy;
    }

    /**
     * Starts the flow: runs the superclass start, passes the processing strategy and the processor chain to
     * the start helpers, marks the pipeline as able to process messages, and, only when the Mule context is
     * already started, starts the message source.
     * <p>
     * If starting the message source fails with a {@link ConnectException}, the exception is rethrown as is
     * and the flow is left started. Any other {@link MuleException} causes {@code doStop()} to run before the
     * exception is rethrown.
     * <p>
     * Decompiler note: originally declared {@code protected}.
     *
     * @throws MuleException if any start step fails
     */
    @Override // org.mule.runtime.core.construct.AbstractFlowConstruct
    public void doStart() throws MuleException {
        super.doStart();
        startIfStartable(this.processingStrategy);
        startIfStartable(this.pipeline);
        // NOTE(review): the processing strategy is handed to a start helper a second time here; kept as
        // found — confirm whether startIfNeeded is a no-op for an already started object.
        LifecycleUtils.startIfNeeded(this.processingStrategy);
        this.canProcessMessage = true;
        if (!this.muleContext.isStarted()) {
            return;
        }
        try {
            startIfStartable(this.messageSource);
        } catch (ConnectException connectFailure) {
            // Must precede the MuleException handler: if ConnectException is a MuleException subtype
            // (NOTE(review): confirm against its declaration) a later clause would be unreachable and
            // connect failures would wrongly stop the flow.
            throw connectFailure;
        } catch (MuleException startFailure) {
            // The source could not be started: stop what was started above before propagating.
            doStop();
            throw startFailure;
        }
    }

    /**
     * Builds the processor path resolver for this flow: creates a root path element named after the flow,
     * lets {@code addMessageProcessorPathElements} populate it, and stores the resolver built from it in
     * {@code flowMap}.
     */
    private void createFlowMap() {
        String flowName = getName();
        DefaultMessageProcessorPathElement rootElement = new DefaultMessageProcessorPathElement(null, flowName);
        addMessageProcessorPathElements(rootElement);
        this.flowMap = NotificationUtils.buildPathResolver(rootElement);
    }

    /**
     * Registers this flow's path elements under the given parent: the processor chain goes under a
     * {@code "processors"} child, and, when the exception listener is a {@link MessageProcessorContainer},
     * its elements go under an {@code "es"} child. That child is nested inside a child named after the
     * listener's global name when it has one.
     *
     * @param messageProcessorPathElement the parent element to add children to
     */
    @Override // org.mule.runtime.core.api.processor.MessageProcessorContainer
    public void addMessageProcessorPathElements(MessageProcessorPathElement messageProcessorPathElement) {
        NotificationUtils.addMessageProcessorPathElements(this.pipeline, messageProcessorPathElement.addChild("processors"));
        if (!(this.exceptionListener instanceof MessageProcessorContainer)) {
            return;
        }
        String globalName = getExceptionStrategyGlobalName();
        MessageProcessorPathElement exceptionStrategyParent =
                globalName == null ? messageProcessorPathElement : messageProcessorPathElement.addChild(globalName);
        ((MessageProcessorContainer) this.exceptionListener)
                .addMessageProcessorPathElements(exceptionStrategyParent.addChild("es"));
    }

    /**
     * @return the global name reported by the exception listener when it is a {@link GlobalNameableObject};
     *         {@code null} when it is not
     */
    private String getExceptionStrategyGlobalName() {
        if (!(this.exceptionListener instanceof GlobalNameableObject)) {
            return null;
        }
        return ((GlobalNameableObject) this.exceptionListener).getGlobalName();
    }

    /**
     * Resolves the path of a processor using the map built by {@code createFlowMap()}.
     * <p>
     * Note: {@code flowMap} is only assigned at the end of {@code doInitialise}; calling this method earlier
     * throws {@link NullPointerException}.
     *
     * @param processor the processor to look up
     * @return whatever the path resolver returns for {@code processor}
     */
    @Override // org.mule.runtime.core.api.construct.MessageProcessorPathResolver
    public String getProcessorPath(Processor processor) {
        return this.flowMap.resolvePath(processor);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Stops the flow: passes the message source, the processing strategy and the processor chain, in that
     * order, to the stop helper and then runs the superclass stop. The pipeline is marked as unable to
     * process messages once this completes, whether it finishes normally or with an exception.
     *
     * @throws MuleException if any stop step fails; later steps are then skipped
     */
    @Override // org.mule.runtime.core.construct.AbstractFlowConstruct
    public void doStop() throws MuleException {
        try {
            // Source first, then the strategy and the chain.
            stopIfStoppable(this.messageSource);
            stopIfStoppable(this.processingStrategy);
            stopIfStoppable(this.pipeline);
            super.doStop();
        } finally {
            // Cleared even on failure; note it stays true while the steps above are still running.
            this.canProcessMessage = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Disposes the flow: passes the processor chain and then the message source to the dispose helper, and
     * finally runs the superclass dispose. The processing strategy is not passed to the dispose helper here.
     */
    @Override // org.mule.runtime.core.construct.AbstractFlowConstruct
    public void doDispose() {
        disposeIfDisposable(this.pipeline);
        disposeIfDisposable(this.messageSource);
        super.doDispose();
    }
}
