package org.mule.runtime.core.internal.processor;

import io.qameta.allure.Feature;
import io.qameta.allure.Issue;
import io.qameta.allure.Stories;
import io.qameta.allure.Story;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.core.api.construct.BackPressureReason;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.construct.FromFlowRejectedExecutionException;
import org.mule.runtime.core.internal.processor.strategy.StreamPerEventSink;
import org.mule.tck.MuleTestUtils;
import org.mule.tck.SimpleUnitTestSupportSchedulerService;
import org.mule.tck.junit4.AbstractReactiveProcessorTestCase;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Feature("Routers")
@Stories({@Story("Async"), @Story("Backpressure")})
/* loaded from: input_file:org/mule/runtime/core/internal/processor/AsyncDelegateMessageProcessorBackPressureTestCase.class */
public class AsyncDelegateMessageProcessorBackPressureTestCase extends AbstractAsyncDelegateMessageProcessorTestCase {
    private FixingBackPressureSchedulerService service;

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/AsyncDelegateMessageProcessorBackPressureTestCase$BackPressureGeneratorProcessingStrategy.class */
    private class BackPressureGeneratorProcessingStrategy implements ProcessingStrategy {
        private boolean backPressure;

        private BackPressureGeneratorProcessingStrategy() {
            this.backPressure = true;
        }

        public boolean isSynchronous() {
            return true;
        }

        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
            return new StreamPerEventSink(reactiveProcessor, coreEvent -> {
            });
        }

        public void checkBackpressureAccepting(CoreEvent coreEvent) throws RejectedExecutionException {
            if (this.backPressure) {
                throw new FromFlowRejectedExecutionException((BackPressureReason) null);
            }
        }

        public void setBackPressure(boolean z) {
            this.backPressure = z;
        }
    }

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/AsyncDelegateMessageProcessorBackPressureTestCase$FixingBackPressureSchedulerService.class */
    private class FixingBackPressureSchedulerService extends SimpleUnitTestSupportSchedulerService {
        private final BackPressureGeneratorProcessingStrategy strategy;
        private final AtomicReference<Integer> executions = new AtomicReference<>(0);

        public FixingBackPressureSchedulerService(BackPressureGeneratorProcessingStrategy backPressureGeneratorProcessingStrategy) {
            this.strategy = backPressureGeneratorProcessingStrategy;
        }

        public int getExecutions() {
            return this.executions.get().intValue();
        }

        public Scheduler customScheduler(SchedulerConfig schedulerConfig) {
            Scheduler customScheduler = super.customScheduler(schedulerConfig);
            Scheduler scheduler = (Scheduler) Mockito.mock(Scheduler.class);
            ((Scheduler) Mockito.doAnswer(invocationOnMock -> {
                this.executions.getAndUpdate(num -> {
                    return Integer.valueOf(num.intValue() + 1);
                });
                return customScheduler.submit(() -> {
                    try {
                        Thread.sleep(1000L);
                    } catch (InterruptedException e) {
                    }
                    this.strategy.setBackPressure(false);
                    ((Runnable) invocationOnMock.getArgument(0)).run();
                });
            }).when(scheduler)).submit((Runnable) ArgumentMatchers.any(Runnable.class));
            return scheduler;
        }
    }

    public AsyncDelegateMessageProcessorBackPressureTestCase(AbstractReactiveProcessorTestCase.Mode mode) {
        super(mode);
    }

    protected void doSetUp() throws Exception {
        super.doSetUp();
        BackPressureGeneratorProcessingStrategy backPressureGeneratorProcessingStrategy = new BackPressureGeneratorProcessingStrategy();
        this.flow = MuleTestUtils.createAndRegisterFlow(muleContext, "appleFlow", this.componentLocator, (muleContext, str) -> {
            return backPressureGeneratorProcessingStrategy;
        });
        this.service = new FixingBackPressureSchedulerService(backPressureGeneratorProcessingStrategy);
        muleContext.getCustomizationService().registerCustomServiceImpl(muleContext.getSchedulerService().getName(), this.service);
        muleContext.getRegistry().registerObject(muleContext.getSchedulerService().getName(), this.service);
        this.async = createAsyncDelegateMessageProcessor(this.target, this.flow);
        this.async.start();
    }

    @Test
    @Issue("MULE-18431")
    public void processManyWithBackPressure() throws Exception {
        this.latch = new CountDownLatch(2);
        CoreEvent testEvent = testEvent();
        CoreEvent process = process(this.async, testEvent);
        CoreEvent process2 = process(this.async, testEvent);
        testEvent.getContext().success(process);
        testEvent.getContext().success(process2);
        MatcherAssert.assertThat(Boolean.valueOf(testEvent.getContext().isTerminated()), CoreMatchers.is(false));
        this.asyncEntryLatch.countDown();
        MatcherAssert.assertThat(Boolean.valueOf(this.latch.await(30000L, TimeUnit.MILLISECONDS)), CoreMatchers.is(true));
        while (!this.target.sensedEvent.getContext().isTerminated()) {
            park100ns();
        }
        MatcherAssert.assertThat(this.target.sensedEvent, CoreMatchers.notNullValue());
        while (!testEvent.getContext().isTerminated()) {
            park100ns();
        }
        MatcherAssert.assertThat(Boolean.valueOf(this.target.sensedEvent.getContext().isTerminated()), CoreMatchers.is(true));
        MatcherAssert.assertThat(Boolean.valueOf(testEvent.getContext().isTerminated()), CoreMatchers.is(true));
        assertTargetEvent(testEvent);
        assertResponse(process);
        assertResponse(process2);
        MatcherAssert.assertThat(Integer.valueOf(this.service.getExecutions()), CoreMatchers.is(1));
    }

    @Test
    @Issue("MULE-19091")
    public void streamPerEventSinkMonoFlagged() throws MuleException {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        new StreamPerEventSink(publisher -> {
            return Mono.subscriberContext().flatMapMany(context -> {
                return Flux.from(publisher).map(coreEvent -> {
                    atomicBoolean.set(((Boolean) context.getOrDefault("messageProcessors.withinProcessToApply", false)).booleanValue());
                    return coreEvent;
                });
            });
        }, coreEvent -> {
        }).accept(testEvent());
        MatcherAssert.assertThat(Boolean.valueOf(atomicBoolean.get()), CoreMatchers.is(true));
    }
}
