package org.mule.runtime.core.internal.processor;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.DefaultEventContext;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.context.notification.AsyncMessageNotification;
import org.mule.runtime.core.api.context.notification.EnrichedNotificationInfo;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.processor.AbstractMessageProcessorOwner;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Scope;
import org.mule.runtime.core.api.scheduler.SchedulerService;
import org.mule.runtime.core.api.session.DefaultMuleSession;
import org.mule.runtime.core.internal.util.ProcessingStrategyUtils;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/AsyncDelegateMessageProcessor.class */
/**
 * Scope that hands each incoming event to a delegate {@link MessageProcessorChain} for processing on a
 * separate execution path, while the original event continues downstream unchanged.
 * <p>
 * For every event received, a child event is built (see {@link #asyncEvent(InternalEvent)}), an
 * {@link AsyncMessageNotification#PROCESS_ASYNC_SCHEDULED} notification is fired, the delegate is run, and
 * an {@link AsyncMessageNotification#PROCESS_ASYNC_COMPLETE} notification is fired when the delegate emits
 * a result or fails with a {@link MessagingException}.
 */
public class AsyncDelegateMessageProcessor extends AbstractMessageProcessorOwner implements Scope, Initialisable, Startable, Stoppable {

    @Inject
    private SchedulerService schedulerService;

    @Inject
    private ConfigurationComponentLocator componentLocator;

    protected Logger logger = LoggerFactory.getLogger(getClass());

    /** Flow construct resolved in {@link #initialise()}; consulted to decide how the delegate is scheduled. */
    private FlowConstruct flowConstruct;

    /** Chain that performs the asynchronous work. Must be non-null by the time {@link #initialise()} runs. */
    protected MessageProcessorChain delegate;

    /** IO scheduler obtained in {@link #start()} and released in {@link #stop()}. */
    private Scheduler scheduler;

    /** Reactor view over {@link #scheduler}, created in {@link #start()} and disposed in {@link #stop()}. */
    private reactor.core.scheduler.Scheduler reactorScheduler;

    /** Optional scheduler name; when {@code null} the component location is used instead (see {@link #start()}). */
    protected String name;

    /**
     * Creates a processor without an explicit scheduler name.
     *
     * @param messageProcessorChain the chain to run asynchronously
     */
    public AsyncDelegateMessageProcessor(MessageProcessorChain messageProcessorChain) {
        this.delegate = messageProcessorChain;
    }

    /**
     * Creates a processor with an explicit scheduler name.
     *
     * @param messageProcessorChain the chain to run asynchronously
     * @param str the name given to the scheduler created in {@link #start()}
     */
    public AsyncDelegateMessageProcessor(MessageProcessorChain messageProcessorChain, String str) {
        this.delegate = messageProcessorChain;
        this.name = str;
    }

    /**
     * Resolves the owning flow construct and validates that a delegate was supplied before initialising
     * the owned processors.
     *
     * @throws InitialisationException if {@link #delegate} is {@code null} or the superclass initialisation fails
     */
    @Override
    public void initialise() throws InitialisationException {
        this.flowConstruct = FlowConstruct.getFromAnnotatedObject(this.componentLocator, this);
        if (this.delegate == null) {
            throw new InitialisationException(CoreMessages.objectIsNull("delegate message processor"), this);
        }
        super.initialise();
    }

    /**
     * Obtains an IO scheduler named after {@link #name} (or the component location when no name was given),
     * wraps it as a Reactor scheduler and then starts the owned processors.
     *
     * @throws MuleException if the superclass start fails
     */
    @Override
    public void start() throws MuleException {
        // NOTE(review): asyncEvent() treats getLocation() as nullable; if it can be null here while no
        // name was supplied, the fallback below would throw a NullPointerException - confirm with callers.
        final String schedulerName = this.name != null ? this.name : getLocation().getLocation();
        this.scheduler = this.schedulerService.ioScheduler(this.muleContext.getSchedulerBaseConfig().withName(schedulerName));
        this.reactorScheduler = Schedulers.fromExecutorService(this.scheduler);
        super.start();
    }

    /**
     * Stops the owned processors and then releases both scheduler handles, clearing the fields so that a
     * repeated call does not release them twice.
     *
     * @throws MuleException if the superclass stop fails
     */
    @Override
    public void stop() throws MuleException {
        super.stop();
        if (this.scheduler != null) {
            this.scheduler.stop();
            this.scheduler = null;
        }
        if (this.reactorScheduler != null) {
            this.reactorScheduler.dispose();
            this.reactorScheduler = null;
        }
    }

