package org.mule.runtime.core.internal.construct;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Consumer;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.meta.AbstractAnnotatedObject;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.InternalEventContext;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.connector.ConnectException;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.context.notification.EnrichedNotificationInfo;
import org.mule.runtime.core.api.context.notification.NotificationDispatcher;
import org.mule.runtime.core.api.context.notification.PipelineMessageNotification;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.management.stats.FlowConstructStatistics;
import org.mule.runtime.core.api.processor.InternalProcessor;
import org.mule.runtime.core.api.processor.MessageProcessorBuilder;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessorChainBuilder;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.registry.RegistrationException;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.MessagingExceptionResolver;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.privileged.processor.IdempotentRedeliveryPolicy;
import org.mule.runtime.core.privileged.processor.chain.DefaultMessageProcessorChainBuilder;
import org.reactivestreams.Publisher;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/core/internal/construct/AbstractPipeline.class */
public abstract class AbstractPipeline extends AbstractFlowConstruct implements Pipeline {
    private final MessagingExceptionResolver exceptionResolver;
    private final NotificationDispatcher notificationFirer;
    private final MessageSource source;
    private final List<Processor> processors;
    private MessageProcessorChain pipeline;
    private final ProcessingStrategy processingStrategy;
    private volatile boolean canProcessMessage;
    private final Cache<String, InternalEventContext> eventContextCache;
    private volatile Sink sink;
    private final int maxConcurrency;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/construct/AbstractPipeline$ProcessEndProcessor.class */
    public class ProcessEndProcessor extends AbstractAnnotatedObject implements Processor, InternalProcessor {
        private ProcessEndProcessor() {
        }

        @Override // org.mule.runtime.core.api.processor.Processor
        public InternalEvent process(InternalEvent internalEvent) throws MuleException {
            AbstractPipeline.this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(internalEvent, null, AbstractPipeline.this), AbstractPipeline.this, PipelineMessageNotification.PROCESS_END));
            return internalEvent;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/construct/AbstractPipeline$ProcessorStartCompleteProcessor.class */
    public class ProcessorStartCompleteProcessor implements Processor, InternalProcessor {
        private ProcessorStartCompleteProcessor() {
        }

