package org.mule.runtime.core.processor.strategy;

import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory;
import org.mule.runtime.core.api.scheduler.Scheduler;
import org.mule.runtime.core.processor.strategy.MultiReactorProcessingStrategyFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/mule/runtime/core/processor/strategy/ProactorProcessingStrategyFactory.class */
public class ProactorProcessingStrategyFactory implements ProcessingStrategyFactory {

    /* loaded from: input_file:org/mule/runtime/core/processor/strategy/ProactorProcessingStrategyFactory$ProactorProcessingStrategy.class */
    static class ProactorProcessingStrategy extends MultiReactorProcessingStrategyFactory.MultiReactorProcessingStrategy {
        private Supplier<Scheduler> blockingSchedulerSupplier;
        private Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
        private Scheduler blockingScheduler;
        private Scheduler cpuIntensiveScheduler;

        public ProactorProcessingStrategy(Supplier<Scheduler> supplier, Supplier<Scheduler> supplier2, Supplier<Scheduler> supplier3, Consumer<Scheduler> consumer, MuleContext muleContext) {
            super(supplier, consumer, muleContext);
            this.blockingSchedulerSupplier = supplier2;
            this.cpuIntensiveSchedulerSupplier = supplier3;
        }

        @Override // org.mule.runtime.core.processor.strategy.MultiReactorProcessingStrategyFactory.MultiReactorProcessingStrategy
        public void start() throws MuleException {
            super.start();
            this.blockingScheduler = this.blockingSchedulerSupplier.get();
            this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
        }

        @Override // org.mule.runtime.core.processor.strategy.MultiReactorProcessingStrategyFactory.MultiReactorProcessingStrategy
        public void stop() throws MuleException {
            if (this.blockingScheduler != null) {
                getSchedulerStopper().accept(this.blockingScheduler);
            }
            if (this.cpuIntensiveScheduler != null) {
                getSchedulerStopper().accept(this.cpuIntensiveScheduler);
            }
            super.stop();
        }

        @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
        public Function<Publisher<Event>, Publisher<Event>> onProcessor(Processor processor, Function<Publisher<Event>, Publisher<Event>> function) {
            return processor.getProccesingType() == ReactiveProcessor.ProcessingType.BLOCKING ? proactor(function, this.blockingScheduler) : processor.getProccesingType() == ReactiveProcessor.ProcessingType.CPU_INTENSIVE ? proactor(function, this.cpuIntensiveScheduler) : publisher -> {
                return Flux.from(publisher).transform(function);
            };
        }

        private Function<Publisher<Event>, Publisher<Event>> proactor(Function<Publisher<Event>, Publisher<Event>> function, Scheduler scheduler) {
            return publisher -> {
                return Flux.from(publisher).publishOn(createReactorScheduler(scheduler)).transform(function).publishOn(createReactorScheduler(this.cpuLightScheduler));
            };
        }
    }

    @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategyFactory
    public ProcessingStrategy create(MuleContext muleContext) {
        return new ProactorProcessingStrategy(() -> {
            return muleContext.getSchedulerService().cpuLightScheduler();
        }, () -> {
            return muleContext.getSchedulerService().ioScheduler();
        }, () -> {
            return muleContext.getSchedulerService().cpuIntensiveScheduler();
        }, scheduler -> {
            scheduler.stop(muleContext.getConfiguration().getShutdownTimeout(), TimeUnit.MILLISECONDS);
        }, muleContext);
    }
}
