package org.mule.runtime.core.internal.processor;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.inject.Inject;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.notification.AsyncMessageNotification;
import org.mule.runtime.api.notification.EnrichedNotificationInfo;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.LoggingExceptionHandler;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.AbstractMessageProcessorOwner;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.AsyncProcessingStrategyFactory;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.component.ComponentUtils;
import org.mule.runtime.core.internal.construct.FromFlowRejectedExecutionException;
import org.mule.runtime.core.internal.event.DefaultEventContext;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.internal.util.FunctionalUtils;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.privileged.event.DefaultMuleSession;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.core.privileged.processor.Scope;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChainBuilder;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/AsyncDelegateMessageProcessor.class */
public class AsyncDelegateMessageProcessor extends AbstractMessageProcessorOwner implements Scope, Initialisable, Startable, Stoppable {

    @Inject
    private MuleContext muleContext;

    @Inject
    private SchedulerService schedulerService;

    @Inject
    private ConfigurationComponentLocator componentLocator;
    protected Logger logger = LoggerFactory.getLogger(getClass());
    private ProcessingStrategy processingStrategy;
    private Sink sink;
    private QueueBackpressureHandler backpressureHandler;
    private final MessageProcessorChainBuilder delegateBuilder;
    protected MessageProcessorChain delegate;
    private Scheduler reactorScheduler;
    protected String name;
    private Integer maxConcurrency;

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/AsyncDelegateMessageProcessor$QueueBackpressureHandler.class */
    private static class QueueBackpressureHandler implements Stoppable {
        private final Consumer<CoreEvent> eventDispatcher;
        private final LazyValue<Scheduler> queueDispatcherScheduler;
        private final AtomicReference<Future> executing = new AtomicReference<>();
        private final BlockingQueue<CoreEvent> asyncQueue = new LinkedBlockingQueue();

        public QueueBackpressureHandler(SchedulerService schedulerService, Supplier<SchedulerConfig> supplier, Consumer<CoreEvent> consumer, String str) {
            this.eventDispatcher = consumer;
            this.queueDispatcherScheduler = new LazyValue<>(() -> {
                return schedulerService.customScheduler(((SchedulerConfig) supplier.get()).withName(str + " - queue dispatcher").withMaxConcurrentTasks(1));
            });
        }

