package org.mule.runtime.module.extension.internal.runtime.source;

import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import javax.transaction.TransactionManager;
import org.mule.runtime.api.cluster.ClusterService;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.config.MuleRuntimeFeature;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.meta.model.ExtensionModel;
import org.mule.runtime.api.meta.model.source.SourceModel;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.profiling.ProfilingService;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.tx.TransactionType;
import org.mule.runtime.api.util.LazyValue;
import org.mule.runtime.api.util.NameUtils;
import org.mule.runtime.core.api.config.MuleConfiguration;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.exception.SystemExceptionHandler;
import org.mule.runtime.core.api.extension.ExtensionManager;
import org.mule.runtime.core.api.lifecycle.LifecycleState;
import org.mule.runtime.core.api.lifecycle.LifecycleStateEnabled;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.lifecycle.PrimaryNodeLifecycleNotificationListener;
import org.mule.runtime.core.api.management.stats.AllStatistics;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.retry.ReconnectionConfig;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.retry.policy.RetryPolicyTemplate;
import org.mule.runtime.core.api.rx.Exceptions;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.streaming.CursorProviderFactory;
import org.mule.runtime.core.api.transaction.MuleTransactionConfig;
import org.mule.runtime.core.api.transaction.TransactionConfig;
import org.mule.runtime.core.api.transaction.TransactionFactory;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.api.util.SystemUtils;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.core.internal.el.TemplateParser;
import org.mule.runtime.core.internal.exception.MessagingExceptionResolver;
import org.mule.runtime.core.internal.execution.ExceptionCallback;
import org.mule.runtime.core.internal.execution.MessageProcessContext;
import org.mule.runtime.core.internal.execution.MessageProcessingManager;
import org.mule.runtime.core.internal.lifecycle.DefaultLifecycleManager;
import org.mule.runtime.core.internal.transaction.TransactionFactoryLocator;
import org.mule.runtime.core.privileged.exception.ErrorTypeLocator;
import org.mule.runtime.extension.api.runtime.config.ConfigurationInstance;
import org.mule.runtime.extension.api.runtime.config.ConfigurationProvider;
import org.mule.runtime.extension.api.runtime.config.ConfigurationStats;
import org.mule.runtime.extension.api.runtime.config.ConfiguredComponent;
import org.mule.runtime.extension.api.runtime.source.ParameterizedSource;
import org.mule.runtime.module.extension.api.runtime.resolver.ParameterValueResolver;
import org.mule.runtime.module.extension.api.runtime.resolver.ResolverSet;
import org.mule.runtime.module.extension.api.runtime.resolver.ValueResolvingContext;
import org.mule.runtime.module.extension.api.util.MuleExtensionUtils;
import org.mule.runtime.module.extension.internal.runtime.ExtensionComponent;
import org.mule.runtime.module.extension.internal.runtime.config.MutableConfigurationStats;
import org.mule.runtime.module.extension.internal.runtime.connectivity.oauth.ExtensionsOAuthUtils;
import org.mule.runtime.module.extension.internal.runtime.exception.ExceptionHandlerManager;
import org.mule.runtime.module.extension.internal.runtime.operation.IllegalSourceException;
import org.mule.runtime.module.extension.internal.runtime.resolver.ObjectBasedParameterValueResolver;
import org.mule.runtime.module.extension.internal.runtime.source.poll.RestartContext;
import org.mule.runtime.module.extension.internal.util.ReconnectionUtils;
import org.mule.runtime.module.extension.internal.util.ReflectionCache;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/module/extension/internal/runtime/source/ExtensionMessageSource.class */
public class ExtensionMessageSource extends ExtensionComponent<SourceModel> implements MessageSource, ExceptionCallback<ConnectionException>, ParameterizedSource, ConfiguredComponent, LifecycleStateEnabled {
    private static final Logger LOGGER = LoggerFactory.getLogger(ExtensionMessageSource.class);

    // Supplies the IO scheduler on which start and reconnection work is executed.
    @Inject
    private SchedulerService schedulerService;

    // Handed to the PrimaryNodeLifecycleNotificationListener registered when this node is not allowed to run the source.
    @Inject
    private NotificationListenerRegistry notificationListenerRegistry;

    // Passed to the ObjectBasedParameterValueResolver built in getParameterValueResolver().
    @Inject
    private ReflectionCache reflectionCache;

    // Used to look up the TransactionFactory matching the source's transactional type.
    @Inject
    private TransactionFactoryLocator transactionFactoryLocator;

    // Used when resolving the source parameters in getInitialisationParameters().
    @Inject
    private ExpressionManager expressionManager;

    // Queried for primary-polling-instance status when logging during initialisation.
    @Inject
    private ClusterService clusterService;

    // Consulted for the ERROR_AND_ROLLBACK_TX_WHEN_TIMEOUT and COMPUTE_CONNECTION_ERRORS_IN_STATS feature flags.
    @Inject
    private FeatureFlaggingService featureFlaggingService;

    // Passed to the DefaultSourceCallback builder.
    @Inject
    private ProfilingService profilingService;

    // Optional transaction manager; null is handed to the source callback when absent.
    @Inject
    private Optional<TransactionManager> transactionManager;