    /**
     * Blocking entry point; delegates to {@link MessageProcessors#processToApply(InternalEvent, ReactiveProcessor)}
     * so that the reactive pipeline defined in {@link #apply(Publisher)} is the single implementation.
     *
     * @param internalEvent the event to process
     * @return the result of running this processor's reactive pipeline on the event
     * @throws MuleException if processing fails
     */
    @Override
    public InternalEvent process(InternalEvent internalEvent) throws MuleException {
        return MessageProcessors.processToApply(internalEvent, this);
    }

    /**
     * Reactive entry point. The returned publisher emits the incoming events unchanged; the asynchronous
     * work is triggered as a side effect of each emission.
     *
     * @param publisher the stream of incoming events
     * @return a publisher of the same events received from {@code publisher}
     */
    @Override
    public Publisher<InternalEvent> apply(Publisher<InternalEvent> publisher) {
        // Every lambda parameter below has a distinct name: Java rejects a lambda parameter that
        // redeclares a parameter of an enclosing lambda.
        return Flux.from(publisher).doOnNext(request -> {
            Flux.just(request)
                // Build the child event that the delegate will actually process.
                .map(event -> asyncEvent(event))
                .transform(asyncPublisher -> {
                    return Flux.from(asyncPublisher)
                        .doOnNext(fireAsyncScheduledNotification())
                        .doOnNext(asyncRequest -> {
                            // Inner subscription: runs the delegate and reports its outcome on the
                            // child event's context, independently of the outer stream.
                            Flux.just(asyncRequest)
                                .transform(scheduleAsync(this.delegate))
                                .doOnNext(result -> {
                                    fireAsyncCompleteNotification(result, null);
                                })
                                .doOnError(MessagingException.class, messagingException -> {
                                    fireAsyncCompleteNotification(messagingException.getEvent(), messagingException);
                                })
                                .subscribe(result -> {
                                    asyncRequest.getContext().success(result);
                                }, throwable -> {
                                    asyncRequest.getContext().error(throwable);
                                });
                        });
                })
                .subscribe(Operators.requestUnbounded());
        });
    }

    /**
     * Chooses how the given processor is executed.
     * <p>
     * When the flow construct reports synchronous processing, or is not a {@link Pipeline}, the processor
     * is applied directly and subscribed on {@link #reactorScheduler}. Otherwise the pipeline's own
     * processing strategy wraps the processor.
     *
     * @param processor the processor to schedule
     * @return a reactive processor applying {@code processor} with the selected scheduling
     */
    private ReactiveProcessor scheduleAsync(Processor processor) {
        if (ProcessingStrategyUtils.isSynchronousProcessing(this.flowConstruct) || !(this.flowConstruct instanceof Pipeline)) {
            return publisher -> Flux.from(publisher).transform(processor).subscribeOn(this.reactorScheduler);
        }
        return publisher -> Flux.from(publisher)
            .transform(((Pipeline) this.flowConstruct).getProcessingStrategy().onPipeline(processor));
    }

    /**
     * Builds the event handed to the delegate: a copy of {@code internalEvent} with a fire-and-forget child
     * context, no reply-to handler and a new session created from the original session.
     *
     * @param internalEvent the event received by this processor
     * @return the event to be processed asynchronously
     */
    private InternalEvent asyncEvent(InternalEvent internalEvent) {
        return InternalEvent
            .builder(DefaultEventContext.fireAndForgetChild(internalEvent.getContext(), Optional.ofNullable(getLocation())), internalEvent)
            .replyToHandler(null)
            .session(new DefaultMuleSession(internalEvent.getSession()))
            .build();
    }

    /**
     * @return a consumer that fires a {@link AsyncMessageNotification#PROCESS_ASYNC_SCHEDULED} notification
     *         for each event it receives
     */
    private Consumer<InternalEvent> fireAsyncScheduledNotification() {
        return internalEvent -> {
            this.muleContext.getNotificationManager().fireNotification(new AsyncMessageNotification(EnrichedNotificationInfo.createInfo(internalEvent, null, this), getLocation(), AsyncMessageNotification.PROCESS_ASYNC_SCHEDULED));
        };
    }

    /**
     * Fires a {@link AsyncMessageNotification#PROCESS_ASYNC_COMPLETE} notification.
     *
     * @param internalEvent the event associated with the completion
     * @param messagingException the failure that ended processing, or {@code null} on success
     */
    private void fireAsyncCompleteNotification(InternalEvent internalEvent, MessagingException messagingException) {
        this.muleContext.getNotificationManager().fireNotification(new AsyncMessageNotification(EnrichedNotificationInfo.createInfo(internalEvent, messagingException, this), getLocation(), AsyncMessageNotification.PROCESS_ASYNC_COMPLETE));
    }

    /**
     * @return a single-element list containing {@link #delegate}
     */
    @Override
    protected List<Processor> getOwnedMessageProcessors() {
        return Collections.singletonList(this.delegate);
    }
}
