package org.mule.runtime.core.internal.processor.strategy;

import java.time.Duration;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.LongUnaryOperator;
import java.util.function.Supplier;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.util.DataUnit;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.util.rx.RetrySchedulerWrapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;
import reactor.retry.BackoffDelay;
import reactor.retry.Retry;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamProcessingStrategy.class */
public abstract class ProactorStreamProcessingStrategy extends AbstractReactorStreamProcessingStrategy {
    protected static final int STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD = Integer.getInteger(AbstractStreamProcessingStrategyFactory.SYSTEM_PROPERTY_PREFIX + "STREAM_PAYLOAD_BLOCKING_IO_THRESHOLD", DataUnit.KB.toBytes(16)).intValue();
    private static final Logger LOGGER = LoggerFactory.getLogger(ProactorStreamProcessingStrategy.class);
    private static final long SCHEDULER_BUSY_RETRY_INTERVAL_NS = TimeUnit.MILLISECONDS.toNanos(2);
    private final Supplier<Scheduler> blockingSchedulerSupplier;
    private final Supplier<Scheduler> cpuIntensiveSchedulerSupplier;
    private Scheduler blockingScheduler;
    private Scheduler cpuIntensiveScheduler;
    private final AtomicLong lastRetryTimestamp;
    private boolean policyMode;
    private final AtomicInteger inFlightEvents;
    private final BiConsumer<CoreEvent, Throwable> IN_FLIGHT_DECREMENT_CALLBACK;
    private final LongUnaryOperator LAST_RETRY_TIMESTAMP_CHECK_OPERATOR;

    /* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/ProactorStreamProcessingStrategy$ProactorSinkWrapper.class */
    protected final class ProactorSinkWrapper<E> implements AbstractProcessingStrategy.ReactorSink<E> {
        private final AbstractProcessingStrategy.ReactorSink<E> innerSink;

        protected ProactorSinkWrapper(AbstractProcessingStrategy.ReactorSink<E> reactorSink) {
            this.innerSink = reactorSink;
        }

        public final void accept(CoreEvent coreEvent) {
            if (!checkCapacity(coreEvent)) {
                throw new RejectedExecutionException();
            }
            this.innerSink.accept(coreEvent);
        }

        public final boolean emit(CoreEvent coreEvent) {
            return checkCapacity(coreEvent) && this.innerSink.emit(coreEvent);
        }

        private boolean checkCapacity(CoreEvent coreEvent) {
            if (ProactorStreamProcessingStrategy.this.lastRetryTimestamp.get() != Long.MIN_VALUE && ProactorStreamProcessingStrategy.this.lastRetryTimestamp.updateAndGet(ProactorStreamProcessingStrategy.this.LAST_RETRY_TIMESTAMP_CHECK_OPERATOR) != Long.MIN_VALUE) {
                return false;
            }
            if (!ProactorStreamProcessingStrategy.this.maxConcurrencyEagerCheck) {
                return true;
            }
            if (ProactorStreamProcessingStrategy.this.inFlightEvents.incrementAndGet() > ProactorStreamProcessingStrategy.this.maxConcurrency) {
                ProactorStreamProcessingStrategy.this.inFlightEvents.decrementAndGet();
                return false;
            }
            coreEvent.getContext().onResponse(ProactorStreamProcessingStrategy.this.IN_FLIGHT_DECREMENT_CALLBACK);
            return true;
        }

        public E intoSink(CoreEvent coreEvent) {
            return (E) this.innerSink.intoSink(coreEvent);
        }

        public final void dispose() {
            this.innerSink.dispose();
        }
    }

    public ProactorStreamProcessingStrategy(int i, Supplier<Scheduler> supplier, Supplier<Scheduler> supplier2, Supplier<Scheduler> supplier3, int i2, int i3, boolean z) {
        super(i, supplier, i2, i3, z);
        this.lastRetryTimestamp = new AtomicLong(Long.MIN_VALUE);
        this.inFlightEvents = new AtomicInteger();
        this.IN_FLIGHT_DECREMENT_CALLBACK = (coreEvent, th) -> {
            this.inFlightEvents.decrementAndGet();
        };
        this.LAST_RETRY_TIMESTAMP_CHECK_OPERATOR = j -> {
            if (System.nanoTime() - j < SCHEDULER_BUSY_RETRY_INTERVAL_NS * 2) {
                return j;
            }
            return Long.MIN_VALUE;
        };
        this.blockingSchedulerSupplier = supplier2;
        this.cpuIntensiveSchedulerSupplier = supplier3;
    }

    public void start() throws MuleException {
        super.start();
        this.blockingScheduler = this.blockingSchedulerSupplier.get();
        this.cpuIntensiveScheduler = this.cpuIntensiveSchedulerSupplier.get();
    }

    protected Scheduler createCpuLightScheduler(Supplier<Scheduler> supplier) {
        return new RetrySchedulerWrapper(super.createCpuLightScheduler(supplier), 2L, () -> {
            this.lastRetryTimestamp.set(System.nanoTime());
        });
    }

    public void stop() throws MuleException {
        super.stop();
        if (this.blockingScheduler != null) {
            this.blockingScheduler.stop();
        }
        if (this.cpuIntensiveScheduler != null) {
            this.cpuIntensiveScheduler.stop();
        }
    }

    public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
        return (reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.BLOCKING || reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.IO_RW) ? proactor(reactiveProcessor, this.blockingScheduler) : reactiveProcessor.getProcessingType() == ReactiveProcessor.ProcessingType.CPU_INTENSIVE ? proactor(reactiveProcessor, this.cpuIntensiveScheduler) : super.onProcessor(reactiveProcessor);
    }

    private ReactiveProcessor proactor(ReactiveProcessor reactiveProcessor, Scheduler scheduler) {
        return this.policyMode ? publisher -> {
            return withRetry(scheduleProcessor(reactiveProcessor, scheduler, Flux.from(publisher)).subscriberContext(context -> {
                return context.put("mule.nb.processorScheduler", scheduler);
            }), scheduler);
        } : publisher2 -> {
            return Flux.from(publisher2).flatMap(coreEvent -> {
                return withRetry(scheduleProcessor(reactiveProcessor, scheduler, Flux.just(coreEvent)).subscriberContext(context -> {
                    return context.put("mule.nb.processorScheduler", scheduler);
                }), scheduler);
            }, Math.max(this.maxConcurrency / (getParallelism() * this.subscribers), 1));
        };
    }

    public void setPolicyMode(boolean z) {
        this.policyMode = z;
    }

    protected abstract Flux<CoreEvent> scheduleProcessor(ReactiveProcessor reactiveProcessor, Scheduler scheduler, Flux<CoreEvent> flux);

    private Flux<CoreEvent> withRetry(Flux<CoreEvent> flux, Scheduler scheduler) {
        return flux.retryWhen(Retry.onlyIf(retryContext -> {
            boolean isSchedulerBusy = isSchedulerBusy(retryContext.exception());
            if (isSchedulerBusy) {
                LOGGER.trace("Shared scheduler {} is busy. Scheduling of the current event will be retried after {}ms.", scheduler.getName(), 2L);
                this.lastRetryTimestamp.set(System.nanoTime());
            }
            return isSchedulerBusy;
        }).backoff(context -> {
            return new BackoffDelay(Duration.ofMillis(2L));
        }).withBackoffScheduler(Schedulers.fromExecutorService(decorateScheduler(getCpuLightScheduler()))));
    }

    protected Scheduler getBlockingScheduler() {
        return this.blockingScheduler;
    }

    protected Scheduler getCpuIntensiveScheduler() {
        return this.cpuIntensiveScheduler;
    }
}
