package org.mule.runtime.core.internal.processor.strategy;

import java.time.Duration;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.construct.BackPressureReason;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.construct.FromFlowRejectedExecutionException;
import org.mule.runtime.core.internal.processor.chain.InterceptedReactiveProcessor;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.util.rx.RejectionCallbackExecutorServiceDecorator;
import org.mule.runtime.core.internal.util.rx.RetrySchedulerWrapper;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamProcessingStrategy.class */
public abstract class ProactorStreamProcessingStrategy extends AbstractReactorStreamProcessingStrategy {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) ProactorStreamProcessingStrategy.class);
    private static final long SCHEDULER_BUSY_RETRY_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(2);
    private static Class<ClassLoader> sdkOperationClass;
    private final Supplier<Scheduler> blockingSchedulerSupplier;
    private final Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
    private Scheduler blockingScheduler;
    private Scheduler cpuIntensiveScheduler;
    protected final AtomicLong lastRetryTimestamp;
    private final AtomicInteger inFlightEvents;
    private final AtomicInteger queuedEvents;
    private final BiConsumer<CoreEvent, Throwable> IN_FLIGHT_DECREMENT_CALLBACK;
    private final BiConsumer<CoreEvent, Throwable> QUEUED_DECREMENT_CALLBACK;
    private final LongUnaryOperator LAST_RETRY_TIMESTAMP_CHECK_OPERATOR;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamProcessingStrategy$ProactorSinkWrapper.class */
    public final class ProactorSinkWrapper<E> implements AbstractProcessingStrategy.ReactorSink<E> {
        private final AbstractProcessingStrategy.ReactorSink<E> innerSink;

        /* JADX INFO: Access modifiers changed from: protected */
        public ProactorSinkWrapper(AbstractProcessingStrategy.ReactorSink<E> reactorSink) {
            this.innerSink = reactorSink;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.mule.runtime.core.api.processor.Sink, java.util.function.Consumer
        public final void accept(CoreEvent coreEvent) {
            this.innerSink.accept(coreEvent);
        }

        @Override // org.mule.runtime.core.api.processor.Sink
        public final BackPressureReason emit(CoreEvent coreEvent) {
            return this.innerSink.emit(coreEvent);
        }

        @Override // org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy.ReactorSink
        public E intoSink(CoreEvent coreEvent) {
            return this.innerSink.intoSink(coreEvent);
        }

        @Override // org.mule.runtime.api.lifecycle.Disposable
        public final void dispose() {
            this.innerSink.dispose();
        }
    }

    public ProactorStreamProcessingStrategy(int i, Supplier<Scheduler> supplier, Supplier<Scheduler> supplier2, Supplier<Scheduler> supplier3, int i2, int i3, boolean z) {
        super(i, supplier, i2, i3, z);
        this.lastRetryTimestamp = new AtomicLong(Long.MIN_VALUE);
        this.inFlightEvents = new AtomicInteger();
        this.queuedEvents = new AtomicInteger();
        this.IN_FLIGHT_DECREMENT_CALLBACK = (coreEvent, th) -> {
            this.inFlightEvents.decrementAndGet();
        };
        this.QUEUED_DECREMENT_CALLBACK = (coreEvent2, th2) -> {
            this.queuedEvents.decrementAndGet();
        };
        this.LAST_RETRY_TIMESTAMP_CHECK_OPERATOR = j -> {
            if (System.nanoTime() - j < SCHEDULER_BUSY_RETRY_INTERVAL_NS * 2) {
                return j;
            }
            return Long.MIN_VALUE;
        };
        this.blockingSchedulerSupplier = supplier2;
        this.cpuIntensiveSchedulerSupplier = supplier3;
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.AbstractReactorStreamProcessingStrategy, org.mule.runtime.api.lifecycle.Startable
    public void start() throws MuleException {
        super.start();
        this.blockingScheduler = this.blockingSchedulerSupplier.get();
        this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.mule.runtime.core.internal.processor.strategy.AbstractReactorStreamProcessingStrategy
    public Scheduler createCpuLightScheduler(Supplier<Scheduler> supplier) {
        return new RetrySchedulerWrapper(super.createCpuLightScheduler(supplier), 2L);
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.AbstractReactorStreamProcessingStrategy, org.mule.runtime.api.lifecycle.Stoppable
    public void stop() throws MuleException {
        super.stop();
        if (this.blockingScheduler != null) {
            this.blockingScheduler.stop();
        }
        if (this.cpuIntensiveScheduler != null) {
            this.cpuIntensiveScheduler.stop();
        }
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.AbstractReactorStreamProcessingStrategy, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
        return (reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.BLOCKING || reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.IO_RW) ? proactor(reactiveProcessor, this.blockingScheduler) : reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.CPU_INTENSIVE ? proactor(reactiveProcessor, this.cpuIntensiveScheduler) : super.onProcessor(reactiveProcessor);
    }

    protected ReactiveProcessor proactor(ReactiveProcessor reactiveProcessor, ScheduledExecutorService scheduledExecutorService) {
        LOGGER.debug("Doing proactor() for {} on {}. maxConcurrency={}, parallelism={}, subscribers={}", reactiveProcessor, scheduledExecutorService, Integer.valueOf(this.maxConcurrency), Integer.valueOf(getParallelism()), Integer.valueOf(this.subscribers));
        RejectionCallbackExecutorServiceDecorator rejectionCallbackExecutorServiceDecorator = new RejectionCallbackExecutorServiceDecorator(scheduledExecutorService, decorateScheduler(getCpuLightScheduler()), () -> {
            onRejected(scheduledExecutorService);
        }, () -> {
            this.lastRetryTimestamp.set(Long.MIN_VALUE);
        }, Duration.ofMillis(2L));
        return this.maxConcurrency == 1 ? publisher -> {
            return scheduleProcessor(reactiveProcessor, rejectionCallbackExecutorServiceDecorator, Flux.from(publisher)).subscriberContext(context -> {
                return context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, scheduledExecutorService);
            });
        } : this.maxConcurrency == Integer.MAX_VALUE ? (reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.CPU_INTENSIVE && (reactiveProcessor instanceof InterceptedReactiveProcessor) && sdkOperationClass != null && sdkOperationClass.isAssignableFrom(((InterceptedReactiveProcessor) reactiveProcessor).getProcessor().getClass())) ? publisher2 -> {
            return scheduleProcessor(reactiveProcessor, rejectionCallbackExecutorServiceDecorator, Flux.from(publisher2)).subscriberContext(context -> {
                return context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, scheduledExecutorService);
            });
        } : publisher3 -> {
            return Flux.from(publisher3).flatMap(coreEvent -> {
                return scheduleProcessor(reactiveProcessor, rejectionCallbackExecutorServiceDecorator, Mono.just(coreEvent)).subscriberContext(context -> {
                    return context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, scheduledExecutorService);
                });
            }, Integer.MAX_VALUE);
        } : publisher4 -> {
            return Flux.from(publisher4).flatMap(coreEvent -> {
                return scheduleProcessor(reactiveProcessor, rejectionCallbackExecutorServiceDecorator, Mono.just(coreEvent)).subscriberContext(context -> {
                    return context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, scheduledExecutorService);
                });
            }, Math.max(this.maxConcurrency / (getParallelism() * this.subscribers), 1));
        };
    }

    private void onRejected(ScheduledExecutorService scheduledExecutorService) {
        LOGGER.trace("Shared scheduler {} is busy. Scheduling of the current event will be retried after {}ms.", (Object) (scheduledExecutorService instanceof Scheduler ? ((Scheduler) scheduledExecutorService).getName() : scheduledExecutorService.toString()), (Object) 2L);
        this.lastRetryTimestamp.set(System.nanoTime());
    }

    protected abstract Mono<CoreEvent> scheduleProcessor(ReactiveProcessor reactiveProcessor, ScheduledExecutorService scheduledExecutorService, Mono<CoreEvent> mono);

    protected abstract Flux<CoreEvent> scheduleProcessor(ReactiveProcessor reactiveProcessor, ScheduledExecutorService scheduledExecutorService, Flux<CoreEvent> flux);

    protected Scheduler getBlockingScheduler() {
        return this.blockingScheduler;
    }

    protected Scheduler getCpuIntensiveScheduler() {
        return this.cpuIntensiveScheduler;
    }

    private BackPressureReason checkCapacity(CoreEvent coreEvent) {
        if (this.lastRetryTimestamp.get() != Long.MIN_VALUE && this.lastRetryTimestamp.updateAndGet(this.LAST_RETRY_TIMESTAMP_CHECK_OPERATOR) != Long.MIN_VALUE) {
            if (this.maxConcurrencyEagerCheck) {
                return BackPressureReason.REQUIRED_SCHEDULER_BUSY;
            }
            if (this.queuedEvents.incrementAndGet() > getBufferQueueSize()) {
                this.queuedEvents.decrementAndGet();
                return BackPressureReason.REQUIRED_SCHEDULER_BUSY_WITH_FULL_BUFFER;
            }
            ((BaseEventContext) coreEvent.getContext()).onResponse(this.QUEUED_DECREMENT_CALLBACK);
        }
        if (!this.maxConcurrencyEagerCheck) {
            return null;
        }
        if (this.inFlightEvents.incrementAndGet() > this.maxConcurrency) {
            this.inFlightEvents.decrementAndGet();
            return BackPressureReason.MAX_CONCURRENCY_EXCEEDED;
        }
        ((BaseEventContext) coreEvent.getContext()).onResponse(this.IN_FLIGHT_DECREMENT_CALLBACK);
        return null;
    }

    protected int getBufferQueueSize() {
        return AbstractStreamProcessingStrategyFactory.DEFAULT_BUFFER_SIZE;
    }

    @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public void checkBackpressureAccepting(CoreEvent coreEvent) throws RejectedExecutionException {
        BackPressureReason checkCapacity = checkCapacity(coreEvent);
        if (checkCapacity != null) {
            throw new FromFlowRejectedExecutionException(checkCapacity);
        }
    }

    @Override // org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public BackPressureReason checkBackpressureEmitting(CoreEvent coreEvent) {
        return checkCapacity(coreEvent);
    }

    static {
        try {
            sdkOperationClass = ProactorStreamProcessingStrategy.class.getClassLoader().loadClass("org.mule.runtime.module.extension.internal.runtime.operation.OperationMessageProcessor");
        } catch (ClassNotFoundException e) {
            LOGGER.debug("OperationMessageProcessor interface not available in current context", (Throwable) e);
        }
    }
}
