package org.mule.runtime.core.internal.execution;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import javax.xml.namespace.QName;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.ErrorTypeRepository;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.api.notification.ConnectorMessageNotification;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.exception.Errors;
import org.mule.runtime.core.api.exception.NullExceptionHandler;
import org.mule.runtime.core.api.functional.Either;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.interception.InterceptorManager;
import org.mule.runtime.core.internal.message.ErrorBuilder;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.policy.PolicyManager;
import org.mule.runtime.core.internal.policy.SourcePolicy;
import org.mule.runtime.core.internal.policy.SourcePolicyFailureResult;
import org.mule.runtime.core.internal.policy.SourcePolicySuccessResult;
import org.mule.runtime.core.internal.processor.interceptor.ReactiveInterceptorSourceCallbackAdapter;
import org.mule.runtime.core.internal.util.FunctionalUtils;
import org.mule.runtime.core.internal.util.InternalExceptionUtils;
import org.mule.runtime.core.internal.util.MessagingExceptionResolver;
import org.mule.runtime.core.internal.util.message.MessageUtils;
import org.mule.runtime.core.privileged.PrivilegedMuleContext;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.execution.MessageProcessContext;
import org.mule.runtime.core.privileged.execution.MessageProcessTemplate;
import org.mule.runtime.core.privileged.processor.MessageProcessors;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/core/internal/execution/ModuleFlowProcessingPhase.class */
public class ModuleFlowProcessingPhase extends NotificationFiringProcessingPhase<ModuleFlowProcessingPhaseTemplate> implements Initialisable {
    private ErrorType sourceResponseGenerateErrorType;
    private ErrorType sourceResponseSendErrorType;
    private ErrorType sourceErrorResponseGenerateErrorType;
    private ErrorType sourceErrorResponseSendErrorType;
    private ConfigurationComponentLocator componentLocator;
    private final PolicyManager policyManager;
    private final List<ReactiveInterceptorSourceCallbackAdapter> additionalInterceptors = new LinkedList();

    @Inject
    private InterceptorManager processorInterceptorManager;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/execution/ModuleFlowProcessingPhase$FlowProcessor.class */
    public class FlowProcessor implements Processor, Component {
        private final ModuleFlowProcessingPhaseTemplate template;
        private final FlowConstruct flowConstruct;

        public FlowProcessor(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, FlowConstruct flowConstruct) {
            this.template = moduleFlowProcessingPhaseTemplate;
            this.flowConstruct = flowConstruct;
        }

