package org.mule.runtime.core.execution;

import java.util.Collection;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.ErrorType;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.message.NullAttributes;
import org.mule.runtime.api.metadata.MediaType;
import org.mule.runtime.core.DefaultEventContext;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.functional.Either;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.api.util.func.CheckedFunction;
import org.mule.runtime.core.context.notification.ConnectorMessageNotification;
import org.mule.runtime.core.exception.ErrorTypeMatcher;
import org.mule.runtime.core.exception.ErrorTypeRepository;
import org.mule.runtime.core.exception.Errors;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.exception.SingleErrorTypeMatcher;
import org.mule.runtime.core.internal.util.FunctionalUtils;
import org.mule.runtime.core.message.ErrorBuilder;
import org.mule.runtime.core.policy.PolicyManager;
import org.mule.runtime.core.policy.SourcePolicy;
import org.mule.runtime.core.transaction.MuleTransactionConfig;
import org.mule.runtime.core.util.message.MessageUtils;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;

/* loaded from: input_file:org/mule/runtime/core/execution/ModuleFlowProcessingPhase.class */
public class ModuleFlowProcessingPhase extends NotificationFiringProcessingPhase<ModuleFlowProcessingPhaseTemplate> implements Initialisable {
    public static final String ENABLE_SOURCE_POLICIES_SYSTEM_PROPERTY = "enableSourcePolicies";
    private static Logger LOGGER = LoggerFactory.getLogger((Class<?>) ModuleFlowProcessingPhase.class);
    private boolean enableSourcePolicies;
    private ErrorTypeMatcher sourceResponseErrorTypeMatcher;
    private ErrorType sourceResponseGenerateErrorType;
    private ErrorType sourceResponseSendErrorType;
    private ErrorType sourceErrorResponseGenerateErrorType;
    private ErrorType sourceErrorResponseSendErrorType;
    private final PolicyManager policyManager;

    public ModuleFlowProcessingPhase(PolicyManager policyManager) {
        this.policyManager = policyManager;
    }