    // Source of the default encoding given to the source callback.
    @Inject
    private MuleConfiguration configuration;
    // Set through the @Inject-annotated setter setMessageProcessingManager(...).
    private MessageProcessingManager messageProcessingManager;
    private final SourceModel sourceModel;
    private final SourceAdapterFactory sourceAdapterFactory;
    private final boolean primaryNodeOnly;
    // Retry policy received in the constructor; input to createRetryPolicyTemplate(...).
    private final RetryPolicyTemplate customRetryPolicyTemplate;
    private final MessageSource.BackPressureStrategy backPressureStrategy;
    private final ExceptionHandlerManager exceptionEnricherManager;
    // True while a reconnection is in progress; claimed with compareAndSet in onException(...) and
    // cleared by the on*Successful / onReconnectionFailed callbacks.
    private final AtomicBoolean reconnecting;
    private final DefaultLifecycleManager<ExtensionMessageSource> lifecycleManager;
    private final TemplateParser expressionParser;
    // Same ConfigurationProvider instance that is passed to the superclass constructor.
    private final ConfigurationProvider explicitConfigProvider;
    // Created during the initialise phase (see reallyDoInitialise()).
    private SourceConnectionManager sourceConnectionManager;
    // Listener set through setListener(...); handed to the source callback.
    private Processor messageProcessor;
    // Lazily built by buildTransactionConfig(), which reads the current sourceAdapter.
    private final LazyValue<TransactionConfig> transactionConfig;
    // Created by createSource(...) and reset to null by disposeSource(); may therefore be null.
    private SourceAdapter sourceAdapter;
    // Effective retry policy, computed in createSource(...) from customRetryPolicyTemplate.
    private RetryPolicyTemplate retryPolicyTemplate;
    // Obtained in the start phase and released (set back to null) by stopSchedulers().
    private Scheduler retryScheduler;
    // Lazily resolves the owning flow through the component locator (see doInitialise()).
    private LazyValue<FlowConstruct> flowConstruct;
    private MessageProcessContext messageProcessContext;
    private final NotificationDispatcher notificationDispatcher;
    private final String applicationName;
    // Started flag; the AtomicBoolean instance is also used as the monitor guarding start/stop/restart.
    private final AtomicBoolean started;

    /**
     * Creates a message source for the given {@code sourceModel}.
     *
     * @param extensionModel        model of the extension that owns the source
     * @param sourceModel           model describing the source
     * @param sourceAdapterFactory  factory used to create the {@link SourceAdapter}
     * @param configurationProvider configuration provider; also kept as the explicit config provider
     * @param z                     whether the source may only run on the primary node
     * @param retryPolicyTemplate   custom retry policy, may be {@code null}
     * @param cursorProviderFactory cursor provider factory forwarded to the superclass
     * @param backPressureStrategy  back pressure strategy of the source
     * @param extensionManager      extension manager forwarded to the superclass
     * @param notificationDispatcher dispatcher handed to the source callback
     * @param str                   application name handed to the source callback
     */
    public ExtensionMessageSource(ExtensionModel extensionModel, SourceModel sourceModel, SourceAdapterFactory sourceAdapterFactory, ConfigurationProvider configurationProvider, boolean z, RetryPolicyTemplate retryPolicyTemplate, CursorProviderFactory cursorProviderFactory, MessageSource.BackPressureStrategy backPressureStrategy, ExtensionManager extensionManager, NotificationDispatcher notificationDispatcher, String str) {
        super(extensionModel, sourceModel, configurationProvider, cursorProviderFactory, extensionManager);

        // State flags: not reconnecting and not started.
        this.reconnecting = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);

        // Values received from the caller.
        this.sourceModel = sourceModel;
        this.sourceAdapterFactory = sourceAdapterFactory;
        this.explicitConfigProvider = configurationProvider;
        this.primaryNodeOnly = z;
        this.customRetryPolicyTemplate = retryPolicyTemplate;
        this.backPressureStrategy = backPressureStrategy;
        this.notificationDispatcher = notificationDispatcher;
        this.applicationName = str;

