package org.mule.runtime.core.privileged.processor.chain;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.notification.MessageProcessorNotification;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.context.notification.ServerNotificationManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.streaming.StreamingManager;
import org.mule.runtime.core.api.util.StreamingUtils;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.core.internal.context.DefaultMuleContext;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.interception.ProcessorInterceptorManager;
import org.mule.runtime.core.internal.processor.interceptor.ReactiveAroundInterceptorAdapter;
import org.mule.runtime.core.internal.processor.interceptor.ReactiveInterceptorAdapter;
import org.mule.runtime.core.internal.util.MessagingExceptionResolver;
import org.mule.runtime.core.privileged.component.AbstractExecutableComponent;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/mule/runtime/core/privileged/processor/chain/AbstractMessageProcessorChain.class */
public abstract class AbstractMessageProcessorChain extends AbstractExecutableComponent implements MessageProcessorChain {
    private static final String TCCL_REACTOR_CTX_KEY = "mule.context.tccl";
    private static final String TCCL_ORIGINAL_REACTOR_CTX_KEY = "mule.context.tccl_original";
    private static Class<ClassLoader> appClClass;
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractMessageProcessorChain.class);
    private final String name;
    private final List<Processor> processors;
    private ProcessingStrategy processingStrategy;
    private List<ReactiveInterceptorAdapter> additionalInterceptors = new LinkedList();

    @Inject
    private ProcessorInterceptorManager processorInterceptorManager;

    @Inject
    private StreamingManager streamingManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    public AbstractMessageProcessorChain(String str, Optional<ProcessingStrategy> optional, List<Processor> list) {
        this.name = str;
        this.processingStrategy = optional.orElse(null);
        this.processors = list;
    }

    public CoreEvent process(CoreEvent coreEvent) throws MuleException {
        return MessageProcessors.processToApply(coreEvent, this);
    }

    @Override // org.mule.runtime.core.api.processor.Processor, java.util.function.Function
    public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
        List<BiFunction<Processor, ReactiveProcessor, ReactiveProcessor>> resolveInterceptors = resolveInterceptors();
        Flux from = Flux.from(publisher);
        Iterator<Processor> it = getProcessorsToExecute().iterator();
        while (it.hasNext()) {
            from = from.transform(applyInterceptors(resolveInterceptors, it.next()));
        }
        return from.subscriberContext(context -> {
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            return (contextClassLoader == null || contextClassLoader.getParent() == null || appClClass == null || !appClClass.isAssignableFrom(contextClassLoader.getClass())) ? context : context.put(TCCL_ORIGINAL_REACTOR_CTX_KEY, contextClassLoader).put(TCCL_REACTOR_CTX_KEY, contextClassLoader.getParent());
        });
    }

    private ReactiveProcessor applyInterceptors(List<BiFunction<Processor, ReactiveProcessor, ReactiveProcessor>> list, Processor processor) {
        ReactiveProcessor reactiveProcessor = processor;
        Iterator<BiFunction<Processor, ReactiveProcessor, ReactiveProcessor>> it = list.iterator();
        while (it.hasNext()) {
            reactiveProcessor = it.next().apply(processor, reactiveProcessor);
        }
        return reactiveProcessor;
    }

    private List<BiFunction<Processor, ReactiveProcessor, ReactiveProcessor>> resolveInterceptors() {
        ArrayList arrayList = new ArrayList();
        arrayList.add((processor, reactiveProcessor) -> {
            return publisher -> {
                return Flux.from(publisher).transform(doOnNextOrErrorWithContext(context -> {
                    context.getOrEmpty(TCCL_REACTOR_CTX_KEY).ifPresent(obj -> {
                        Thread.currentThread().setContextClassLoader((ClassLoader) obj);
                    });
                })).transform(reactiveProcessor).transform(doOnNextOrErrorWithContext(context2 -> {
                    context2.getOrEmpty(TCCL_ORIGINAL_REACTOR_CTX_KEY).ifPresent(obj -> {
                        Thread.currentThread().setContextClassLoader((ClassLoader) obj);
                    });
                }));
            };
        });
        arrayList.add((processor2, reactiveProcessor2) -> {
            return publisher -> {
                return Flux.from(publisher).concatMap(coreEvent -> {
                    return Flux.just(coreEvent).transform(reactiveProcessor2).onErrorMap(MessagingException.class, resolveMessagingException(processor2)).onErrorMap(th -> {
                        return !(th instanceof MessagingException);
                    }, th2 -> {
                        return resolveException((Component) processor2, coreEvent, th2);
                    });
                });
            };
        });
        arrayList.add((processor3, reactiveProcessor3) -> {
            return publisher -> {
                return Flux.from(publisher).cast(PrivilegedEvent.class).doOnNext(privilegedEvent -> {
                    DefaultMuleContext.currentMuleContext.set(this.muleContext);
                }).doOnNext(privilegedEvent2 -> {
                    PrivilegedEvent.setCurrentEvent(privilegedEvent2);
                }).cast(CoreEvent.class).transform(reactiveProcessor3);
            };
        });
        if (this.processingStrategy != null) {
            arrayList.add((processor4, reactiveProcessor4) -> {
                return this.processingStrategy.onProcessor(new ReactiveProcessor() { // from class: org.mule.runtime.core.privileged.processor.chain.AbstractMessageProcessorChain.1
                    @Override // java.util.function.Function
                    public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
                        return reactiveProcessor4.apply(publisher);
                    }

                    @Override // org.mule.runtime.core.api.processor.ReactiveProcessor
                    public ReactiveProcessor.ProcessingType getProcessingType() {
                        return processor4.getProcessingType();
                    }
                });
            });
        }
        arrayList.add((processor5, reactiveProcessor5) -> {
            return publisher -> {
                return Flux.from(publisher).transform(reactiveProcessor5).cast(PrivilegedEvent.class).doOnNext(privilegedEvent -> {
                    PrivilegedEvent.setCurrentEvent(privilegedEvent);
                }).cast(CoreEvent.class);
            };
        });
        arrayList.add((processor6, reactiveProcessor6) -> {
            return publisher -> {
                return Flux.from(publisher).cast(PrivilegedEvent.class).doOnNext(preNotification(processor6)).cast(CoreEvent.class).transform(reactiveProcessor6).cast(PrivilegedEvent.class).doOnNext(postNotification(processor6)).doOnError(MessagingException.class, errorNotification(processor6)).cast(CoreEvent.class);
            };
        });
        arrayList.add((processor7, reactiveProcessor7) -> {
            return publisher -> {
                return Flux.from(publisher).transform(reactiveProcessor7).map(StreamingUtils.updateEventForStreaming(this.streamingManager));
            };
        });
        arrayList.addAll(0, this.additionalInterceptors);
        arrayList.add((processor8, reactiveProcessor8) -> {
            return publisher -> {
                return Flux.from(publisher).concatMap(coreEvent -> {
                    return Flux.just(coreEvent).transform(reactiveProcessor8).doOnEach(signal -> {
                        DefaultMuleContext.currentMuleContext.set(null);
                    }).onErrorResume(RejectedExecutionException.class, rejectedExecutionException -> {
                        return Mono.from(((BaseEventContext) coreEvent.getContext()).error(resolveException((Component) processor8, coreEvent, rejectedExecutionException))).then(Mono.empty());
                    }).onErrorResume(MessagingException.class, messagingException -> {
                        return Mono.from(((BaseEventContext) coreEvent.getContext()).error(resolveMessagingException(processor8).apply(messagingException))).then(Mono.empty());
                    }).onErrorResume(th -> {
                        return Mono.from(((BaseEventContext) coreEvent.getContext()).error(resolveException((Component) processor8, coreEvent, th))).then(Mono.empty());
                    });
                });
            };
        });
        return arrayList;
    }

    private Function<? super Publisher<CoreEvent>, ? extends Publisher<CoreEvent>> doOnNextOrErrorWithContext(Consumer<Context> consumer) {
        return Operators.lift((scannable, coreSubscriber) -> {
            return new CoreSubscriber<CoreEvent>() { // from class: org.mule.runtime.core.privileged.processor.chain.AbstractMessageProcessorChain.2
                public void onNext(CoreEvent coreEvent) {
                    consumer.accept(coreSubscriber.currentContext());
                    coreSubscriber.onNext(coreEvent);
                }

                public void onError(Throwable th) {
                    consumer.accept(coreSubscriber.currentContext());
                    coreSubscriber.onError(th);
                }

                public void onComplete() {
                    coreSubscriber.onComplete();
                }

                public Context currentContext() {
                    return coreSubscriber.currentContext();
                }

                public void onSubscribe(Subscription subscription) {
                    coreSubscriber.onSubscribe(subscription);
                }
            };
        });
    }

    private MessagingException resolveException(Component component, CoreEvent coreEvent, Throwable th) {
        return new MessagingExceptionResolver(component).resolve(new MessagingException(coreEvent, th, component), this.muleContext);
    }

    private Function<MessagingException, MessagingException> resolveMessagingException(Processor processor) {
        if (!(processor instanceof Component)) {
            return messagingException -> {
                return messagingException;
            };
        }
        MessagingExceptionResolver messagingExceptionResolver = new MessagingExceptionResolver((Component) processor);
        return messagingException2 -> {
            return messagingExceptionResolver.resolve(messagingException2, this.muleContext);
        };
    }

    private Consumer<PrivilegedEvent> preNotification(Processor processor) {
        return privilegedEvent -> {
            if (privilegedEvent.isNotificationsEnabled()) {
                fireNotification(this.muleContext.getNotificationManager(), privilegedEvent, processor, null, MessageProcessorNotification.MESSAGE_PROCESSOR_PRE_INVOKE);
            }
        };
    }

    private Consumer<PrivilegedEvent> postNotification(Processor processor) {
        return privilegedEvent -> {
            if (privilegedEvent.isNotificationsEnabled()) {
                fireNotification(this.muleContext.getNotificationManager(), privilegedEvent, processor, null, MessageProcessorNotification.MESSAGE_PROCESSOR_POST_INVOKE);
            }
        };
    }

    private Consumer<Exception> errorNotification(Processor processor) {
        return exc -> {
            if ((exc instanceof MessagingException) && ((PrivilegedEvent) ((MessagingException) exc).getEvent()).isNotificationsEnabled()) {
                fireNotification(this.muleContext.getNotificationManager(), ((MessagingException) exc).getEvent(), processor, (MessagingException) exc, MessageProcessorNotification.MESSAGE_PROCESSOR_POST_INVOKE);
            }
        };
    }

    private void fireNotification(ServerNotificationManager serverNotificationManager, CoreEvent coreEvent, Processor processor, MessagingException messagingException, int i) {
        if (serverNotificationManager == null || !serverNotificationManager.isNotificationEnabled(MessageProcessorNotification.class) || ((Component) processor).getLocation() == null) {
            return;
        }
        serverNotificationManager.fireNotification(MessageProcessorNotification.createFrom(coreEvent, ((Component) processor).getLocation(), (Component) processor, messagingException, i));
    }

    protected List<Processor> getProcessorsToExecute() {
        return this.processors;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getClass().getSimpleName());
        if (!StringUtils.isBlank(this.name)) {
            sb.append(String.format(" '%s' ", this.name));
        }
        Iterator<Processor> it = this.processors.iterator();
        String format = String.format("%n", new Object[0]);
        if (it.hasNext()) {
            sb.append(String.format("%n[ ", new Object[0]));
            while (it.hasNext()) {
                sb.append(String.format("%n  %s", org.apache.commons.lang3.StringUtils.replace(it.next().toString(), format, String.format("%n  ", new Object[0]))));
                if (it.hasNext()) {
                    sb.append(", ");
                }
            }
            sb.append(String.format("%n]", new Object[0]));
        }
        return sb.toString();
    }

    public List<Processor> getMessageProcessors() {
        return this.processors;
    }

    protected List<Processor> getMessageProcessorsForLifecycle() {
        return this.processors;
    }

    @Override // org.mule.runtime.core.privileged.component.AbstractExecutableComponent, org.mule.runtime.core.api.context.MuleContextAware
    public void setMuleContext(MuleContext muleContext) {
        super.setMuleContext(muleContext);
        LifecycleUtils.setMuleContextIfNeeded((Collection<? extends Object>) getMessageProcessorsForLifecycle(), muleContext);
    }

    public void initialise() throws InitialisationException {
        this.processorInterceptorManager.getInterceptorFactories().stream().forEach(processorInterceptorFactory -> {
            ReactiveInterceptorAdapter reactiveInterceptorAdapter = new ReactiveInterceptorAdapter(processorInterceptorFactory);
            try {
                this.muleContext.getInjector().inject(reactiveInterceptorAdapter);
                this.additionalInterceptors.add(0, reactiveInterceptorAdapter);
            } catch (MuleException e) {
                throw new MuleRuntimeException(e);
            }
        });
        this.processorInterceptorManager.getInterceptorFactories().stream().forEach(processorInterceptorFactory2 -> {
            ReactiveAroundInterceptorAdapter reactiveAroundInterceptorAdapter = new ReactiveAroundInterceptorAdapter(processorInterceptorFactory2);
            try {
                this.muleContext.getInjector().inject(reactiveAroundInterceptorAdapter);
                this.additionalInterceptors.add(0, reactiveAroundInterceptorAdapter);
            } catch (MuleException e) {
                throw new MuleRuntimeException(e);
            }
        });
        LifecycleUtils.initialiseIfNeeded((Collection<? extends Object>) getMessageProcessorsForLifecycle(), this.muleContext);
    }

    public void start() throws MuleException {
        ArrayList arrayList = new ArrayList();
        try {
            for (Processor processor : getMessageProcessorsForLifecycle()) {
                if (processor instanceof Startable) {
                    ((Startable) processor).start();
                    arrayList.add(processor);
                }
            }
        } catch (MuleException e) {
            LifecycleUtils.stopIfNeeded((Collection<? extends Object>) getMessageProcessorsForLifecycle());
            throw e;
        }
    }

    public void stop() throws MuleException {
        LifecycleUtils.stopIfNeeded((Collection<? extends Object>) getMessageProcessorsForLifecycle());
    }

    public void dispose() {
        LifecycleUtils.disposeIfNeeded((Collection<? extends Object>) getMessageProcessorsForLifecycle(), LOGGER);
    }

    static {
        try {
            appClClass = AbstractMessageProcessorChain.class.getClassLoader().loadClass("org.mule.runtime.deployment.model.api.application.ApplicationClassLoader");
        } catch (ClassNotFoundException e) {
            LOGGER.debug("ApplicationClassLoader interface not avialable in current context", e);
        }
    }
}
