package org.mule.runtime.module.extension.internal.runtime.source;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.resource.spi.work.Work;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.assertj.core.api.ThrowableAssert;
import org.hamcrest.BaseMatcher;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.StringContains;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableCauseMatcher;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;
import org.mule.runtime.api.component.Component;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.connection.ConnectionProvider;
import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.exception.DefaultMuleException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.notification.ExceptionNotification;
import org.mule.runtime.api.notification.ExceptionNotificationListener;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.Injector;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.retry.async.AsynchronousRetryTemplate;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.retry.policy.SimpleRetryPolicyTemplate;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.privileged.registry.RegistrationException;
import org.mule.runtime.extension.api.connectivity.oauth.AccessTokenExpiredException;
import org.mule.runtime.extension.api.runtime.config.ConfigurationProvider;
import org.mule.runtime.extension.api.runtime.exception.ExceptionHandler;
import org.mule.runtime.extension.api.runtime.source.Source;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.tck.probe.JUnitLambdaProbe;
import org.mule.tck.probe.PollingProber;
import org.mule.test.heisenberg.extension.exception.HeisenbergConnectionExceptionEnricher;
import org.mule.test.module.extension.internal.util.ExtensionsTestUtils;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/mule/runtime/module/extension/internal/runtime/source/ExtensionMessageSourceTestCase.class */
public class ExtensionMessageSourceTestCase extends AbstractExtensionMessageSourceTestCase {
    protected static final int TEST_TIMEOUT = 3000;
    protected static final int TEST_POLL_DELAY = 1000;
    protected String property;

    /* loaded from: input_file:org/mule/runtime/module/extension/internal/runtime/source/ExtensionMessageSourceTestCase$DummySource.class */
    private class DummySource extends Source {
        private final String metadataKey;

        DummySource(String str) {
            this.metadataKey = str;
        }

        public void onStart(SourceCallback sourceCallback) throws MuleException {
        }

