package org.mule.runtime.core.internal.processor.strategy;

import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Consumer;
import java.util.function.Function;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.config.FeatureFlaggingService;
import org.mule.runtime.api.profiling.ProfilingDataProducer;
import org.mule.runtime.api.profiling.type.ProfilingEventType;
import org.mule.runtime.api.profiling.type.RuntimeProfilingEventTypes;
import org.mule.runtime.api.profiling.type.context.ComponentProcessingStrategyProfilingEventContext;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import org.mule.runtime.core.internal.processor.strategy.reactor.builder.ReactorPublisherBuilder;
import org.mule.runtime.core.internal.processor.strategy.util.ProfilingUtils;
import org.mule.runtime.core.internal.profiling.InternalProfilingService;
import org.mule.runtime.core.internal.profiling.context.DefaultComponentProcessingStrategyProfilingEventContext;
import org.mule.runtime.core.internal.profiling.tracing.DefaultComponentMetadata;
import org.mule.runtime.core.internal.profiling.tracing.DefaultExecutionContext;
import org.mule.runtime.core.internal.util.rx.ConditionalExecutorServiceDecorator;
import org.mule.runtime.core.internal.util.rx.ReactorTransactionUtils;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/TransactionAwareStreamEmitterProcessingStrategyDecorator.class */
public class TransactionAwareStreamEmitterProcessingStrategyDecorator extends ProcessingStrategyDecorator {
    private static final Consumer<CoreEvent> NULL_EVENT_CONSUMER = coreEvent -> {
    };

    @Inject
    private InternalProfilingService profilingService;

    @Inject
    private MuleContext muleContext;

    @Inject
    FeatureFlaggingService featureFlaggingService;

    public TransactionAwareStreamEmitterProcessingStrategyDecorator(ProcessingStrategy processingStrategy) {
        super(processingStrategy);
        if (processingStrategy instanceof ProcessingStrategyAdapter) {
            ProcessingStrategyAdapter processingStrategyAdapter = (ProcessingStrategyAdapter) processingStrategy;
            processingStrategyAdapter.setOnEventConsumer(NULL_EVENT_CONSUMER);
            Function<ScheduledExecutorService, ScheduledExecutorService> schedulerDecorator = processingStrategyAdapter.getSchedulerDecorator();
            processingStrategyAdapter.setSchedulerDecorator(scheduledExecutorService -> {
                return new ConditionalExecutorServiceDecorator((ScheduledExecutorService) schedulerDecorator.apply(scheduledExecutorService), scheduledExecutorService -> {
                    return TransactionCoordination.isTransactionActive();
                });
            });
        }
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyDecorator, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public Sink createSink(FlowConstruct flowConstruct, ReactiveProcessor reactiveProcessor) {
        return new TransactionalDelegateSink(new ReactorSinkProviderBasedSink(new DefaultCachedThreadReactorSinkProvider(flowConstruct, publisher -> {
            return Flux.from(publisher).contextWrite(ReactorTransactionUtils.popTxFromSubscriberContext()).transform(reactiveProcessor).contextWrite(ReactorTransactionUtils.pushTxToSubscriberContext("source"));
        }, NULL_EVENT_CONSUMER, this.featureFlaggingService)), this.delegate.createSink(flowConstruct, reactiveProcessor));
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyDecorator, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public ReactiveProcessor onPipeline(ReactiveProcessor reactiveProcessor) {
        ComponentLocation location = ProfilingUtils.getLocation(reactiveProcessor);
        String artifactId = ProfilingUtils.getArtifactId(this.muleContext);
        String artifactType = ProfilingUtils.getArtifactType(this.muleContext);
        Function function = coreEvent -> {
            return new DefaultComponentProcessingStrategyProfilingEventContext(coreEvent, location, Thread.currentThread().getName(), artifactId, artifactType, System.currentTimeMillis());
        };
        return publisher -> {
            return Flux.deferContextual(contextView -> {
                return ReactorTransactionUtils.isTxActiveByContext(contextView) ? ReactorPublisherBuilder.buildFlux(publisher).setTracingContext(this.profilingService, coreEvent2 -> {
                    return new DefaultExecutionContext(new DefaultComponentMetadata(coreEvent2.getCorrelationId(), artifactId, artifactType, location));
                }).profileProcessingStrategyEvent(this.profilingService, getDataProducer(RuntimeProfilingEventTypes.PS_SCHEDULING_FLOW_EXECUTION), function).profileProcessingStrategyEvent(this.profilingService, getDataProducer(RuntimeProfilingEventTypes.STARTING_FLOW_EXECUTION), function).transform(BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onPipeline(reactiveProcessor)).profileProcessingStrategyEvent(this.profilingService, getDataProducer(RuntimeProfilingEventTypes.FLOW_EXECUTED), function).build() : Flux.from(publisher).transform(this.delegate.onPipeline(reactiveProcessor));
            });
        };
    }

    private ProfilingDataProducer<ComponentProcessingStrategyProfilingEventContext, CoreEvent> getDataProducer(ProfilingEventType<ComponentProcessingStrategyProfilingEventContext> profilingEventType) {
        return this.profilingService.getProfilingDataProducer(profilingEventType);
    }

    @Override // org.mule.runtime.core.internal.processor.strategy.ProcessingStrategyDecorator, org.mule.runtime.core.api.processor.strategy.ProcessingStrategy
    public ReactiveProcessor onProcessor(ReactiveProcessor reactiveProcessor) {
        ComponentLocation location = ProfilingUtils.getLocation(reactiveProcessor);
        String id = this.muleContext.getConfiguration().getId();
        String asString = this.muleContext.getArtifactType().getAsString();
        Function function = coreEvent -> {
            return new DefaultComponentProcessingStrategyProfilingEventContext(coreEvent, location, Thread.currentThread().getName(), id, asString, System.currentTimeMillis());
        };
        return publisher -> {
            return Flux.deferContextual(contextView -> {
                return ReactorTransactionUtils.isTxActiveByContext(contextView) ? ReactorPublisherBuilder.buildFlux(publisher).setTracingContext(this.profilingService, coreEvent2 -> {
                    return new DefaultExecutionContext(new DefaultComponentMetadata(coreEvent2.getCorrelationId(), id, asString, location));
                }).profileProcessingStrategyEvent(this.profilingService, getDataProducer(RuntimeProfilingEventTypes.PS_SCHEDULING_OPERATION_EXECUTION), function).profileProcessingStrategyEvent(this.profilingService, getDataProducer(RuntimeProfilingEventTypes.PS_STARTING_OPERATION_EXECUTION), function).transform(BlockingProcessingStrategyFactory.BLOCKING_PROCESSING_STRATEGY_INSTANCE.onProcessor(reactiveProcessor)).profileProcessingStrategyEvent(this.profilingService, getDataProducer(RuntimeProfilingEventTypes.PS_OPERATION_EXECUTED), function).profileProcessingStrategyEvent(this.profilingService, getDataProducer(RuntimeProfilingEventTypes.PS_FLOW_MESSAGE_PASSING), function).build() : Flux.from(publisher).transform(this.delegate.onProcessor(reactiveProcessor));
            });
        };
    }
}
