/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.internal.processor.strategy;

import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.BlockingProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.TransactionalDelegateSink;
import org.mule.runtime.core.internal.processor.strategy.WorkQueueStreamProcessingStrategyFactory;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;

public class TransactionAwareWorkQueueStreamProcessingStrategyFactory
extends WorkQueueStreamProcessingStrategyFactory {
    @Override
    public ProcessingStrategy create(MuleContext muleContext, String schedulersNamePrefix) {
        return new TransactionAwareWorkQueueStreamProcessingStrategy(() -> muleContext.getSchedulerService().customScheduler(muleContext.getSchedulerBaseConfig().withName(schedulersNamePrefix + RING_BUFFER_SCHEDULER_NAME_SUFFIX).withMaxConcurrentTasks(this.getSubscriberCount() + 1).withWaitAllowed(true)), this.getBufferSize(), this.getSubscriberCount(), this.getWaitStrategy(), () -> muleContext.getSchedulerService().ioScheduler(muleContext.getSchedulerBaseConfig().withName(schedulersNamePrefix + "." + ReactiveProcessor.ProcessingType.BLOCKING.name())), this.getMaxConcurrency());
    }

    @Override
    public Class<? extends ProcessingStrategy> getProcessingStrategyType() {
        return WorkQueueStreamProcessingStrategyFactory.WorkQueueStreamProcessingStrategy.class;
    }

    static class TransactionAwareWorkQueueStreamProcessingStrategy
    extends WorkQueueStreamProcessingStrategyFactory.WorkQueueStreamProcessingStrategy {
        protected TransactionAwareWorkQueueStreamProcessingStrategy(Supplier<Scheduler> ringBufferSchedulerSupplier, int bufferSize, int subscribers, String waitStrategy, Supplier<Scheduler> blockingSchedulerSupplier, int maxConcurrency) {
            super(ringBufferSchedulerSupplier, bufferSize, subscribers, waitStrategy, blockingSchedulerSupplier, maxConcurrency);
        }

        @Override
        public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor pipeline) {
            Sink workQueueSink = super.createSink(flowConstruct, pipeline);
            Sink syncSink = BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.createSink(flowConstruct, pipeline);
            return new TransactionalDelegateSink(syncSink, workQueueSink);
        }

        @Override
        protected Consumer<CoreEvent> createOnEventConsumer() {
            return event -> {};
        }

        @Override
        protected ExecutorService decorateScheduler(Scheduler scheduler) {
            return new ConditionalExecutorServiceDecorator(scheduler, currentScheduler -> TransactionCoordination.isTransactionActive());
        }

        @Override
        public ReactiveProcessor onPipeline(ReactiveProcessor pipeline) {
            return TransactionCoordination.isTransactionActive() ? BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onPipeline(pipeline) : super.onPipeline(pipeline);
        }

        @Override
        public ReactiveProcessor onProcessor(ReactiveProcessor processor) {
            return TransactionCoordination.isTransactionActive() ? BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onProcessor(processor) : super.onProcessor(processor);
        }
    }
}