        public void onStop() {
        }
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{"primary node only sync", true, false}, new Object[]{"primary node only async", true, true}, new Object[]{"all nodes sync", false, false}, new Object[]{"all nodes async", false, true});
    }

    public ExtensionMessageSourceTestCase(String str, boolean z, boolean z2) {
        this.primaryNodeOnly = z;
        if (z2) {
            this.retryPolicyTemplate = new AsynchronousRetryTemplate(new SimpleRetryPolicyTemplate(0L, 2));
            return;
        }
        SimpleRetryPolicyTemplate simpleRetryPolicyTemplate = new SimpleRetryPolicyTemplate(0L, 2);
        simpleRetryPolicyTemplate.setNotificationFirer(this.notificationDispatcher);
        this.retryPolicyTemplate = simpleRetryPolicyTemplate;
    }

    protected void doSetUpBeforeMuleContextCreation() {
        this.property = System.setProperty("mule.compute.connection.errors.in.stats", "true");
    }

    @After
    public void restoreProperty() {
        System.clearProperty("mule.compute.connection.errors.in.stats");
    }

    @Test
    public void handleMessage() throws Exception {
        Mockito.reset(new SourceCallbackFactory[]{this.sourceCallbackFactory});
        Mockito.when(this.sourceCallbackFactory.createSourceCallback((SourceCompletionHandlerFactory) ArgumentMatchers.any())).thenReturn(this.sourceCallback);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        Latch latch = new Latch();
        ((Source) Mockito.doAnswer(invocationOnMock -> {
            this.sourceCallback.handle(this.result);
            atomicBoolean.set(true);
            latch.release();
            return null;
        }).when(this.source)).onStart(this.sourceCallback);
        ((Scheduler) Mockito.doAnswer(invocationOnMock2 -> {
            ((Work) invocationOnMock2.getArguments()[0]).run();
            return null;
        }).when(this.cpuLightScheduler)).execute((Runnable) ArgumentMatchers.any());
        start();
        latch.await();
        Assert.assertThat(Boolean.valueOf(atomicBoolean.get()), CoreMatchers.is(true));
    }

    @Test
    public void handleExceptionAndRestart() throws Exception {
        start();
        this.messageSource.onException(new ConnectionException("ERROR"));
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source)).onStop();
            return true;
        }));
        ((Scheduler) Mockito.verify(this.ioScheduler, Mockito.never())).stop();
        ((Scheduler) Mockito.verify(this.cpuLightScheduler, Mockito.never())).stop();
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(2))).onStart(this.sourceCallback);
            return true;
        }));
    }

    @Test
    public void initialise() throws Exception {
        if (this.messageSource.getLifecycleState().isInitialised()) {
            return;
        }
        this.messageSource.initialise();
        ((Injector) Mockito.verify(muleContext.getInjector())).inject(this.source);
        ((Initialisable) Mockito.verify(this.source)).initialise();
        ((Source) Mockito.verify(this.source, Mockito.never())).onStart(this.sourceCallback);
    }

    @Test
    public void sourceIsInstantiatedOnce() throws Exception {
        initialise();
        start();
        ((SourceAdapterFactory) Mockito.verify(this.sourceAdapterFactory, Mockito.times(1))).createAdapter((Optional) ArgumentMatchers.any(), (SourceCallbackFactory) ArgumentMatchers.any(), (Component) ArgumentMatchers.any(), (SourceConnectionManager) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean());
    }

    @Test
    public void failToStart() throws Exception {
        ConnectionException connectionException = new ConnectionException("ERROR");
        ((Source) Mockito.doThrow(new Throwable[]{new DefaultMuleException(connectionException)}).when(this.source)).onStart((SourceCallback) ArgumentMatchers.any());
        if (this.retryPolicyTemplate.isAsync()) {
            ExpectedException expectedException = this.expectedException;
            ExpectedException.none();
        } else {
            this.expectedException.expect(CoreMatchers.is(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            this.expectedException.expectCause(CoreMatchers.is(connectionException));
        }
        this.messageSource.initialise();
        this.messageSource.start();
    }

    @Test
    public void dispatchNotificationWhenFailToStart() throws Exception {
        Latch latch = new Latch();
        ArrayList arrayList = new ArrayList();
        registerNotificationListener(exceptionNotification -> {
            arrayList.add(exceptionNotification);
            latch.release();
        });
        ((Source) Mockito.doThrow(new Throwable[]{new DefaultMuleException(new ConnectionException("ERROR"))}).when(this.source)).onStart((SourceCallback) ArgumentMatchers.any());
        this.messageSource.initialise();
        try {
            this.messageSource.start();
            latch.await(5L, TimeUnit.SECONDS);
            Assert.assertThat(arrayList.get(0), CoreMatchers.is(CoreMatchers.instanceOf(ExceptionNotification.class)));
            Assert.assertThat(((ExceptionNotification) arrayList.get(0)).getException(), CoreMatchers.instanceOf(RetryPolicyExhaustedException.class));
        } catch (Exception e) {
            latch.await(5L, TimeUnit.SECONDS);
            Assert.assertThat(arrayList.get(0), CoreMatchers.is(CoreMatchers.instanceOf(ExceptionNotification.class)));
            Assert.assertThat(((ExceptionNotification) arrayList.get(0)).getException(), CoreMatchers.instanceOf(RetryPolicyExhaustedException.class));
        }
    }

    @Test
    public void failToStartAndStopFails() throws Exception {
        ConnectionException connectionException = new ConnectionException("ERROR");
        ((Source) Mockito.doThrow(new Throwable[]{new DefaultMuleException(connectionException)}).when(this.source)).onStart((SourceCallback) ArgumentMatchers.any());
        ((Source) Mockito.doThrow(new Throwable[]{new NullPointerException()}).when(this.source)).onStop();
        if (this.retryPolicyTemplate.isAsync()) {
            this.expectedException.expect(CoreMatchers.is(CoreMatchers.instanceOf(DefaultMuleException.class)));
        } else {
            this.expectedException.expect(CoreMatchers.is(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            this.expectedException.expectCause(CoreMatchers.is(connectionException));
        }
        this.messageSource.initialise();
        this.messageSource.start();
    }

    @Test
    public void failWithConnectionExceptionWhenStartingAndGetRetryPolicyExhausted() throws Exception {
        this.messageSource.initialise();
        ConnectionException connectionException = new ConnectionException("ERROR");
        ((Source) Mockito.doThrow(new Throwable[]{new RuntimeException((Throwable) connectionException)}).when(this.source)).onStart(this.sourceCallback);
        ExtensionMessageSource extensionMessageSource = this.messageSource;
        extensionMessageSource.getClass();
        Throwable catchThrowable = ThrowableAssert.catchThrowable(extensionMessageSource::start);
        if (this.retryPolicyTemplate.isAsync()) {
            new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
                Assert.assertNull(catchThrowable);
                return true;
            }));
        } else {
            Assert.assertThat(catchThrowable, CoreMatchers.is(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            Assert.assertThat(catchThrowable, CoreMatchers.is(exhaustedBecauseOf((Throwable) connectionException)));
        }
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(3))).onStart(this.sourceCallback);
            return true;
        }));
    }

    @Test
    public void failWithNonConnectionExceptionWhenStartingAndGetRetryPolicyExhausted() throws Exception {
        ((Source) Mockito.doThrow(new Throwable[]{new DefaultMuleException(new IOException("ERROR"))}).when(this.source)).onStart(this.sourceCallback);
        this.messageSource.initialise();
        ExtensionMessageSource extensionMessageSource = this.messageSource;
        extensionMessageSource.getClass();
        Throwable catchThrowable = ThrowableAssert.catchThrowable(extensionMessageSource::start);
        if (this.retryPolicyTemplate.isAsync()) {
            new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
                Assert.assertNull(catchThrowable);
                return true;
            }));
        } else {
            Assert.assertThat(catchThrowable, CoreMatchers.is(CoreMatchers.instanceOf(RetryPolicyExhaustedException.class)));
            Assert.assertThat(ExceptionUtils.getThrowables(catchThrowable), Matchers.hasItemInArray(CoreMatchers.instanceOf(IOException.class)));
        }
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(3))).onStart(this.sourceCallback);
            return true;
        }));
    }

    @Test
    public void failWithConnectionExceptionWhenStartingAndGetsReconnected() throws Exception {
        ((Source) Mockito.doThrow(new Throwable[]{new RuntimeException((Throwable) new ConnectionException("ERROR"))}).doThrow(new Throwable[]{new RuntimeException((Throwable) new ConnectionException("ERROR"))}).doNothing().when(this.source)).onStart(this.sourceCallback);
        this.messageSource.initialise();
        this.messageSource.start();
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(3))).onStart(this.sourceCallback);
            ((Source) Mockito.verify(this.source, Mockito.times(2))).onStop();
            return true;
        }));
    }

    @Test
    public void getBackPressureStrategy() {
        Assert.assertThat(this.messageSource.getBackPressureStrategy(), CoreMatchers.is(MessageSource.BackPressureStrategy.FAIL));
    }

    @Test
    public void failOnExceptionWithConnectionExceptionAndGetsReconnected() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();
        this.messageSource.onException(new ConnectionException("ERROR"));
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(2))).onStart(this.sourceCallback);
            ((Source) Mockito.verify(this.source, Mockito.times(1))).onStop();
            return true;
        }));
    }

    @Test
    public void failOnExceptionWithAccessTokenExpiredExceptionInConnectionExceptionAndGetsReconnected() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();
        Mockito.when(this.configurationInstance.getConnectionProvider()).thenReturn(Optional.of(Mockito.mock(ConnectionProvider.class)));
        this.messageSource.onException(new ConnectionException(new AccessTokenExpiredException("ERROR")));
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(2))).onStart(this.sourceCallback);
            ((Source) Mockito.verify(this.source, Mockito.times(1))).onStop();
            return true;
        }));
    }

    @Test
    public void startFailsWithRandomException() throws Exception {
        final RuntimeException runtimeException = new RuntimeException();
        ((Source) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.source)).onStart(this.sourceCallback);
        if (this.retryPolicyTemplate.isAsync()) {
            ExpectedException expectedException = this.expectedException;
            ExpectedException.none();
        } else {
            this.expectedException.expect(exhaustedBecauseOf((Matcher<Throwable>) new BaseMatcher<Throwable>() { // from class: org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSourceTestCase.1
                private final Matcher<Exception> exceptionMatcher;

                {
                    this.exceptionMatcher = ThrowableCauseMatcher.hasCause(CoreMatchers.sameInstance(runtimeException));
                }

                public boolean matches(Object obj) {
                    return this.exceptionMatcher.matches(obj);
                }

                public void describeTo(Description description) {
                    this.exceptionMatcher.describeTo(description);
                }
            }));
        }
        initialise();
        this.messageSource.start();
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(3))).onStart(this.sourceCallback);
            ((Source) Mockito.verify(this.source, Mockito.times(3))).onStop();
            return true;
        }));
    }

    @Test
    public void start() throws Exception {
        initialise();
        if (!this.messageSource.getLifecycleState().isStarted()) {
            this.messageSource.start();
        }
        Injector injector = muleContext.getInjector();
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            InOrder inOrder = Mockito.inOrder(new Object[]{injector, this.source});
            ((Injector) inOrder.verify(injector)).inject(this.source);
            ((Initialisable) inOrder.verify(this.source)).initialise();
            ((Source) inOrder.verify(this.source)).onStart(this.sourceCallback);
            return true;
        }));
    }

    @Test
    public void failedToCreateRetryScheduler() throws Exception {
        this.messageSource.initialise();
        RuntimeException runtimeException = new RuntimeException();
        ((SchedulerService) Mockito.doThrow(new Throwable[]{runtimeException}).when(muleContext.getSchedulerService())).ioScheduler();
        ExtensionMessageSource extensionMessageSource = this.messageSource;
        extensionMessageSource.getClass();
        Assert.assertThat(ThrowableAssert.catchThrowable(extensionMessageSource::start).getCause(), CoreMatchers.is(CoreMatchers.sameInstance(runtimeException)));
    }

    @Test
    public void stop() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();
        this.messageSource.stop();
        ((Source) Mockito.verify(this.source)).onStop();
    }

    @Test
    public void dispose() throws Exception {
        this.messageSource.initialise();
        this.messageSource.start();
        this.messageSource.stop();
        this.messageSource.dispose();
        ((Disposable) Mockito.verify(this.source)).dispose();
    }

    @Test
    public void enrichExceptionWithSourceExceptionEnricher() throws Exception {
        Mockito.when(this.enricherFactory.createHandler()).thenReturn(new HeisenbergConnectionExceptionEnricher());
        ExtensionsTestUtils.mockExceptionEnricher(this.sourceModel, this.enricherFactory);
        ExtensionsTestUtils.mockExceptionEnricher(this.sourceModel, this.enricherFactory);
        ExtensionMessageSource newExtensionMessageSourceInstance = getNewExtensionMessageSourceInstance();
        newExtensionMessageSourceInstance.initialise();
        ((Source) Mockito.doThrow(new Throwable[]{new RuntimeException("ERROR")}).when(this.source)).onStart(this.sourceCallback);
        newExtensionMessageSourceInstance.getClass();
        Throwable catchThrowable = ThrowableAssert.catchThrowable(newExtensionMessageSourceInstance::start);
        if (this.retryPolicyTemplate.isAsync()) {
            new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
                Assert.assertNull(catchThrowable);
                return true;
            }));
        } else {
            Assert.assertThat(Boolean.valueOf(org.mule.runtime.core.api.util.ExceptionUtils.containsType(catchThrowable, ConnectionException.class)), CoreMatchers.is(true));
            Assert.assertThat(catchThrowable.getMessage(), StringContains.containsString("Enriched Connection Exception: ERROR"));
        }
        newExtensionMessageSourceInstance.stop();
    }

    @Test
    public void enrichExceptionWithExtensionEnricher() throws Exception {
        ExceptionHandler exceptionHandler = (ExceptionHandler) Mockito.mock(ExceptionHandler.class);
        Mockito.when(exceptionHandler.enrichException((Exception) ArgumentMatchers.any(Exception.class))).thenReturn(new Exception("Enriched: ERROR"));
        Mockito.when(this.enricherFactory.createHandler()).thenReturn(exceptionHandler);
        ExtensionsTestUtils.mockExceptionEnricher(this.extensionModel, this.enricherFactory);
        ExtensionMessageSource newExtensionMessageSourceInstance = getNewExtensionMessageSourceInstance();
        newExtensionMessageSourceInstance.initialise();
        ((Source) Mockito.doThrow(new Throwable[]{new RuntimeException("ERROR")}).when(this.source)).onStart(this.sourceCallback);
        newExtensionMessageSourceInstance.getClass();
        Throwable catchThrowable = ThrowableAssert.catchThrowable(newExtensionMessageSourceInstance::start);
        if (this.retryPolicyTemplate.isAsync()) {
            new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
                Assert.assertNull(catchThrowable);
                return true;
            }));
        } else {
            Assert.assertThat(catchThrowable.getMessage(), StringContains.containsString("Enriched: ERROR"));
        }
        newExtensionMessageSourceInstance.stop();
    }

    @Test
    public void workManagerDisposedIfSourceFailsToStop() throws Exception {
        start();
        final RuntimeException runtimeException = new RuntimeException();
        ((Source) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.source)).onStop();
        this.expectedException.expect(new BaseMatcher<Throwable>() { // from class: org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSourceTestCase.2
            public boolean matches(Object obj) {
                Exception exc = (Exception) obj;
                return (exc.getCause() instanceof MuleException) && exc.getCause().getCause() == runtimeException;
            }

            public void describeTo(Description description) {
                description.appendText("Exception was not wrapped as expected");
            }
        });
    }

    @Test
    public void actualSourceStoppedIfMessageSourceFailsToStop() throws Exception {
        Mockito.when(Boolean.valueOf(this.configurationProvider.isDynamic())).thenReturn(true);
        start();
        RuntimeException runtimeException = new RuntimeException();
        ((ConfigurationProvider) Mockito.doThrow(new Throwable[]{runtimeException}).when(this.configurationProvider)).get((Event) ArgumentMatchers.any(CoreEvent.class));
        this.expectedException.expectCause(CoreMatchers.sameInstance(runtimeException));
        try {
            this.messageSource.stop();
        } finally {
            ((Source) Mockito.verify(this.source)).onStop();
        }
    }

    @Test
    public void reconnectTwice() throws Exception {
        start();
        this.messageSource.onException(new ConnectionException("ERROR"));
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            return Boolean.valueOf(!this.messageSource.isReconnecting());
        }));
        ((Source) Mockito.verify(this.source, Mockito.times(2))).onStart(this.sourceCallback);
        ((Source) Mockito.verify(this.source, Mockito.times(1))).onStop();
        this.messageSource.onException(new ConnectionException("ERROR"));
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            return Boolean.valueOf(!this.messageSource.isReconnecting());
        }));
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(3))).onStart(this.sourceCallback);
            ((Source) Mockito.verify(this.source, Mockito.times(2))).onStop();
            return true;
        }));
    }

    @Test
    public void failToReconnect() throws Exception {
        start();
        Throwable connectionException = new ConnectionException("ERROR");
        ((Source) Mockito.doThrow(new Throwable[]{connectionException}).when(this.source)).onStart((SourceCallback) ArgumentMatchers.any());
        this.messageSource.onException(connectionException);
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            return Boolean.valueOf(!this.messageSource.isReconnecting());
        }));
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            ((Source) Mockito.verify(this.source, Mockito.times(4))).onStart(this.sourceCallback);
            ((Source) Mockito.verify(this.source, Mockito.times(4))).onStop();
            return true;
        }));
    }

    private BaseMatcher<Throwable> exhaustedBecauseOf(Throwable th) {
        return exhaustedBecauseOf(CoreMatchers.sameInstance(th));
    }

    private BaseMatcher<Throwable> exhaustedBecauseOf(final Matcher<Throwable> matcher) {
        return new BaseMatcher<Throwable>() { // from class: org.mule.runtime.module.extension.internal.runtime.source.ExtensionMessageSourceTestCase.3
            public boolean matches(Object obj) {
                return matcher.matches(((Throwable) obj).getCause());
            }

            public void describeTo(Description description) {
                matcher.describeTo(description);
            }
        };
    }

    @Test
    public void getMetadataKeyIdObjectValue() throws Exception {
        this.source = new DummySource("person");
        this.sourceAdapter = createSourceAdapter();
        Mockito.when(this.sourceAdapterFactory.createAdapter((Optional) ArgumentMatchers.any(), (SourceCallbackFactory) ArgumentMatchers.any(), (Component) ArgumentMatchers.any(), (SourceConnectionManager) ArgumentMatchers.any(), ArgumentMatchers.anyBoolean())).thenReturn(this.sourceAdapter);
        this.messageSource = getNewExtensionMessageSourceInstance();
        this.messageSource.initialise();
        this.messageSource.start();
        Assert.assertThat(this.messageSource.getParameterValueResolver().getParameterValue("metadataKey"), CoreMatchers.is("person"));
    }

    @Test
    public void getRetryPolicyExhaustedAndCnnectionErrorsAreComputed() throws Exception {
        muleContext.getStatistics().setEnabled(true);
        this.messageSource.initialise();
        ((Source) Mockito.doThrow(new Throwable[]{new RuntimeException((Throwable) new ConnectionException("ERROR"))}).when(this.source)).onStart(this.sourceCallback);
        ExtensionMessageSource extensionMessageSource = this.messageSource;
        extensionMessageSource.getClass();
        ThrowableAssert.catchThrowable(extensionMessageSource::start);
        new PollingProber(3000L, 1000L).check(new JUnitLambdaProbe(() -> {
            Assert.assertThat(Long.valueOf(muleContext.getStatistics().getApplicationStatistics().getConnectionErrors()), CoreMatchers.equalTo(2L));
            Assert.assertThat(Long.valueOf(muleContext.getStatistics().getApplicationStatistics().getExecutionErrors()), CoreMatchers.equalTo(2L));
            return true;
        }));
    }

    private void registerNotificationListener(ExceptionNotificationListener exceptionNotificationListener) throws RegistrationException {
        ((NotificationListenerRegistry) muleContext.getRegistry().lookupObject(NotificationListenerRegistry.class)).registerListener(exceptionNotificationListener);
    }
}