        // Collaborators built here.
        this.expressionParser = TemplateParser.createMuleStyleParser();
        this.transactionConfig = new LazyValue<>(this::buildTransactionConfig);
        this.exceptionEnricherManager = new ExceptionHandlerManager(extensionModel, sourceModel);
        this.lifecycleManager = new DefaultLifecycleManager<>(sourceModel.getName(), this);
    }

    /**
     * Creates the {@link SourceAdapter} and the effective retry policy, unless an adapter already exists.
     *
     * @param z forwarded to the adapter factory as its last argument (callers pass the "restarting" flag)
     * @throws Exception if the adapter or the retry policy cannot be created or initialised
     */
    private synchronized void createSource(boolean z) throws Exception {
        if (this.sourceAdapter != null) {
            // Nothing to do: the adapter has already been created.
            return;
        }

        CoreEvent initialiserEvent = null;
        try {
            initialiserEvent = MuleExtensionUtils.getInitialiserEvent(this.muleContext);
            this.sourceAdapter = this.sourceAdapterFactory.createAdapter(startUsingConfiguration(initialiserEvent), createSourceCallbackFactory(), this, this.sourceConnectionManager, z);
            this.muleContext.getInjector().inject(this.sourceAdapter);
            this.retryPolicyTemplate = createRetryPolicyTemplate(this.customRetryPolicyTemplate);
            LifecycleUtils.initialiseIfNeeded(this.retryPolicyTemplate, true, this.muleContext);
        } finally {
            // The initialiser event is completed whether or not creation succeeded.
            if (initialiserEvent != null) {
                initialiserEvent.getContext().success();
            }
        }
    }

    /**
     * Starts the source through the retry policy.
     * <p>
     * With an asynchronous retry policy the outcome is reported through callbacks attached to the
     * resulting future. With a synchronous policy this method blocks until the start attempt
     * completes and propagates any failure to the caller.
     *
     * @param z              {@code true} when starting as part of a restart (reconnection)
     * @param restartContext context produced by the preceding stop, or {@code null}
     * @throws MuleException if a synchronous start fails after the retry policy is exhausted
     */
    private void startSource(boolean z, RestartContext restartContext) throws MuleException {
        final SystemExceptionHandler exceptionListener = this.muleContext.getExceptionListener();

        final Runnable onSuccess;
        final Consumer<Throwable> onFailure;
        if (this.retryPolicyTemplate.isAsync()) {
            onSuccess = getSuccessRunnable(z);
            onFailure = th -> {
                RetryPolicyExhaustedException retryPolicyExhaustedException = th instanceof RetryPolicyExhaustedException ? (RetryPolicyExhaustedException) th : new RetryPolicyExhaustedException(th, this);
                exceptionListener.handleException(retryPolicyExhaustedException);
                onReconnectionFailed(retryPolicyExhaustedException);
            };
        } else {
            // Synchronous policies are handled after the blocking get() below.
            onSuccess = () -> {
            };
            onFailure = th -> {
            };
        }

        CompletableFuture<Void> startFuture = this.retryPolicyTemplate.applyPolicy(() -> {
            CompletableFuture<Void> work = new CompletableFuture<>();
            this.retryScheduler.execute(() -> {
                if (!this.retryPolicyTemplate.isAsync()) {
                    doWork(z, restartContext, work);
                    return;
                }
                // Async attempts run later on the scheduler, so only start if the source is still
                // marked as started; otherwise complete the attempt without doing anything.
                synchronized (this.lifecycleManager) {
                    synchronized (this.started) {
                        if (this.started.get()) {
                            doWork(z, restartContext, work);
                        } else {
                            work.complete(null);
                        }
                    }
                }
            });
            return work;
        }, th -> true, th -> computeStats(), ReconnectionUtils.NULL_THROWABLE_CONSUMER, Function.identity(), this.retryScheduler).whenComplete((ignored, failure) -> {
            if (failure != null) {
                onFailure.accept(failure);
            } else {
                onSuccess.run();
            }
        });

        try {
            if (!this.retryPolicyTemplate.isAsync()) {
                startFuture.get();
                getSuccessRunnable(z).run();
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up the stack can observe the interruption.
            Thread.currentThread().interrupt();
            MuleRuntimeException muleRuntimeException = new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Found exception starting source '%s' on flow '%s'", this.sourceModel.getName(), getLocation().getRootContainerName())), e);
            exceptionListener.handleException(muleRuntimeException);
            throw muleRuntimeException;
        } catch (ExecutionException e) {
            RetryPolicyExhaustedException retryPolicyExhaustedException = new RetryPolicyExhaustedException(e.getCause(), this);
            exceptionListener.handleException(retryPolicyExhaustedException);
            throw retryPolicyExhaustedException;
        }
    }

    /**
     * Starts the source as a regular (non-restart) start, without a restart context.
     *
     * @throws MuleException if the source cannot be started
     */
    private void startSource() throws MuleException {
        final boolean restarting = false;
        final RestartContext noRestartContext = null;
        startSource(restarting, noRestartContext);
    }

    /**
     * Resolves the retry policy to use for this source.
     * <p>
     * When the configuration instance exposes a connection provider, the policy comes from that
     * provider's reconnection config. Otherwise the given template is used, or the default
     * reconnection config's template when the given one is {@code null}.
     *
     * @param retryPolicyTemplate custom retry policy, may be {@code null}
     * @return the retry policy template to apply
     */
    private RetryPolicyTemplate createRetryPolicyTemplate(RetryPolicyTemplate retryPolicyTemplate) {
        return getConfigurationInstance()
                .flatMap(instance -> instance.getConnectionProvider())
                .map(provider -> this.connectionManager.getReconnectionConfigFor(provider).getRetryPolicyTemplate(retryPolicyTemplate))
                .orElseGet(() -> {
                    if (retryPolicyTemplate != null) {
                        return retryPolicyTemplate;
                    }
                    return ReconnectionConfig.defaultReconnectionConfig().getRetryPolicyTemplate();
                });
    }

    /**
     * Stops the source without beginning a restart; the restart context result is discarded.
     *
     * @throws MuleException if the source cannot be stopped
     */
    private void stopSource() throws MuleException {
        final boolean restarting = false;
        stopSource(restarting);
    }

    /**
     * Stops the current source adapter, if there is one.
     * <p>
     * The adapter is always stopped (and disposed when a dynamic configuration is in use), even if
     * releasing the configuration or beginning the restart fails. The initialiser event is always
     * completed, on both the success and the failure path.
     *
     * @param z {@code true} to begin a restart on the adapter before stopping it
     * @return the restart context when {@code z} is {@code true}; {@code null} otherwise or when
     *         there is no adapter
     * @throws MuleException wrapping any exception found while stopping
     */
    private RestartContext stopSource(boolean z) throws MuleException {
        if (this.sourceAdapter == null) {
            return null;
        }
        // Captured up front: disposeSource() below may reset the adapter reference to null.
        final String name = this.sourceAdapter.getName();
        CoreEvent initialiserEvent = null;
        try {
            initialiserEvent = MuleExtensionUtils.getInitialiserEvent(this.muleContext);
            try {
                stopUsingConfiguration(initialiserEvent);
                return z ? this.sourceAdapter.beginRestart() : null;
            } finally {
                this.sourceAdapter.stop();
                if (usesDynamicConfiguration()) {
                    disposeSource();
                }
            }
        } catch (Exception e) {
            throw new DefaultMuleException(String.format("Found exception stopping source '%s' of root component '%s'", name, getLocation().getRootContainerName()), e);
        } finally {
            // Complete the event on every path so its context is not left open after a failure.
            if (initialiserEvent != null) {
                initialiserEvent.getContext().success();
            }
        }
    }

    /**
     * Returns the configuration instance reported by the current source adapter.
     * Requires the adapter to exist; there is no null check on it.
     *
     * @return the adapter's configuration instance, if any
     */
    public Optional<ConfigurationInstance> getConfigurationInstance() {
        final SourceAdapter adapter = this.sourceAdapter;
        return adapter.getConfigurationInstance();
    }

    /**
     * Builds the factory that creates a {@link DefaultSourceCallback} for a given completion
     * handler factory, wired with this source's collaborators and settings.
     *
     * @return the source callback factory
     */
    private SourceCallbackFactory createSourceCallbackFactory() {
        return completionHandlerFactory -> DefaultSourceCallback.builder(this.profilingService)
                .setExceptionCallback(this)
                .setSourceModel(this.sourceModel)
                .setConfigurationInstance(getConfigurationInstance().orElse(null))
                .setTransactionConfig(this.transactionConfig.get())
                .setSource(this)
                .setDefaultEncoding(SystemUtils.getDefaultEncoding(this.configuration))
                .setTransactionManager(this.transactionManager.orElse(null))
                .setListener(this.messageProcessor)
                .setProcessingManager(this.messageProcessingManager)
                .setProcessContext(this.messageProcessContext)
                .setApplicationName(this.applicationName)
                .setNotificationDispatcher(this.notificationDispatcher)
                .setCursorStreamProviderFactory(getCursorProviderFactory())
                .setCompletionHandlerFactory(completionHandlerFactory)
                .setErrorAfterTimeout(this.featureFlaggingService.isEnabled(MuleRuntimeFeature.ERROR_AND_ROLLBACK_TX_WHEN_TIMEOUT))
                .build();
    }

    /**
     * Handles a connection error reported by the source by attempting a reconnection.
     * <p>
     * Only one reconnection runs at a time: if one is already in progress the error is logged and
     * ignored. Otherwise the error is passed to the exception listener, an OAuth token refresh is
     * attempted, and then either the adapter's own reconnection action is run under the retry
     * policy or the connection is invalidated and the source restarted. Both paths run on the
     * retry scheduler.
     *
     * @param connectionException the connection error that triggered the reconnection
     */
    public void onException(ConnectionException connectionException) {
        if (!this.reconnecting.compareAndSet(false, true)) {
            LOGGER.error(String.format("Message source '%s' on flow '%s' found connection error but reconnection is already in progress. Error was: %s", this.sourceModel.getName(), getLocation().getRootContainerName(), connectionException.getMessage()), connectionException);
            return;
        }
        this.muleContext.getExceptionListener().handleException(connectionException, getLocation());
        if (LOGGER.isWarnEnabled()) {
            LOGGER.warn(String.format("Message source '%s' on flow '%s' threw exception. Attempting to reconnect...", this.sourceAdapter.getName(), getLocation().getRootContainerName()), connectionException);
        }
        try {
            ExtensionsOAuthUtils.refreshTokenIfNecessary((ConnectionProvider) getConfigurationInstance().flatMap(configurationInstance -> {
                return configurationInstance.getConnectionProvider();
            }).orElse(null), (Throwable) connectionException);
        } catch (Exception e) {
            // A failed token refresh is reported but does not abort the reconnection attempt.
            if (LOGGER.isErrorEnabled()) {
                // Log the refresh failure itself; the connection error was already logged above.
                LOGGER.error(String.format("Message source '%s' on flow '%s' threw exception while trying to refresh OAuth access token: %s", this.sourceAdapter.getName(), getLocation().getRootContainerName(), e.getMessage()), e);
            }
            Exception refreshFailure = e instanceof MuleException ? e : new DefaultMuleException(e);
            this.muleContext.getExceptionListener().handleException(refreshFailure, getLocation());
        }
        Optional<Publisher<Void>> reconnectionAction = this.sourceAdapter.getReconnectionAction(connectionException);
        if (reconnectionAction.isPresent()) {
            // The adapter knows how to reconnect by itself: run its action under the retry policy.
            this.retryScheduler.execute(() -> {
                ((Mono) reconnectionAction.map(publisher -> {
                    return Mono.from(this.retryPolicyTemplate.applyPolicy(publisher, this.retryScheduler));
                }).get()).doOnSuccess(r3 -> {
                    onReconnectionSuccessful();
                }).doOnError(this::onReconnectionFailed).subscribe();
            });
        } else {
            // No custom action: drop the broken connection and restart the whole source.
            invalidateConnection(connectionException);
            this.retryScheduler.execute(() -> {
                try {
                    restart();
                } catch (MuleException e2) {
                    onReconnectionFailed(e2);
                }
            });
        }
    }

    /**
     * Selects the callback to run after the source has started successfully.
     *
     * @param z {@code true} when the start is part of a reconnection
     * @return the reconnection-success callback when {@code z} is {@code true}, else the start-success callback
     */
    private Runnable getSuccessRunnable(boolean z) {
        if (z) {
            return this::onReconnectionSuccessful;
        }
        return this::onStartSuccessful;
    }

    /**
     * Logs the successful reconnection and clears the reconnecting flag.
     */
    private void onReconnectionSuccessful() {
        if (LOGGER.isInfoEnabled()) {
            final String sourceName = this.sourceModel.getName();
            final String flowName = getLocation().getRootContainerName();
            LOGGER.info("Message source '{}' on flow '{}' successfully reconnected", sourceName, flowName);
        }
        this.reconnecting.set(false);
    }

    /**
     * Logs the successful start and clears the reconnecting flag.
     */
    private void onStartSuccessful() {
        if (LOGGER.isInfoEnabled()) {
            final String sourceName = this.sourceModel.getName();
            final String flowName = getLocation().getRootContainerName();
            LOGGER.info("Message source '{}' on flow '{}' successfully started", sourceName, flowName);
        }
        this.reconnecting.set(false);
    }

    /**
     * Logs the reconnection failure, shuts this source down and clears the reconnecting flag.
     *
     * @param th the failure that made the reconnection give up
     */
    private void onReconnectionFailed(Throwable th) {
        final String message = String.format("Message source '%s' on flow '%s' could not be reconnected. Will be shutdown. %s", this.sourceModel.getName(), getLocation().getRootContainerName(), th.getMessage());
        LOGGER.error(message, th);
        shutdown();
        this.reconnecting.set(false);
    }

    /**
     * Restarts the source: stops it while beginning a restart, disposes the adapter and starts a
     * new one with the resulting restart context. Does nothing but log a warning when the source
     * is not started. Runs under the {@code started} monitor.
     *
     * @throws MuleException if stopping or starting the source fails
     */
    private void restart() throws MuleException {
        synchronized (this.started) {
            if (this.started.get()) {
                RestartContext restartContext = stopSource(true);
                disposeSource();
                startSource(true, restartContext);
            } else {
                // First placeholder is the source name, second is the flow name.
                LOGGER.warn(String.format("Message source '%s' on flow '%s' is stopped. Not doing restart", this.sourceModel.getName(), getLocation().getRootContainerName()));
            }
        }
    }

    /**
     * Starts the source, but only when it is allowed to run on this node.
     *
     * @throws MuleException if the source fails to start
     */
    @Override // org.mule.runtime.module.extension.internal.runtime.ExtensionComponent
    public void doStart() throws MuleException {
        if (!shouldRunOnThisNode()) {
            return;
        }
        reallyDoStart();
    }

    /**
     * Fires the start phase: starts the retry policy, obtains the retry scheduler if needed and
     * starts the source, marking it as started afterwards.
     *
     * @throws MuleException if the start phase fails
     */
    private void reallyDoStart() throws MuleException {
        LOGGER.debug("Message source '{}' on flow '{}' is starting", this.sourceModel.getName(), getLocation().getRootContainerName());
        lifecycle(() -> this.lifecycleManager.fireStartPhase((phaseName, source) -> {
            LifecycleUtils.startIfNeeded(this.retryPolicyTemplate);
            if (this.retryScheduler == null) {
                this.retryScheduler = this.schedulerService.ioScheduler();
            }
            // The flag is only set once startSource() returned without throwing.
            synchronized (this.started) {
                startSource();
                this.started.set(true);
            }
        }));
    }

    /**
     * Fires the stop phase: stops the source if it was started and then releases the retry scheduler.
     *
     * @throws MuleException if the stop phase fails
     */
    @Override // org.mule.runtime.module.extension.internal.runtime.ExtensionComponent
    public void doStop() throws MuleException {
        LOGGER.debug("Message source '{}' on flow '{}' is stopping", this.sourceModel.getName(), getLocation().getRootContainerName());
        safeLifecycle(() -> this.lifecycleManager.fireStopPhase((phaseName, source) -> {
            synchronized (this.started) {
                // Flip the flag first so the source is stopped at most once.
                final boolean wasStarted = this.started.compareAndSet(true, false);
                if (wasStarted) {
                    stopSource();
                }
            }
            stopSchedulers();
        }));
    }

    /**
     * Fires the dispose phase: disposes the source adapter, stops and disposes the retry policy
     * and releases the retry scheduler. Failures are logged as warnings and not propagated.
     */
    @Override // org.mule.runtime.module.extension.internal.runtime.ExtensionComponent
    public void doDispose() {
        try {
            safeLifecycle(() -> this.lifecycleManager.fireDisposePhase((phaseName, source) -> {
                disposeSource();
                LifecycleUtils.stopIfNeeded(this.retryPolicyTemplate);
                LifecycleUtils.disposeIfNeeded(this.retryPolicyTemplate, LOGGER);
                stopSchedulers();
            }));
        } catch (MuleException disposeFailure) {
            final String message = String.format("Failed to dispose message source at root element '%s'. %s", getLocation().getRootContainerName(), disposeFailure.getMessage());
            LOGGER.warn(message, disposeFailure);
        }
    }

    /**
     * Runs a lifecycle action, routing any failure through
     * {@link #handleLifecycleException(Throwable, boolean)} without unwrapping lifecycle exceptions.
     *
     * @param checkedRunnable the lifecycle action to run
     * @throws MuleException if the action fails with anything other than an {@link IllegalStateException}
     */
    private void lifecycle(CheckedRunnable checkedRunnable) throws MuleException {
        final boolean unwrapLifecycleException = false;
        try {
            checkedRunnable.run();
        } catch (Throwable failure) {
            handleLifecycleException(failure, unwrapLifecycleException);
        }
    }

    /**
     * Runs a lifecycle action, routing any failure through
     * {@link #handleLifecycleException(Throwable, boolean)} with lifecycle exceptions unwrapped to their cause.
     *
     * @param checkedRunnable the lifecycle action to run
     * @throws MuleException if the action fails with anything other than an {@link IllegalStateException}
     */
    private void safeLifecycle(CheckedRunnable checkedRunnable) throws MuleException {
        final boolean unwrapLifecycleException = true;
        try {
            checkedRunnable.run();
        } catch (Throwable failure) {
            handleLifecycleException(failure, unwrapLifecycleException);
        }
    }

    /**
     * Translates a failure raised during a lifecycle phase.
     * <p>
     * An {@link IllegalStateException} is only logged at debug level and the phase is treated as
     * skipped. A {@link MuleException} is rethrown as is, and anything else is wrapped in a
     * {@link DefaultMuleException}.
     *
     * @param th the failure raised by the lifecycle action
     * @param z  when {@code true}, a {@link LifecycleException} with a cause is replaced by that cause first
     * @throws MuleException for every failure that is not an {@link IllegalStateException}
     */
    private void handleLifecycleException(Throwable th, boolean z) throws MuleException {
        Throwable failure = Exceptions.unwrap(th);
        final boolean replaceWithCause = z && failure instanceof LifecycleException && failure.getCause() != null;
        if (replaceWithCause) {
            failure = failure.getCause();
        }

        if (failure instanceof IllegalStateException) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Skipping lifecycle phase: " + failure.getMessage(), failure);
            }
            return;
        }
        if (failure instanceof MuleException) {
            throw (MuleException) failure;
        }
        throw new DefaultMuleException(failure);
    }

    /**
     * Stops and disposes this message source. A failure while stopping is logged and does not
     * prevent the disposal.
     */
    private void shutdown() {
        try {
            LifecycleUtils.stopIfNeeded(this);
        } catch (Exception e) {
            // Stopping may already have disposed the adapter (reference reset to null), so fall
            // back to the model's name instead of failing with an NPE inside the error handler.
            final SourceAdapter adapter = this.sourceAdapter;
            final String sourceName = adapter != null ? adapter.getName() : this.sourceModel.getName();
            LOGGER.error(String.format("Failed to stop source '%s' on flow '%s'", sourceName, getLocation().getRootContainerName()), e);
        }
        LifecycleUtils.disposeIfNeeded(this, LOGGER);
    }

    /**
     * Stops the retry scheduler, if there is one, and clears the reference even when stopping fails.
     */
    private void stopSchedulers() {
        final Scheduler scheduler = this.retryScheduler;
        if (scheduler == null) {
            return;
        }
        try {
            scheduler.stop();
        } finally {
            this.retryScheduler = null;
        }
    }

    /**
     * Disposes the current source adapter and then resets the reference to {@code null}.
     */
    private void disposeSource() {
        final SourceAdapter adapter = this.sourceAdapter;
        LifecycleUtils.disposeIfNeeded(adapter, LOGGER);
        this.sourceAdapter = null;
    }

    /**
     * Builds the transaction configuration from the source adapter's transactional action and type.
     *
     * @return the transaction configuration for this source
     * @throws IllegalStateException if no transaction factory is available for the adapter's transactional type
     */
    private TransactionConfig buildTransactionConfig() {
        final MuleTransactionConfig config = new MuleTransactionConfig();
        config.setAction(org.mule.runtime.module.extension.internal.util.MuleExtensionUtils.toActionCode(this.sourceAdapter.getTransactionalAction()));
        config.setMuleContext(this.muleContext);

        final TransactionType type = this.sourceAdapter.getTransactionalType();
        final TransactionFactory factory = (TransactionFactory) this.transactionFactoryLocator.lookUpTransactionFactory(type)
                .orElseThrow(() -> new IllegalStateException(String.format("Unable to create Source with Transactions of Type: [%s]. No factory available for this transaction type", type)));
        config.setFactory(factory);
        return config;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * @return the connection manager of this source; {@code null} until the initialise phase has created it
     */
    public SourceConnectionManager getSourceConnectionManager() {
        final SourceConnectionManager manager = this.sourceConnectionManager;
        return manager;
    }

    /**
     * Creates the {@link MessageProcessContext} that exposes this source, its transaction
     * configuration, class loader, error type locator, exception resolver and flow.
     *
     * @return a processing context bound to this message source
     */
    private MessageProcessContext createProcessingContext() {
        final ExtensionMessageSource owner = this;
        return new MessageProcessContext() { // from class: org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSource.1
            private final MessagingExceptionResolver messagingExceptionResolver = new MessagingExceptionResolver(getMessageSource());

            public MessageSource getMessageSource() {
                return owner;
            }

            public Optional<TransactionConfig> getTransactionConfig() {
                // A transaction config is only exposed for transactional sources.
                if (!owner.sourceModel.isTransactional()) {
                    return Optional.empty();
                }
                return Optional.of(owner.transactionConfig.get());
            }

            public ClassLoader getExecutionClassLoader() {
                return owner.muleContext.getExecutionClassLoader();
            }

            public ErrorTypeLocator getErrorTypeLocator() {
                return owner.muleContext.getErrorTypeLocator();
            }

            public MessagingExceptionResolver getMessagingExceptionResolver() {
                return this.messagingExceptionResolver;
            }

            public FlowConstruct getFlowConstruct() {
                return owner.flowConstruct.get();
            }
        };
    }

    /**
     * Performs one start attempt: creates, initialises and starts the source adapter, completing
     * the given future with the outcome.
     * <p>
     * On failure the connection is invalidated when a connection exception is found, the source
     * is stopped and disposed (cleanup failures are attached as suppressed exceptions), and the
     * future is completed exceptionally with the enriched exception, or with the connection
     * exception extracted from it when there is one.
     *
     * @param z                 {@code true} when the attempt is part of a restart
     * @param restartContext    context given to the adapter to finish the restart
     * @param completableFuture future to complete with the outcome of the attempt
     */
    private void doWork(boolean z, RestartContext restartContext, CompletableFuture<Void> completableFuture) {
        try {
            createSource(z);
            LifecycleUtils.initialiseIfNeeded(this.sourceAdapter);
            if (z) {
                this.sourceAdapter.finishRestart(restartContext);
            }
            this.sourceAdapter.start();
            completableFuture.complete(null);
        } catch (Exception startFailure) {
            try {
                ExceptionUtils.extractConnectionException(startFailure).ifPresent(connectionException -> invalidateConnection(connectionException));
                stopSource();
            } catch (Exception stopFailure) {
                startFailure.addSuppressed(stopFailure);
            }
            try {
                disposeSource();
            } catch (Exception disposeFailure) {
                startFailure.addSuppressed(disposeFailure);
            }

            final Throwable enriched = this.exceptionEnricherManager.process(startFailure);
            final Optional<ConnectionException> connectionFailure = ExceptionUtils.extractConnectionException(enriched);
            final Throwable reported = connectionFailure.isPresent() ? connectionFailure.get() : enriched;
            if (reported instanceof Exception) {
                completableFuture.completeExceptionally((Exception) reported);
            } else {
                completableFuture.completeExceptionally(new MuleRuntimeException(reported));
            }
        }
    }

    /**
     * Increments the application's connection error counter when statistics are available and
     * enabled and the corresponding feature flag is on.
     */
    private void computeStats() {
        final AllStatistics allStatistics = this.muleContext.getStatistics();
        if (allStatistics == null || !allStatistics.isEnabled()) {
            return;
        }
        if (!computeConnectionErrorsInStats()) {
            return;
        }
        allStatistics.getApplicationStatistics().incConnectionErrors();
    }

    /**
     * @return whether the {@code COMPUTE_CONNECTION_ERRORS_IN_STATS} feature flag is enabled
     */
    private boolean computeConnectionErrorsInStats() {
        final boolean enabled = this.featureFlaggingService.isEnabled(MuleRuntimeFeature.COMPUTE_CONNECTION_ERRORS_IN_STATS);
        return enabled;
    }

    /**
     * Sets the processor that is handed to the source callback as its listener.
     *
     * @param processor the listener processor
     */
    public void setListener(Processor processor) {
        messageProcessor = processor;
    }

    /**
     * Injection point for the message processing manager handed to the source callback.
     *
     * @param messageProcessingManager the processing manager to use
     */
    @Inject
    public void setMessageProcessingManager(MessageProcessingManager messageProcessingManager) {
        final MessageProcessingManager injected = messageProcessingManager;
        this.messageProcessingManager = injected;
    }

    /**
     * Verifies that the given configuration supports this source: the source must be declared
     * either by the provider's configuration model or by its extension model.
     *
     * @param configurationProvider the provider of the configuration to validate against
     * @throws IllegalSourceException if neither model declares the source
     */
    @Override // org.mule.runtime.module.extension.internal.runtime.ExtensionComponent
    protected void validateOperationConfiguration(ConfigurationProvider configurationProvider) {
        final boolean declaredByConfig = configurationProvider.getConfigurationModel().getSourceModel(this.sourceModel.getName()).isPresent();
        if (declaredByConfig) {
            return;
        }
        final boolean declaredByExtension = configurationProvider.getExtensionModel().getSourceModel(this.sourceModel.getName()).isPresent();
        if (declaredByExtension) {
            return;
        }
        throw new IllegalSourceException(String.format("Root component '%s' defines an usage of operation '%s' which points to configuration '%s'. The selected config does not support that operation.", getLocation().getRootContainerName(), this.sourceModel.getName(), configurationProvider.getName()));
    }

    /**
     * Creates a parameter value resolver backed by the source adapter's delegate object.
     * Requires the adapter to exist; there is no null check on it.
     *
     * @return a resolver over the delegate's parameters
     */
    @Override // org.mule.runtime.module.extension.internal.runtime.ExtensionComponent
    protected ParameterValueResolver getParameterValueResolver() {
        final SourceAdapter adapter = this.sourceAdapter;
        return new ObjectBasedParameterValueResolver(adapter.getDelegate(), this.sourceModel, this.reflectionCache);
    }

    /**
     * Initialises the message source.
     * <p>
     * When the source may run on this node it is initialised right away. Otherwise a listener is
     * registered that initialises and starts the source once this node becomes the cluster's primary.
     *
     * @throws InitialisationException if the source cannot be initialised
     */
    @Override // org.mule.runtime.module.extension.internal.runtime.ExtensionComponent
    protected void doInitialise() throws InitialisationException {
        validateConfigurationProviderIsNotExpression();
        this.flowConstruct = new LazyValue<>(() -> (FlowConstruct) this.componentLocator.find(getRootContainerLocation()).orElse(null));
        this.messageProcessContext = createProcessingContext();

        if (shouldRunOnThisNode()) {
            if (LOGGER.isDebugEnabled()) {
                final boolean primaryPollingInstance = this.clusterService.isPrimaryPollingInstance();
                if (this.primaryNodeOnly) {
                    LOGGER.debug("Message source '{}' on flow '{}' running on the primary node is initializing. Note that this Message source must run on the primary node only.", this.sourceModel.getName(), getLocation().getRootContainerName());
                } else {
                    final String primaryState = primaryPollingInstance ? "is" : "is not";
                    LOGGER.debug("Message source '{}' on flow '{}' is initializing. This {} the primary node of the cluster.", this.sourceModel.getName(), getLocation().getRootContainerName(), primaryState);
                }
            }
            reallyDoInitialise();
            return;
        }

        // Not allowed to run here yet: defer initialisation and start until this node becomes primary.
        LOGGER.debug("Message source '{}' on flow '{}' cannot initialize. This Message source can only run on the primary node of the cluster", this.sourceModel.getName(), getLocation().getRootContainerName());
        new PrimaryNodeLifecycleNotificationListener(() -> {
            LOGGER.debug("Message source '{}' on flow '{}' is initializing because the node became cluster's primary.", this.sourceModel.getName(), getLocation().getRootContainerName());
            reallyDoInitialise();
            reallyDoStart();
        }, this.notificationListenerRegistry).register();
    }

    /**
     * Fires the initialise phase of the lifecycle manager: creates the source connection manager, creates the
     * source and initialises the source adapter if needed.
     *
     * @throws InitialisationException if any step fails; other {@link MuleException}s are wrapped into one
     */
    private void reallyDoInitialise() throws InitialisationException {
        try {
            lifecycle(() -> {
                lifecycleManager.fireInitialisePhase((unusedPhase, unusedSource) -> {
                    sourceConnectionManager = new SourceConnectionManager(connectionManager);
                    try {
                        createSource(false);
                        LifecycleUtils.initialiseIfNeeded(sourceAdapter);
                    } catch (Exception creationFailure) {
                        throw new InitialisationException(creationFailure, this);
                    }
                });
            });
        } catch (MuleException failure) {
            // Propagate initialisation failures untouched; wrap anything else.
            if (failure instanceof InitialisationException) {
                throw (InitialisationException) failure;
            }
            throw new InitialisationException(failure, this);
        }
    }

    /**
     * Resolves the parameters of this source against an initialiser event and returns them as an immutable map.
     * <p>
     * The {@code ValueResolvingContext} used for the resolution is closed before returning, and the initialiser
     * event's context is completed via {@code success()} on every exit path once the event has been obtained
     * (previously it was only completed when resolution succeeded).
     *
     * @return an immutable copy of the resolved source parameters
     * @throws MuleRuntimeException if the parameters cannot be resolved; the original failure is kept as the cause
     */
    public Map<String, Object> getInitialisationParameters() {
        CoreEvent initialiserEvent = null;
        try {
            initialiserEvent = MuleExtensionUtils.getInitialiserEvent();
            ResolverSet sourceParameters = this.sourceAdapterFactory.getSourceParameters();
            try (ValueResolvingContext resolvingContext = ValueResolvingContext.builder(initialiserEvent)
                    .withExpressionManager(this.expressionManager)
                    .withConfig(getConfigurationInstance())
                    .build()) {
                return ImmutableMap.copyOf(org.mule.runtime.module.extension.internal.util.MuleExtensionUtils.toMap(sourceParameters, resolvingContext));
            }
        } catch (Exception e) {
            // Pass the failure as the cause instead of as a message-format argument so the stack trace is preserved.
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Could not resolve parameters message source at location '%s'", getLocation().toString())), e);
        } finally {
            // Complete the initialiser event context whether resolution succeeded or failed.
            if (initialiserEvent != null) {
                initialiserEvent.getContext().success();
            }
        }
    }

    /**
     * @return the back-pressure strategy currently held by this message source
     */
    public MessageSource.BackPressureStrategy getBackPressureStrategy() {
        return backPressureStrategy;
    }

    /**
     * @return the state reported by this source's lifecycle manager
     */
    public LifecycleState getLifecycleState() {
        return lifecycleManager.getState();
    }

    /**
     * Tells whether this source may run on the current node.
     *
     * @return {@code true} unless the source is primary-node-only and the cluster service reports that this is
     *         not the primary polling instance
     */
    private boolean shouldRunOnThisNode() {
        // The cluster service is only consulted when the source is restricted to the primary node.
        return !primaryNodeOnly || clusterService.isPrimaryPollingInstance();
    }

    /**
     * Looks up the configuration for the given event and, when its statistics are mutable, registers one more
     * running source on them.
     *
     * @param coreEvent the event used to obtain the configuration
     * @return the configuration obtained for the event, if any
     */
    private Optional<ConfigurationInstance> startUsingConfiguration(CoreEvent coreEvent) {
        return getConfigurationAndTryToMutateStats(coreEvent, MutableConfigurationStats::addRunningSource);
    }

    /**
     * Looks up the configuration for the given event and, when its statistics are mutable, discounts one running
     * source from them.
     *
     * @param coreEvent the event used to obtain the configuration
     */
    private void stopUsingConfiguration(CoreEvent coreEvent) {
        getConfigurationAndTryToMutateStats(coreEvent, MutableConfigurationStats::discountRunningSource);
    }

    /**
     * Obtains the configuration for the given event and applies {@code consumer} to its statistics, but only when
     * those statistics are a {@link MutableConfigurationStats}.
     *
     * @param coreEvent the event used to obtain the configuration
     * @param consumer  the mutation to apply to the statistics
     * @return the configuration obtained for the event, whether or not its statistics were mutated
     */
    private Optional<ConfigurationInstance> getConfigurationAndTryToMutateStats(CoreEvent coreEvent, Consumer<MutableConfigurationStats> consumer) {
        Optional<ConfigurationInstance> resolved = getConfiguration(coreEvent);
        resolved.map(ConfigurationInstance::getStatistics)
                .filter(MutableConfigurationStats.class::isInstance)
                .map(MutableConfigurationStats.class::cast)
                .ifPresent(consumer);
        return resolved;
    }

    /**
     * @return the simple class name followed by {@code ": "} and the string form of the source adapter
     *         ({@code "null"} when the adapter is not set)
     */
    @Override
    public String toString() {
        // String concatenation already renders a null reference as "null", so no explicit wrapper is needed.
        return getClass().getSimpleName() + ": " + this.sourceAdapter;
    }

    /**
     * @return the current value of the {@code reconnecting} flag
     */
    boolean isReconnecting() {
        return reconnecting.get();
    }

    /**
     * Invalidates, through the source connection manager, the connection carried by the given exception, if any.
     *
     * @param connectionException the exception that may carry the connection to invalidate
     */
    private void invalidateConnection(ConnectionException connectionException) {
        // The bound method reference null-checks the connection manager when it is created,
        // i.e. after getConnection() has been evaluated and regardless of whether a connection is present.
        connectionException.getConnection().ifPresent(this.sourceConnectionManager::invalidate);
    }

    /**
     * Rejects a config-ref whose name contains an expression template.
     *
     * @throws InitialisationException if an explicit configuration provider is set and the expression parser
     *                                 reports that its name contains a template
     */
    private void validateConfigurationProviderIsNotExpression() throws InitialisationException {
        if (explicitConfigProvider == null) {
            return;
        }
        if (!expressionParser.isContainsTemplate(explicitConfigProvider.getName())) {
            return;
        }
        // NOTE(review): the last placeholder receives the provider object itself, not its name — confirm intended.
        String description = String.format("Root component '%s' defines component '%s' which specifies the expression '%s' as a config-ref. Expressions are not allowed as config references",
                getLocation().getRootContainerName(),
                NameUtils.hyphenize(componentModel.getName()),
                explicitConfigProvider);
        throw new InitialisationException(I18nMessageFactory.createStaticMessage(description), this);
    }
}