    @Override // org.mule.runtime.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        this.enableSourcePolicies = System.getProperty(ENABLE_SOURCE_POLICIES_SYSTEM_PROPERTY) != null;
        ErrorTypeRepository errorTypeRepository = this.muleContext.getErrorTypeRepository();
        this.sourceResponseErrorTypeMatcher = new SingleErrorTypeMatcher(errorTypeRepository.getSourceResponseErrorType());
        this.sourceResponseGenerateErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.SOURCE_RESPONSE_GENERATE).get();
        this.sourceResponseSendErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.SOURCE_RESPONSE_SEND).get();
        this.sourceErrorResponseGenerateErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.SOURCE_ERROR_RESPONSE_GENERATE).get();
        this.sourceErrorResponseSendErrorType = errorTypeRepository.getErrorType(Errors.ComponentIdentifiers.SOURCE_ERROR_RESPONSE_SEND).get();
    }

    @Override // org.mule.runtime.core.execution.MessageProcessPhase
    public boolean supportsTemplate(MessageProcessTemplate messageProcessTemplate) {
        return messageProcessTemplate instanceof ModuleFlowProcessingPhaseTemplate;
    }

    @Override // org.mule.runtime.core.execution.MessageProcessPhase
    public void runPhase(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier) {
        try {
            MessagingExceptionHandler exceptionListener = messageProcessContext.getFlowConstruct().getExceptionListener();
            MessageSource messageSource = messageProcessContext.getMessageSource();
            ComponentLocation location = messageSource.getLocation();
            CheckedFunction<MessagingException, Publisher<Void>> onError = onError(messageSource, moduleFlowProcessingPhaseTemplate.getFailedExecutionResponseParametersFunction(), messageProcessContext, moduleFlowProcessingPhaseTemplate, phaseResultNotifier);
            Consumer<Either<MessagingException, Event>> terminateConsumer = getTerminateConsumer(messageSource, moduleFlowProcessingPhaseTemplate);
            MonoProcessor create = MonoProcessor.create();
            Event createEvent = createEvent(moduleFlowProcessingPhaseTemplate, messageProcessContext, location, create);
            if (this.enableSourcePolicies) {
                SourcePolicy createSourcePolicyInstance = this.policyManager.createSourcePolicyInstance(location, createEvent, createFlowExecutionProcessor(messageSource, exceptionListener, messageProcessContext, moduleFlowProcessingPhaseTemplate), moduleFlowProcessingPhaseTemplate);
                try {
                    createSourcePolicyInstance.process(createEvent).apply(failureSourcePolicyResult -> {
                        MessagingException messagingException = failureSourcePolicyResult.getMessagingException();
                        try {
                            ErrorType errorType = messagingException.getEvent().getError().orElseGet(() -> {
                                return ErrorBuilder.builder(messagingException.getCause()).errorType(this.sourceResponseGenerateErrorType).build();
                            }).getErrorType();
                            Mono.from((Publisher) onError.apply(messagingException)).doOnSuccess(r12 -> {
                                if (this.sourceResponseErrorTypeMatcher.match(errorType)) {
                                    onTerminate(terminateConsumer, Either.right(messagingException.getEvent()));
                                } else {
                                    onTerminate(terminateConsumer, Either.left(new SourceErrorException(messagingException.getEvent(), errorType, messagingException).toMessagingException()));
                                }
                            }).doOnError(SourceErrorException.class, sourceErrorException -> {
                                onTerminate(terminateConsumer, Either.left(sourceErrorException.toMessagingException()));
                            }).subscribe();
                        } catch (SourceErrorException e) {
                            onTerminate(terminateConsumer, Either.left(e.toMessagingException()));
                        }
                    }, successSourcePolicyResult -> {
                        Event flowExecutionResult = successSourcePolicyResult.getFlowExecutionResult();
                        fireNotification(messageSource, flowExecutionResult, messageProcessContext.getFlowConstruct(), ConnectorMessageNotification.MESSAGE_RESPONSE);
                        if (flowExecutionResult == null) {
                            flowExecutionResult = emptyEvent(createEvent);
                        }
                        Event event = flowExecutionResult;
                        Mono.from(moduleFlowProcessingPhaseTemplate.sendResponseToClient(flowExecutionResult, successSourcePolicyResult.getResponseParameters(), event2 -> {
                            return successSourcePolicyResult.createErrorResponseParameters(event2);
                        }, createResponseCompletationCallback(phaseResultNotifier))).onErrorResume(SourceErrorException.class, sourceErrorException -> {
                            return Mono.from((Publisher) onError.apply(sourceErrorException.toMessagingException())).doOnSuccess(r7 -> {
                                onTerminate(terminateConsumer, Either.left(sourceErrorException));
                            });
                        }).doAfterTerminate((r7, th) -> {
                            onTerminate(terminateConsumer, Either.right(event));
                        }).subscribe();
                    });
                    this.policyManager.disposePoliciesResources(createEvent.getContext().getId());
                } catch (Throwable th) {
                    this.policyManager.disposePoliciesResources(createEvent.getContext().getId());
                    throw th;
                }
            } else {
                Mono.just(createEvent).doOnNext(event -> {
                    fireNotification(messageProcessContext.getMessageSource(), event, messageProcessContext.getFlowConstruct(), ConnectorMessageNotification.MESSAGE_RECEIVED);
                }).then(event2 -> {
                    return Mono.from(moduleFlowProcessingPhaseTemplate.routeEventAsync(event2)).switchIfEmpty(Mono.fromCallable(() -> {
                        return emptyEvent(createEvent);
                    }));
                }).then(onSuccess(messageSource, createEvent, messageProcessContext, phaseResultNotifier, moduleFlowProcessingPhaseTemplate, terminateConsumer)).onErrorMap(MessagingException.class, messagingException -> {
                    return messagingException.getCause() instanceof SourceErrorException ? messagingException.getCause() : messagingException;
                }).onErrorResume(SourceErrorException.class, sourceErrorException -> {
                    return onSourceException(exceptionListener, onError, terminateConsumer, sourceErrorException);
                }).onErrorResume(MessagingException.class, messagingException2 -> {
                    return Mono.from((Publisher) onError.apply(messagingException2)).doOnSuccess(r7 -> {
                        onTerminate(terminateConsumer, Either.left(messagingException2));
                    }).doOnError(th2 -> {
                        onTerminate(terminateConsumer, Either.left(th2));
                    });
                }).doAfterTerminate((r3, th2) -> {
                    create.onComplete();
                }).subscribe();
            }
        } catch (Exception e) {
            phaseResultNotifier.phaseFailure(e);
        }
    }

    private Mono<Void> onSourceException(MessagingExceptionHandler messagingExceptionHandler, Function<MessagingException, Publisher<Void>> function, Consumer<Either<MessagingException, Event>> consumer, SourceErrorException sourceErrorException) {
        return this.sourceResponseErrorTypeMatcher.match(sourceErrorException.getErrorType()) ? Mono.from(handleSourceError(messagingExceptionHandler, function, sourceErrorException)).doOnSuccess(r7 -> {
            onTerminate(consumer, Either.right(sourceErrorException.getEvent()));
        }).doOnError(th -> {
            onTerminate(consumer, Either.left(th));
        }) : Mono.error(sourceErrorException);
    }

    private Publisher<Void> handleSourceError(MessagingExceptionHandler messagingExceptionHandler, Function<MessagingException, Publisher<Void>> function, SourceErrorException sourceErrorException) {
        MessagingException messagingException = sourceErrorException.toMessagingException();
        messagingExceptionHandler.handleException(messagingException, messagingException.getEvent());
        return function.apply(messagingException);
    }

    private Event createEvent(ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, MessageProcessContext messageProcessContext, ComponentLocation componentLocation, Publisher<Void> publisher) throws MuleException {
        Message message = moduleFlowProcessingPhaseTemplate.getMessage();
        Event build = Event.builder(DefaultEventContext.create(messageProcessContext.getFlowConstruct(), componentLocation, null, publisher)).message(message).flow(messageProcessContext.getFlowConstruct()).build();
        if (message.getPayload().getValue() instanceof SourceResultAdapter) {
            SourceResultAdapter sourceResultAdapter = (SourceResultAdapter) message.getPayload().getValue();
            Result result = sourceResultAdapter.getResult();
            Object output = result.getOutput();
            build = Event.builder(build).message(((output instanceof Collection) && sourceResultAdapter.isCollection()) ? MessageUtils.toMessage(Result.builder().output(MessageUtils.toMessageCollection((Collection) output, sourceResultAdapter.getCursorProviderFactory(), build)).attributes(NullAttributes.NULL_ATTRIBUTES).mediaType(result.getMediaType().orElse(MediaType.ANY)).build()) : MessageUtils.toMessage(result, result.getMediaType().orElse(MediaType.ANY), sourceResultAdapter.getCursorProviderFactory(), build)).build();
        }
        return build;
    }

    private CheckedFunction<MessagingException, Publisher<Void>> onError(MessageSource messageSource, Function<Event, Map<String, Object>> function, MessageProcessContext messageProcessContext, ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, PhaseResultNotifier phaseResultNotifier) {
        return messagingException -> {
            Event createErrorEvent = ExceptionUtils.createErrorEvent(messagingException.getEvent(), messageSource, messagingException, this.muleContext.getErrorTypeLocator());
            messagingException.setProcessedEvent(createErrorEvent);
            fireNotification(messageSource, messagingException.getEvent(), messageProcessContext.getFlowConstruct(), ConnectorMessageNotification.MESSAGE_ERROR_RESPONSE);
            if (messagingException.inErrorHandler()) {
                phaseResultNotifier.phaseFailure((Exception) messagingException.getCause());
                return Mono.error(new SourceErrorException(messagingException.getEvent(), this.sourceErrorResponseGenerateErrorType, messagingException.getCause(), messagingException));
            }
            try {
                return moduleFlowProcessingPhaseTemplate.sendFailureResponseToClient(messagingException, (Map) function.apply(messagingException.getEvent()), createSendFailureResponseCompletationCallback(phaseResultNotifier, this.sourceErrorResponseSendErrorType));
            } catch (Exception e) {
                phaseResultNotifier.phaseFailure(e);
                return Mono.error(new SourceErrorException(createErrorEvent, this.sourceErrorResponseGenerateErrorType, e, messagingException));
            }
        };
    }

    private Consumer<Either<MessagingException, Event>> getTerminateConsumer(MessageSource messageSource, ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate) {
        return either -> {
            moduleFlowProcessingPhaseTemplate.sendAfterTerminateResponseToClient(either.mapLeft(messagingException -> {
                messagingException.setProcessedEvent(ExceptionUtils.createErrorEvent(messagingException.getEvent(), messageSource, messagingException, this.muleContext.getErrorTypeLocator()));
                return messagingException;
            }));
        };
    }

    private Function<Event, Mono<Void>> onSuccess(MessageSource messageSource, Event event, MessageProcessContext messageProcessContext, PhaseResultNotifier phaseResultNotifier, ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate, Consumer<Either<MessagingException, Event>> consumer) {
        return event2 -> {
            fireNotification(messageSource, event2, messageProcessContext.getFlowConstruct(), ConnectorMessageNotification.MESSAGE_RESPONSE);
            if (event2 == null) {
                event2 = emptyEvent(event);
            }
            try {
                Event event2 = event2;
                return Mono.from(moduleFlowProcessingPhaseTemplate.sendResponseToClient(event2, moduleFlowProcessingPhaseTemplate.getSuccessfulExecutionResponseParametersFunction().apply(event2), moduleFlowProcessingPhaseTemplate.getFailedExecutionResponseParametersFunction(), createResponseCompletationCallback(phaseResultNotifier))).doOnSuccess(r7 -> {
                    onTerminate(consumer, Either.right(event2));
                });
            } catch (Exception e) {
                return Mono.error(new SourceErrorException(event2, this.sourceResponseGenerateErrorType, e));
            }
        };
    }

    private Event emptyEvent(Event event) {
        return Event.builder(event).message(Message.of(null)).build();
    }

    private Processor createFlowExecutionProcessor(MessageSource messageSource, MessagingExceptionHandler messagingExceptionHandler, MessageProcessContext messageProcessContext, ModuleFlowProcessingPhaseTemplate moduleFlowProcessingPhaseTemplate) {
        return event -> {
            try {
                return TransactionalErrorHandlingExecutionTemplate.createMainExecutionTemplate(this.muleContext, messageProcessContext.getFlowConstruct(), messageProcessContext.getTransactionConfig().orElse(new MuleTransactionConfig()), messagingExceptionHandler).execute(() -> {
                    fireNotification(messageSource, event, messageProcessContext.getFlowConstruct(), ConnectorMessageNotification.MESSAGE_RECEIVED);
                    return moduleFlowProcessingPhaseTemplate.routeEvent(event);
                });
            } catch (MuleException e) {
                throw e;
            } catch (Exception e2) {
                throw new DefaultMuleException(e2);
            }
        };
    }

    private ResponseCompletionCallback createSendFailureResponseCompletationCallback(final PhaseResultNotifier phaseResultNotifier, final ErrorType errorType) {
        return new ResponseCompletionCallback() { // from class: org.mule.runtime.core.execution.ModuleFlowProcessingPhase.1
            @Override // org.mule.runtime.core.execution.ResponseCompletionCallback
            public void responseSentSuccessfully() {
                phaseResultNotifier.phaseSuccessfully();
            }

            @Override // org.mule.runtime.core.execution.ResponseCompletionCallback
            public Event responseSentWithFailure(MessagingException messagingException, Event event) {
                if (errorType == null) {
                    ModuleFlowProcessingPhase.LOGGER.error("Unhandled exception processing request", (Throwable) messagingException);
                    return event;
                }
                Event build = Event.builder(event).error(ErrorBuilder.builder(messagingException.getCause()).errorType(errorType).build()).build();
                phaseResultNotifier.phaseFailure((Exception) messagingException.getCause());
                throw new SourceErrorException(build, errorType, messagingException.getCause());
            }
        };
    }

    private ResponseCompletionCallback createResponseCompletationCallback(final PhaseResultNotifier phaseResultNotifier) {
        return new ResponseCompletionCallback() { // from class: org.mule.runtime.core.execution.ModuleFlowProcessingPhase.2
            @Override // org.mule.runtime.core.execution.ResponseCompletionCallback
            public void responseSentSuccessfully() {
                phaseResultNotifier.phaseSuccessfully();
            }

            @Override // org.mule.runtime.core.execution.ResponseCompletionCallback
            public Event responseSentWithFailure(MessagingException messagingException, Event event) {
                if ((messagingException.getCause() instanceof SourceErrorException) && ModuleFlowProcessingPhase.this.sourceResponseSendErrorType.equals(((SourceErrorException) messagingException.getCause()).getErrorType())) {
                    throw ((SourceErrorException) messagingException.getCause());
                }
                throw new SourceErrorException(event, ModuleFlowProcessingPhase.this.sourceResponseSendErrorType, messagingException.getCause());
            }
        };
    }

    private void onTerminate(Consumer<Either<MessagingException, Event>> consumer, Either<Throwable, Event> either) {
        FunctionalUtils.safely(() -> {
            consumer.accept(either.mapLeft(th -> {
                if (th instanceof MessagingException) {
                    return (MessagingException) th;
                }
                if (th instanceof SourceErrorException) {
                    return ((SourceErrorException) th).toMessagingException();
                }
                return null;
            }));
        });
    }

    @Override // java.lang.Comparable
    public int compareTo(MessageProcessPhase messageProcessPhase) {
        return messageProcessPhase instanceof ValidationPhase ? 1 : 0;
    }
}
