/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.module.extension.internal.runtime.operation;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.mule.runtime.api.event.Event;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.util.Reference;
import org.mule.runtime.api.util.concurrent.Latch;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.streaming.StreamingManager;
import org.mule.runtime.core.internal.message.EventInternalContext;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.privileged.exception.MessagingException;
import org.mule.runtime.core.privileged.processor.chain.MessageProcessorChain;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.module.extension.api.runtime.privileged.EventedResult;
import org.mule.runtime.module.extension.internal.runtime.execution.SdkInternalContext;
import org.mule.runtime.module.extension.internal.runtime.operation.ImmutableProcessorChainExecutor;
import org.mule.tck.junit4.AbstractMuleContextTestCase;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Mono;

/**
 * Tests for {@link ImmutableProcessorChainExecutor}.
 * <p>
 * Each test builds an executor around a mocked {@link MessageProcessorChain}, invokes
 * {@code process(onSuccess, onError)} and then checks which of the two callbacks was invoked and, for the
 * error cases, which event was handed to the error callback.
 */
public class ProcessorChainExecutorTestCase
extends AbstractMuleContextTestCase {
    /** Payload carried by the event wrapped in the {@link MessagingException} used by the error test. */
    private static final String ERROR_PAYLOAD = "ERROR_PAYLOAD";
    /** Upper bound, in milliseconds, to wait for one of the callbacks to be invoked. */
    private static final long CALLBACK_TIMEOUT_MILLIS = 300L;

    @Rule
    public MockitoRule rule = MockitoJUnit.rule();
    @Mock(lenient=true)
    private MessageProcessorChain chain;
    @Mock
    private Processor processor;
    private CoreEvent coreEvent;

    /**
     * Creates the event under test and stubs the chain so that, by default, {@code apply} maps every incoming
     * event to a copy carrying the message and variables of {@link #coreEvent}.
     *
     * @throws Exception if the test event cannot be created
     */
    @Before
    public void setUp() throws Exception {
        this.coreEvent = this.testEvent();
        ((InternalEvent) this.coreEvent).setSdkInternalContext(new SdkInternalContext());
        Mockito.when(this.chain.getLocation()).thenReturn(null);
        Mockito.when(this.chain.apply(ArgumentMatchers.any()))
            .thenAnswer(inv -> Mono.<CoreEvent>from(inv.getArgument(0))
                .map(event -> CoreEvent.builder(event)
                    .message(this.coreEvent.getMessage())
                    .variables(this.coreEvent.getVariables())
                    .build()));
        Mockito.when(this.chain.getMessageProcessors()).thenReturn(Collections.singletonList(this.processor));
    }

    /**
     * With the default chain stubbing, only the success callback must be invoked, exactly once.
     */
    @Test
    public void testDoProcessSuccessOnce() throws InterruptedException {
        ImmutableProcessorChainExecutor chainExecutor = this.newChainExecutor();
        AtomicInteger successCalls = new AtomicInteger(0);
        AtomicInteger errorCalls = new AtomicInteger(0);

        this.doProcessAndWait(chainExecutor, r -> successCalls.incrementAndGet(), (t, r) -> errorCalls.incrementAndGet());

        MatcherAssert.assertThat(successCalls.get(), Matchers.is(1));
        MatcherAssert.assertThat(errorCalls.get(), Matchers.is(0));
    }

    /**
     * When the chain emits a {@link MessagingException}, only the error callback must be invoked and the event it
     * receives must carry the payload of the event wrapped by the exception.
     */
    @Test
    public void testDoProcessOnErrorMessagingException() throws InterruptedException, MuleException {
        MessagingException failure = new MessagingException(I18nMessageFactory.createStaticMessage(""),
                                                            this.getEventBuilder().message(Message.of(ERROR_PAYLOAD)).build());
        Mockito.doReturn(Mono.error(failure)).when(this.chain).apply(ArgumentMatchers.any());
        ImmutableProcessorChainExecutor chainExecutor = this.newChainExecutor();
        AtomicInteger successCalls = new AtomicInteger(0);
        AtomicInteger errorCalls = new AtomicInteger(0);
        Reference<Event> errorEvent = new Reference<>();

        this.doProcessAndWait(chainExecutor, r -> successCalls.incrementAndGet(), this.recordingError(errorCalls, errorEvent));

        MatcherAssert.assertThat(successCalls.get(), Matchers.is(0));
        MatcherAssert.assertThat(errorCalls.get(), Matchers.is(1));
        MatcherAssert.assertThat(errorEvent.get().getMessage().getPayload().getValue(), Matchers.is(ERROR_PAYLOAD));
    }

    /**
     * When the chain emits an exception that is not a {@link MessagingException}, only the error callback must be
     * invoked and the event it receives is expected to carry the payload {@code "test"}.
     */
    @Test
    public void testDoProcessOnErrorGenericException() throws InterruptedException {
        Mockito.doReturn(Mono.error(new RuntimeException())).when(this.chain).apply(ArgumentMatchers.any());
        ImmutableProcessorChainExecutor chainExecutor = this.newChainExecutor();
        AtomicInteger successCalls = new AtomicInteger(0);
        AtomicInteger errorCalls = new AtomicInteger(0);
        Reference<Event> errorEvent = new Reference<>();

        this.doProcessAndWait(chainExecutor, r -> successCalls.incrementAndGet(), this.recordingError(errorCalls, errorEvent));

        MatcherAssert.assertThat(successCalls.get(), Matchers.is(0));
        MatcherAssert.assertThat(errorCalls.get(), Matchers.is(1));
        MatcherAssert.assertThat(errorEvent.get().getMessage().getPayload().getValue(), Matchers.is("test"));
    }

    /**
     * Builds the executor under test around the mocked chain, the test event and a mocked {@link StreamingManager}.
     */
    private ImmutableProcessorChainExecutor newChainExecutor() {
        return new ImmutableProcessorChainExecutor(Mockito.mock(StreamingManager.class), this.coreEvent, this.chain);
    }

    /**
     * Creates an error callback that counts its invocations and stores the event of the received result.
     *
     * @param errorCalls counter incremented on every invocation
     * @param errorEvent holder for the event extracted from the result, which is cast to {@link EventedResult}
     */
    private BiConsumer<Throwable, Result> recordingError(AtomicInteger errorCalls, Reference<Event> errorEvent) {
        return (t, r) -> {
            errorCalls.incrementAndGet();
            errorEvent.set(((EventedResult) r).getEvent());
        };
    }

    /**
     * Runs {@code chainExecutor.process(...)} and blocks until one of the callbacks has run or the timeout expires.
     * <p>
     * Both callbacks are wrapped so that they release the latch once the delegate returns or throws. This lets the
     * method return as soon as a callback has run instead of always sleeping for the whole timeout.
     *
     * @param chainExecutor the executor under test
     * @param onSuccess     callback to invoke on success
     * @param onError       callback to invoke on error
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private void doProcessAndWait(ImmutableProcessorChainExecutor chainExecutor, Consumer<Result> onSuccess, BiConsumer<Throwable, Result> onError) throws InterruptedException {
        final Latch completion = new Latch();
        chainExecutor.process(result -> {
            try {
                onSuccess.accept(result);
            } finally {
                completion.release();
            }
        }, (error, result) -> {
            try {
                onError.accept(error, result);
            } finally {
                completion.release();
            }
        });
        boolean callbackInvoked = completion.await(CALLBACK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        // Every test expects exactly one callback, so a timeout here is a failure in its own right.
        MatcherAssert.assertThat("Neither callback was invoked within " + CALLBACK_TIMEOUT_MILLIS + " ms", callbackInvoked);
    }
}