        private Future dispatchTask() {
            return ((Scheduler) this.queueDispatcherScheduler.get()).submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        CoreEvent peek = this.asyncQueue.peek();
                        if (peek != null) {
                            this.eventDispatcher.accept(peek);
                            this.asyncQueue.remove(peek);
                        } else {
                            synchronized (this.executing) {
                                if (this.asyncQueue.size() == 0) {
                                    this.executing.set(null);
                                    return;
                                }
                            }
                        }
                    } catch (FromFlowRejectedExecutionException e) {
                        Thread.yield();
                    }
                }
            });
        }

        public void handleBackpressure(CoreEvent coreEvent) {
            this.asyncQueue.offer(coreEvent);
            synchronized (this.executing) {
                if (this.executing.get() == null) {
                    this.executing.set(dispatchTask());
                }
            }
        }

        public void stop() {
            this.queueDispatcherScheduler.ifComputed((v0) -> {
                v0.stop();
            });
            this.asyncQueue.clear();
        }
    }

    public AsyncDelegateMessageProcessor(MessageProcessorChainBuilder messageProcessorChainBuilder) {
        this.delegateBuilder = messageProcessorChainBuilder;
    }

    public AsyncDelegateMessageProcessor(MessageProcessorChainBuilder messageProcessorChainBuilder, String str) {
        this.delegateBuilder = messageProcessorChainBuilder;
        this.name = str;
    }

    public void initialise() throws InitialisationException {
        Pipeline pipeline = (Component) ComponentUtils.getFromAnnotatedObject(this.componentLocator, this).orElse(null);
        if (!(pipeline instanceof Pipeline)) {
            this.processingStrategy = createDefaultProcessingStrategyFactory().create(getMuleContext(), getLocation().getLocation());
        } else if (this.maxConcurrency != null) {
            AsyncProcessingStrategyFactory processingStrategyFactory = pipeline.getProcessingStrategyFactory();
            if (processingStrategyFactory instanceof AsyncProcessingStrategyFactory) {
                processingStrategyFactory.setMaxConcurrency(this.maxConcurrency.intValue());
            } else {
                this.logger.warn("{} does not support 'maxConcurrency'. Ignoring the value.", processingStrategyFactory.getClass().getSimpleName());
            }
            this.processingStrategy = processingStrategyFactory.create(getMuleContext(), getLocation().getLocation());
        } else {
            this.processingStrategy = pipeline.getProcessingStrategyFactory().create(getMuleContext(), getLocation().getLocation());
        }
        if (this.delegateBuilder == null) {
            throw new InitialisationException(CoreMessages.objectIsNull("delegate message processor"), this);
        }
        this.delegateBuilder.setProcessingStrategy(this.processingStrategy);
        this.delegate = this.delegateBuilder.build();
        LifecycleUtils.initialiseIfNeeded(this.delegate, getMuleContext());
        this.backpressureHandler = new QueueBackpressureHandler(this.schedulerService, () -> {
            return this.muleContext.getSchedulerBaseConfig();
        }, this::dispatchEvent, this.name != null ? this.name : getLocation().getLocation());
        LifecycleUtils.initialiseIfNeeded(this.processingStrategy, this.muleContext);
        super.initialise();
    }

    protected ProcessingStrategyFactory createDefaultProcessingStrategyFactory() {
        return new DirectProcessingStrategyFactory();
    }

    public void start() throws MuleException {
        LifecycleUtils.startIfNeeded(this.processingStrategy);
        LifecycleUtils.startIfNeeded(this.delegate);
        this.sink = this.processingStrategy.createSink((FlowConstruct) ComponentUtils.getFromAnnotatedObject(this.componentLocator, this).filter(flowConstruct -> {
            return flowConstruct instanceof FlowConstruct;
        }).orElse(null), processAsyncChainFunction());
        SchedulerConfig withName = getMuleContext().getSchedulerBaseConfig().withName(this.name != null ? this.name : getLocation().getLocation());
        this.reactorScheduler = this.processingStrategy.isSynchronous() ? this.schedulerService.ioScheduler(withName) : this.schedulerService.cpuLightScheduler(withName);
        LifecycleUtils.startIfNeeded(this.backpressureHandler);
        super.start();
    }

    public void stop() throws MuleException {
        super.stop();
        FunctionalUtils.safely(() -> {
            LifecycleUtils.stopIfNeeded(this.backpressureHandler);
        });
        LifecycleUtils.disposeIfNeeded(this.sink, this.logger);
        this.sink = null;
        LifecycleUtils.stopIfNeeded(this.delegate);
        if (this.reactorScheduler != null) {
            this.reactorScheduler.stop();
            this.reactorScheduler = null;
        }
        LifecycleUtils.stopIfNeeded(this.processingStrategy);
    }

    public void dispose() {
        super.dispose();
        LifecycleUtils.disposeIfNeeded(this.delegate, this.logger);
        LifecycleUtils.disposeIfNeeded(this.processingStrategy, this.logger);
    }

    public CoreEvent process(CoreEvent coreEvent) throws MuleException {
        return MessageProcessors.processToApply(coreEvent, this);
    }

    public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
        return Flux.from(publisher).cast(PrivilegedEvent.class).doOnNext(privilegedEvent -> {
            Flux map = Flux.just(privilegedEvent).map(privilegedEvent -> {
                return asyncEvent(privilegedEvent);
            });
            if (TransactionCoordination.isTransactionActive() && !this.processingStrategy.isSynchronous()) {
                map = map.publishOn(Schedulers.fromExecutorService(this.reactorScheduler));
            }
            map.map(coreEvent -> {
                try {
                    dispatchEvent(coreEvent);
                } catch (FromFlowRejectedExecutionException e) {
                    this.backpressureHandler.handleBackpressure(coreEvent);
                }
                return coreEvent;
            }).subscribe(Operators.requestUnbounded());
        }).cast(CoreEvent.class);
    }

    private void dispatchEvent(CoreEvent coreEvent) {
        this.processingStrategy.checkBackpressureAccepting(coreEvent);
        this.sink.accept(coreEvent);
    }

    private CoreEvent asyncEvent(PrivilegedEvent privilegedEvent) {
        return PrivilegedEvent.builder(DefaultEventContext.child(privilegedEvent.getContext(), Optional.ofNullable(getLocation()), LoggingExceptionHandler.getInstance()), privilegedEvent).session(new DefaultMuleSession(privilegedEvent.getSession())).build();
    }

    private ReactiveProcessor processAsyncChainFunction() {
        return publisher -> {
            return Flux.from(publisher).doOnNext(fireAsyncScheduledNotification()).transform(this.processingStrategy.onPipeline(scheduleAsync(this.delegate))).doOnNext(coreEvent -> {
                fireAsyncCompleteNotification(coreEvent, null);
                coreEvent.getContext().success(coreEvent);
            }).doOnError(MessagingException.class, messagingException -> {
                fireAsyncCompleteNotification(messagingException.getEvent(), messagingException);
                messagingException.getEvent().getContext().error(messagingException);
            }).doOnError(th -> {
                this.logger.warn("Error occurred during asynchronous processing at:" + getLocation().getLocation() + " . To handle this error include a <try> scope in the <async> scope.", th);
            });
        };
    }

    private ReactiveProcessor scheduleAsync(Processor processor) {
        return this.processingStrategy.isSynchronous() ? publisher -> {
            return Flux.from(publisher).transform(processor).subscribeOn(Schedulers.fromExecutorService(this.reactorScheduler));
        } : processor;
    }

    private Consumer<CoreEvent> fireAsyncScheduledNotification() {
        return coreEvent -> {
            this.muleContext.getNotificationManager().fireNotification(new AsyncMessageNotification(EnrichedNotificationInfo.createInfo(coreEvent, (Exception) null, this), getLocation(), 1901));
        };
    }

    private void fireAsyncCompleteNotification(CoreEvent coreEvent, MessagingException messagingException) {
        this.muleContext.getNotificationManager().fireNotification(new AsyncMessageNotification(EnrichedNotificationInfo.createInfo(coreEvent, messagingException, this), getLocation(), 1902));
    }

    protected List<Processor> getOwnedMessageProcessors() {
        return Collections.singletonList(this.delegate);
    }

    public void setMaxConcurrency(Integer num) {
        this.maxConcurrency = num;
    }

    protected List<Processor> getOwnedObjects() {
        return Collections.emptyList();
    }

    void setSchedulerService(SchedulerService schedulerService) {
        this.schedulerService = schedulerService;
    }
}