        @Override // org.mule.runtime.core.api.processor.Processor
        public InternalEvent process(InternalEvent internalEvent) throws MuleException {
            AbstractPipeline.this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(internalEvent, null, AbstractPipeline.this), AbstractPipeline.this, PipelineMessageNotification.PROCESS_START));
            long currentTimeMillis = System.currentTimeMillis();
            Mono.from(internalEvent.getContext().getBeforeResponsePublisher()).doOnSuccess(internalEvent2 -> {
                fireCompleteNotification(internalEvent2, null);
            }).doOnError(MessagingException.class, messagingException -> {
                fireCompleteNotification(null, messagingException);
            }).doOnError(th -> {
                return !(th instanceof MessagingException);
            }, th2 -> {
                fireCompleteNotification(null, new MessagingException(internalEvent, th2, AbstractPipeline.this));
            }).doOnTerminate((internalEvent3, th3) -> {
                internalEvent.getContext().getProcessingTime().ifPresent(processingTime -> {
                    processingTime.addFlowExecutionBranchTime(currentTimeMillis);
                });
            }).subscribe(Operators.requestUnbounded());
            return internalEvent;
        }

        private void fireCompleteNotification(InternalEvent internalEvent, MessagingException messagingException) {
            AbstractPipeline.this.notificationFirer.dispatch(new PipelineMessageNotification(EnrichedNotificationInfo.createInfo(internalEvent, messagingException, AbstractPipeline.this), AbstractPipeline.this, PipelineMessageNotification.PROCESS_COMPLETE));
        }
    }

    public AbstractPipeline(String str, MuleContext muleContext, MessageSource messageSource, List<Processor> list, Optional<MessagingExceptionHandler> optional, Optional<ProcessingStrategyFactory> optional2, String str2, int i, FlowConstructStatistics flowConstructStatistics) {
        super(str, muleContext, optional, str2, flowConstructStatistics);
        this.exceptionResolver = new MessagingExceptionResolver(this);
        this.canProcessMessage = false;
        this.eventContextCache = CacheBuilder.newBuilder().weakValues().build();
        try {
            this.notificationFirer = (NotificationDispatcher) muleContext.getRegistry().lookupObject(NotificationDispatcher.class);
            this.source = messageSource;
            this.processors = Collections.unmodifiableList(list);
            this.maxConcurrency = i;
            ProcessingStrategyFactory orElseGet = optional2.orElseGet(() -> {
                return defaultProcessingStrategy();
            });
            if (orElseGet instanceof AsyncProcessingStrategyFactory) {
                ((AsyncProcessingStrategyFactory) orElseGet).setMaxConcurrency(i);
            }
            this.processingStrategy = orElseGet.create(muleContext, getName());
        } catch (RegistrationException e) {
            throw new MuleRuntimeException(e);
        }
    }

    protected MessageProcessorChain createPipeline() throws MuleException {
        DefaultMessageProcessorChainBuilder defaultMessageProcessorChainBuilder = new DefaultMessageProcessorChainBuilder();
        defaultMessageProcessorChainBuilder.setName("'" + getName() + "' processor chain");
        if (this.processingStrategy != null) {
            defaultMessageProcessorChainBuilder.setProcessingStrategy(this.processingStrategy);
        }
        configurePreProcessors(defaultMessageProcessorChainBuilder);
        configureMessageProcessors(defaultMessageProcessorChainBuilder);
        configurePostProcessors(defaultMessageProcessorChainBuilder);
        return defaultMessageProcessorChainBuilder.build();
    }

    protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
        return new DirectProcessingStrategyFactory();
    }

    private ProcessingStrategyFactory defaultProcessingStrategy() {
        ProcessingStrategyFactory defaultProcessingStrategyFactory = getMuleContext().getConfiguration().getDefaultProcessingStrategyFactory();
        return defaultProcessingStrategyFactory == null ? createDefaultProcessingStrategyFactory() : defaultProcessingStrategyFactory;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void configurePreProcessors(MessageProcessorChainBuilder messageProcessorChainBuilder) throws MuleException {
        messageProcessorChainBuilder.chain(new ProcessorStartCompleteProcessor());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void configurePostProcessors(MessageProcessorChainBuilder messageProcessorChainBuilder) throws MuleException {
        messageProcessorChainBuilder.chain(new ProcessEndProcessor());
    }

    @Override // org.mule.runtime.core.api.construct.Pipeline
    public List<Processor> getProcessors() {
        return this.processors;
    }

    @Override // org.mule.runtime.core.api.construct.Pipeline
    public MessageSource getSource() {
        return this.source;
    }

    protected MessageProcessorChain getPipeline() {
        return this.pipeline;
    }

    @Override // org.mule.runtime.core.api.processor.ProcessingDescriptor
    public boolean isSynchronous() {
        return this.processingStrategy.isSynchronous();
    }

    @Override // org.mule.runtime.core.api.construct.FlowConstruct
    public ProcessingStrategy getProcessingStrategy() {
        return this.processingStrategy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doInitialise() throws MuleException {
        super.doInitialise();
        this.pipeline = createPipeline();
        if (this.source != null) {
            this.source.setListener(new Processor() { // from class: org.mule.runtime.core.internal.construct.AbstractPipeline.1
                @Override // org.mule.runtime.core.api.processor.Processor
                public InternalEvent process(InternalEvent internalEvent) throws MuleException {
                    return MessageProcessors.processToApply(internalEvent, this);
                }

                /* JADX WARN: Can't rename method to resolve collision */
                @Override // org.mule.runtime.core.api.processor.Processor, java.util.function.Function
                public Publisher<InternalEvent> apply(Publisher<InternalEvent> publisher) {
                    return Flux.from(publisher).doOnNext(AbstractPipeline.this.assertStarted()).handle((internalEvent, synchronousSink) -> {
                        try {
                            AbstractPipeline.this.getSink().accept(internalEvent);
                            synchronousSink.next(internalEvent);
                        } catch (RejectedExecutionException e) {
                            internalEvent.getContext().error(AbstractPipeline.this.exceptionResolver.resolve(new MessagingException(internalEvent, e, AbstractPipeline.this), AbstractPipeline.this.getMuleContext()));
                        }
                    }).flatMap(internalEvent2 -> {
                        return Mono.from(internalEvent2.getContext().getResponsePublisher());
                    });
                }
            });
        }
        injectFlowConstructMuleContext(this.source);
        injectFlowConstructMuleContext(this.pipeline);
        initialiseIfInitialisable(this.source);
        initialiseIfInitialisable(this.pipeline);
    }

    protected ReactiveProcessor processFlowFunction() {
        return publisher -> {
            return Flux.from(publisher).transform(this.processingStrategy.onPipeline(this.pipeline)).doOnNext(internalEvent -> {
                internalEvent.getContext().success(internalEvent);
            }).doOnError(th -> {
                LOGGER.error("Unhandled exception in Flow ", th);
            });
        };
    }

    protected void configureMessageProcessors(MessageProcessorChainBuilder messageProcessorChainBuilder) throws MuleException {
        for (Processor processor : getProcessors()) {
            if (processor instanceof Processor) {
                messageProcessorChainBuilder.chain(processor);
            } else {
                if (!(processor instanceof MessageProcessorBuilder)) {
                    throw new IllegalArgumentException("MessageProcessorBuilder should only have MessageProcessor's or MessageProcessorBuilder's configured");
                }
                messageProcessorChainBuilder.chain((MessageProcessorBuilder) processor);
            }
        }
    }

    protected boolean isRedeliveryPolicyConfigured() {
        if (getProcessors().isEmpty()) {
            return false;
        }
        return getProcessors().get(0) instanceof IdempotentRedeliveryPolicy;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doStart() throws MuleException {
        super.doStart();
        startIfStartable(this.processingStrategy);
        this.sink = this.processingStrategy.createSink(this, processFlowFunction());
        try {
            startIfStartable(this.pipeline);
            this.canProcessMessage = true;
            if (getMuleContext().isStarted()) {
                try {
                    startIfStartable(this.source);
                } catch (ConnectException e) {
                    throw e;
                } catch (MuleException e2) {
                    doStop();
                    throw e2;
                }
            }
        } catch (MuleException e3) {
            doStop();
            throw e3;
        }
    }

    public Consumer<InternalEvent> assertStarted() {
        return internalEvent -> {
            if (!this.canProcessMessage) {
                throw Exceptions.propagate(new MessagingException(internalEvent, new LifecycleException(CoreMessages.isStopped(getName()), internalEvent.getMessage())));
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doStop() throws MuleException {
        try {
            stopIfStoppable(this.source);
            disposeIfDisposable(this.sink);
            this.sink = null;
            stopIfStoppable(this.processingStrategy);
            stopIfStoppable(this.pipeline);
            super.doStop();
        } finally {
            this.canProcessMessage = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.construct.AbstractFlowConstruct
    public void doDispose() {
        disposeIfDisposable(this.pipeline);
        disposeIfDisposable(this.source);
        super.doDispose();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Sink getSink() {
        return this.sink;
    }

    @Override // org.mule.runtime.core.api.construct.Pipeline
    public Map<String, InternalEventContext> getSerializationEventContextCache() {
        return this.eventContextCache.asMap();
    }

    @Override // org.mule.runtime.core.api.construct.Pipeline
    public int getMaxConcurrency() {
        return this.maxConcurrency;
    }
}