        @Override // org.mule.runtime.core.api.processor.Processor
        public CoreEvent process(CoreEvent coreEvent) throws MuleException {
            return MessageProcessors.processToApply(coreEvent, this);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.mule.runtime.core.api.processor.Processor, java.util.function.Function
        public Publisher<CoreEvent> apply(Publisher<CoreEvent> publisher) {
            return Mono.from(publisher).flatMap(coreEvent -> {
                return Mono.from(MessageProcessors.processWithChildContext(coreEvent, publisher2 -> {
                    return this.template.routeEventAsync((Publisher<CoreEvent>) publisher2);
                }, Optional.empty(), this.flowConstruct.getExceptionListener()));
            });
        }

        @Override // org.mule.runtime.api.component.Component
        public Object getAnnotation(QName qName) {
            return this.flowConstruct.getAnnotation(qName);
        }

        @Override // org.mule.runtime.api.component.Component
        public Map<QName, Object> getAnnotations() {
            return this.flowConstruct.getAnnotations();
        }

        @Override // org.mule.runtime.api.component.Component
        public void setAnnotations(Map<QName, Object> map) {
            throw new UnsupportedOperationException();
        }

        @Override // org.mule.runtime.api.component.Component
        public ComponentLocation getLocation() {
            return this.flowConstruct.getLocation();
        }

        @Override // org.mule.runtime.api.component.Component
        public Location getRootContainerLocation() {
            return this.flowConstruct.getRootContainerLocation();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/execution/ModuleFlowProcessingPhase$PhaseContext.class */
    public static final class PhaseContext {
        final ModuleFlowProcessingPhaseTemplate template;
        final MessageProcessContext messageProcessContext;
        final PhaseResultNotifier phaseResultNotifier;
        final Consumer<Either<MessagingException, CoreEvent>> terminateConsumer;

        PhaseContext(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier, Consumer<Either<MessagingException, CoreEvent>> consumer) {
            this.template = moduleFlowProcessingPhaseTemplate;
            this.messageProcessContext = messageProcessContext;
            this.phaseResultNotifier = phaseResultNotifier;
            this.terminateConsumer = consumer;
        }
    }

    public ModuleFlowProcessingPhase(PolicyManager policyManager) {
        this.policyManager = policyManager;
    }

    @Override // org.mule.runtime.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        ErrorTypeRepository errorTypeRepository = this.muleContext.getErrorTypeRepository();
        this.componentLocator = this.muleContext.getConfigurationComponentLocator();
        this.sourceResponseGenerateErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_GENERATE).get();
        this.sourceResponseSendErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_RESPONSE_SEND).get();
        this.sourceErrorResponseGenerateErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_GENERATE).get();
        this.sourceErrorResponseSendErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.Handleable.SOURCE_ERROR_RESPONSE_SEND).get();
        if (this.processorInterceptorManager != null) {
            this.processorInterceptorManager.getSourceInterceptorFactories().stream().forEach(sourceInterceptorFactory -> {
                ReactiveInterceptorSourceCallbackAdapter reactiveInterceptorSourceCallbackAdapter = new ReactiveInterceptorSourceCallbackAdapter(sourceInterceptorFactory);
                try {
                    this.muleContext.getInjector().inject(reactiveInterceptorSourceCallbackAdapter);
                    this.additionalInterceptors.add(0, reactiveInterceptorSourceCallbackAdapter);
                } catch (MuleException e) {
                    throw new MuleRuntimeException(e);
                }
            });
        }
    }

    @Override // org.mule.runtime.core.internal.execution.MessageProcessPhase
    public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate) {
        return messageProcessTemplate instanceof ModuleFlowProcessingPhaseTemplate;
    }

    @Override // org.mule.runtime.core.internal.execution.MessageProcessPhase
    public void runPhase(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier) {
        try {
            MessageSource messageSource = messageProcessContext.getMessageSource();
            FlowConstruct flowConstruct = (FlowConstruct) this.componentLocator.find(messageSource.getRootContainerLocation()).get();
            ComponentLocation location = messageSource.getLocation();
            Consumer<Either<MessagingException, CoreEvent>> terminateConsumer = getTerminateConsumer(messageSource, moduleFlowProcessingPhaseTemplate);
            CompletableFuture completableFuture = new CompletableFuture();
            CoreEvent createEvent = createEvent(moduleFlowProcessingPhaseTemplate, location, completableFuture, flowConstruct);
            try {
                SourcePolicy createSourcePolicyInstance = this.policyManager.createSourcePolicyInstance(messageSource, createEvent, new FlowProcessor(moduleFlowProcessingPhaseTemplate, flowConstruct), moduleFlowProcessingPhaseTemplate);
                PhaseContext phaseContext = new PhaseContext(moduleFlowProcessingPhaseTemplate, messageProcessContext, phaseResultNotifier, terminateConsumer);
                Mono.just(createEvent).doOnNext(onMessageReceived(moduleFlowProcessingPhaseTemplate, messageProcessContext, flowConstruct)).flatMap(coreEvent -> {
                    return Mono.from(createSourcePolicyInstance.process(coreEvent, moduleFlowProcessingPhaseTemplate)).flatMap(either -> {
                        return (Mono) either.reduce(policyFailure(phaseContext, flowConstruct, messageSource), policySuccess(phaseContext, flowConstruct, messageSource));
                    });
                }).doOnSuccess(r3 -> {
                    phaseResultNotifier.phaseSuccessfully();
                }).doOnError(onFailure(flowConstruct, messageSource, phaseResultNotifier, terminateConsumer)).doAfterTerminate(() -> {
                    completableFuture.complete(null);
                }).subscribe();
            } catch (Exception e) {
                Mono.from(moduleFlowProcessingPhaseTemplate.sendFailureResponseToClient(new MessagingExceptionResolver(messageProcessContext.getMessageSource()).resolve(new MessagingException(createEvent, e), this.muleContext), moduleFlowProcessingPhaseTemplate.getFailedExecutionResponseParametersFunction().apply(createEvent))).doOnTerminate(() -> {
                    phaseResultNotifier.phaseFailure(e);
                }).subscribe();
            }
        } catch (Exception e2) {
            phaseResultNotifier.phaseFailure(e2);
        }
    }

    private Consumer<CoreEvent> onMessageReceived(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, MessageProcessContext messageProcessContext, FlowConstruct flowConstruct) {
        return coreEvent -> {
            fireNotification(messageProcessContext.getMessageSource(), coreEvent, flowConstruct, ConnectorMessageNotification.MESSAGE_RECEIVED);
            moduleFlowProcessingPhaseTemplate.getNotificationFunctions().forEach(notificationFunction -> {
                this.muleContext.getNotificationManager().fireNotification(notificationFunction.apply(coreEvent, messageProcessContext.getMessageSource()));
            });
        };
    }

    private Function<SourcePolicySuccessResult, Mono<Void>> policySuccess(PhaseContext phaseContext, FlowConstruct flowConstruct, MessageSource messageSource) {
        return sourcePolicySuccessResult -> {
            fireNotification(phaseContext.messageProcessContext.getMessageSource(), sourcePolicySuccessResult.getResult(), flowConstruct, ConnectorMessageNotification.MESSAGE_RESPONSE);
            try {
                Function<SourcePolicySuccessResult, Publisher<Void>> function = sourcePolicySuccessResult -> {
                    return phaseContext.template.sendResponseToClient(sourcePolicySuccessResult.getResult(), sourcePolicySuccessResult.getResponseParameters().get());
                };
                Iterator<ReactiveInterceptorSourceCallbackAdapter> it = this.additionalInterceptors.iterator();
                while (it.hasNext()) {
                    function = it.next().apply(messageSource, function);
                }
                return Mono.from(function.apply(sourcePolicySuccessResult)).doOnSuccess(r11 -> {
                    onTerminate(flowConstruct, messageSource, phaseContext.terminateConsumer, Either.right(sourcePolicySuccessResult.getResult()));
                }).onErrorResume(th -> {
                    return policySuccessError(new SourceErrorException(sourcePolicySuccessResult.getResult(), this.sourceResponseSendErrorType, th), sourcePolicySuccessResult, phaseContext, flowConstruct, messageSource);
                });
            } catch (Exception e) {
                return policySuccessError(new SourceErrorException(sourcePolicySuccessResult.getResult(), this.sourceResponseGenerateErrorType, e), sourcePolicySuccessResult, phaseContext, flowConstruct, messageSource);
            }
        };
    }

    private Function<SourcePolicyFailureResult, Mono<Void>> policyFailure(PhaseContext phaseContext, FlowConstruct flowConstruct, MessageSource messageSource) {
        return sourcePolicyFailureResult -> {
            fireNotification(phaseContext.messageProcessContext.getMessageSource(), sourcePolicyFailureResult.getMessagingException().getEvent(), flowConstruct, ConnectorMessageNotification.MESSAGE_ERROR_RESPONSE);
            return sendErrorResponse(sourcePolicyFailureResult.getMessagingException(), coreEvent -> {
                return sourcePolicyFailureResult.getErrorResponseParameters().get();
            }, phaseContext, flowConstruct).doOnSuccess(r11 -> {
                onTerminate(flowConstruct, messageSource, phaseContext.terminateConsumer, Either.left(sourcePolicyFailureResult.getMessagingException()));
            });
        };
    }

    private Mono<Void> policySuccessError(SourceErrorException sourceErrorException, SourcePolicySuccessResult sourcePolicySuccessResult, PhaseContext phaseContext, FlowConstruct flowConstruct, MessageSource messageSource) {
        MessagingException messagingException = sourceErrorException.toMessagingException(flowConstruct.getMuleContext().getExceptionContextProviders(), messageSource);
        return Mono.when((Publisher<?>[]) new Publisher[]{Mono.just(messagingException).flatMapMany(flowConstruct.getExceptionListener()).last().onErrorResume(th -> {
            return Mono.empty();
        }), sendErrorResponse(messagingException, sourcePolicySuccessResult.createErrorResponseParameters(), phaseContext, flowConstruct).doOnSuccess(r11 -> {
            onTerminate(flowConstruct, messageSource, phaseContext.terminateConsumer, Either.left(messagingException));
        })}).then();
    }

    private Mono<Void> sendErrorResponse(MessagingException messagingException, Function<CoreEvent, Map<String, Object>> function, PhaseContext phaseContext, FlowConstruct flowConstruct) {
        CoreEvent event = messagingException.getEvent();
        try {
            return Mono.from(phaseContext.template.sendFailureResponseToClient(messagingException, function.apply(event))).onErrorMap(th -> {
                return new SourceErrorException(CoreEvent.builder(event).error(ErrorBuilder.builder(th).errorType(this.sourceErrorResponseSendErrorType).build()).build(), this.sourceErrorResponseSendErrorType, th);
            });
        } catch (Exception e) {
            return Mono.error(new SourceErrorException(event, this.sourceErrorResponseGenerateErrorType, e, messagingException));
        }
    }

    private Consumer<Throwable> onFailure(FlowConstruct flowConstruct, MessageSource messageSource, PhaseResultNotifier phaseResultNotifier, Consumer<Either<MessagingException, CoreEvent>> consumer) {
        return th -> {
            onTerminate(flowConstruct, messageSource, consumer, Either.left(th));
            Throwable cause = th instanceof SourceErrorException ? th.getCause() : th;
            phaseResultNotifier.phaseFailure(cause instanceof Exception ? (Exception) cause : new DefaultMuleException(cause));
        };
    }

    private Consumer<Either<MessagingException, CoreEvent>> getTerminateConsumer(MessageSource messageSource, ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate) {
        return either -> {
            moduleFlowProcessingPhaseTemplate.afterPhaseExecution(either.mapLeft(messagingException -> {
                messagingException.setProcessedEvent(InternalExceptionUtils.createErrorEvent(messagingException.getEvent(), messageSource, messagingException, ((PrivilegedMuleContext) this.muleContext).getErrorTypeLocator()));
                return messagingException;
            }));
        };
    }

    private CoreEvent createEvent(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, ComponentLocation componentLocation, CompletableFuture completableFuture, FlowConstruct flowConstruct) {
        Message message = moduleFlowProcessingPhaseTemplate.getMessage();
        if (!(message.getPayload().getValue() instanceof SourceResultAdapter)) {
            return createEventBuilder(componentLocation, completableFuture, flowConstruct, null, message).build();
        }
        SourceResultAdapter sourceResultAdapter = (SourceResultAdapter) message.getPayload().getValue();
        return createEventBuilder(componentLocation, completableFuture, flowConstruct, sourceResultAdapter.getCorrelationId().orElse(null), message).message(eventContext -> {
            Result result = sourceResultAdapter.getResult();
            Object output = result.getOutput();
            return ((output instanceof Collection) && sourceResultAdapter.isCollection()) ? MessageUtils.toMessage(Result.builder().output(MessageUtils.toMessageCollection((Collection) output, sourceResultAdapter.getCursorProviderFactory(), ((BaseEventContext) eventContext).getRootContext())).mediaType(result.getMediaType().orElse(MediaType.ANY)).build()) : MessageUtils.toMessage((Result<?, ?>) result, sourceResultAdapter.getMediaType(), sourceResultAdapter.getCursorProviderFactory(), ((BaseEventContext) eventContext).getRootContext());
        }).build();
    }

    private InternalEvent.Builder createEventBuilder(ComponentLocation componentLocation, CompletableFuture completableFuture, FlowConstruct flowConstruct, String str, Message message) {
        return InternalEvent.builder(EventContextFactory.create(flowConstruct, NullExceptionHandler.getInstance(), componentLocation, str, (Optional<CompletableFuture<Void>>) Optional.of(completableFuture))).message(message);
    }

    private CoreEvent emptyEvent(CoreEvent coreEvent) {
        return CoreEvent.builder(coreEvent).message(Message.of(null)).build();
    }

    private void onTerminate(FlowConstruct flowConstruct, MessageSource messageSource, Consumer<Either<MessagingException, CoreEvent>> consumer, Either<Throwable, CoreEvent> either) {
        FunctionalUtils.safely(() -> {
            consumer.accept(either.mapLeft(th -> {
                if (th instanceof MessagingException) {
                    return (MessagingException) th;
                }
                if (th instanceof SourceErrorException) {
                    return ((SourceErrorException) th).toMessagingException(flowConstruct.getMuleContext().getExceptionContextProviders(), messageSource);
                }
                return null;
            }));
        });
    }

    @Override // java.lang.Comparable
    public int compareTo(MessageProcessPhase messageProcessPhase) {
        return messageProcessPhase instanceof ValidationPhase ? 1 : 0;
    }
}
