package org.mule.runtime.core.internal.processor.strategy.reactor.builder;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Function;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.profiling.ProfilingDataProducer;
import org.mule.runtime.api.profiling.type.RuntimeProfilingEventTypes;
import org.mule.runtime.api.profiling.type.context.ComponentProcessingStrategyProfilingEventContext;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.internal.processor.strategy.AbstractProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.util.ProfilingUtils;
import org.mule.runtime.core.internal.profiling.CoreProfilingService;
import org.mule.runtime.core.internal.profiling.context.DefaultComponentProcessingStrategyProfilingEventContext;
import org.mule.runtime.core.internal.profiling.tracing.DefaultComponentMetadata;
import org.mule.runtime.core.internal.profiling.tracing.DefaultExecutionContext;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/reactor/builder/ComponentProcessingStrategyReactiveProcessorBuilder.class */
public class ComponentProcessingStrategyReactiveProcessorBuilder {
    private final ReactiveProcessor processor;
    private final Scheduler contextScheduler;
    private final String artifactId;
    private final String artifactType;
    private int parallelism = 1;
    private ScheduledExecutorService dispatcherScheduler;
    private ScheduledExecutorService callbackScheduler;
    private CoreProfilingService profilingService;

    public ComponentProcessingStrategyReactiveProcessorBuilder(ReactiveProcessor reactiveProcessor, Scheduler scheduler, String str, String str2) {
        this.processor = reactiveProcessor;
        this.contextScheduler = scheduler;
        this.artifactId = str;
        this.artifactType = str2;
    }

    public static ComponentProcessingStrategyReactiveProcessorBuilder processingStrategyReactiveProcessorFrom(ReactiveProcessor reactiveProcessor, Scheduler scheduler, String str, String str2) {
        return new ComponentProcessingStrategyReactiveProcessorBuilder(reactiveProcessor, scheduler, str, str2);
    }

    public ComponentProcessingStrategyReactiveProcessorBuilder withParallelism(int i) {
        this.parallelism = i;
        return this;
    }

    public ComponentProcessingStrategyReactiveProcessorBuilder withDispatcherScheduler(ScheduledExecutorService scheduledExecutorService) {
        this.dispatcherScheduler = scheduledExecutorService;
        return this;
    }

    public ComponentProcessingStrategyReactiveProcessorBuilder withCallbackScheduler(ScheduledExecutorService scheduledExecutorService) {
        this.callbackScheduler = scheduledExecutorService;
        return this;
    }

    public ComponentProcessingStrategyReactiveProcessorBuilder withProfilingService(CoreProfilingService coreProfilingService) {
        this.profilingService = coreProfilingService;
        return this;
    }

    public ReactiveProcessor build() {
        return this.parallelism == 1 ? publisher -> {
            return baseProcessingStrategyPublisherBuilder(ReactorPublisherBuilder.buildFlux(publisher)).build();
        } : publisher2 -> {
            return Flux.from(publisher2).flatMap(coreEvent -> {
                return baseProcessingStrategyPublisherBuilder(ReactorPublisherBuilder.buildMono(coreEvent)).build();
            }, this.parallelism);
        };
    }

    private <T extends Publisher<?>> ReactorPublisherBuilder<T> baseProcessingStrategyPublisherBuilder(ReactorPublisherBuilder<T> reactorPublisherBuilder) {
        ProfilingDataProducer profilingDataProducer = this.profilingService.getProfilingDataProducer(RuntimeProfilingEventTypes.PS_SCHEDULING_OPERATION_EXECUTION);
        ProfilingDataProducer profilingDataProducer2 = this.profilingService.getProfilingDataProducer(RuntimeProfilingEventTypes.PS_STARTING_OPERATION_EXECUTION);
        ProfilingDataProducer profilingDataProducer3 = this.profilingService.getProfilingDataProducer(RuntimeProfilingEventTypes.PS_OPERATION_EXECUTED);
        ProfilingDataProducer profilingDataProducer4 = this.profilingService.getProfilingDataProducer(RuntimeProfilingEventTypes.PS_FLOW_MESSAGE_PASSING);
        ComponentLocation location = ProfilingUtils.getLocation(this.processor);
        Function<CoreEvent, ComponentProcessingStrategyProfilingEventContext> function = coreEvent -> {
            return new DefaultComponentProcessingStrategyProfilingEventContext(coreEvent, location, Thread.currentThread().getName(), this.artifactId, this.artifactType, System.currentTimeMillis());
        };
        return reactorPublisherBuilder.setTracingContext(this.profilingService, coreEvent2 -> {
            return new DefaultExecutionContext(new DefaultComponentMetadata(coreEvent2.getCorrelationId(), this.artifactId, this.artifactType, location));
        }).profileProcessingStrategyEvent(this.profilingService, profilingDataProducer, function).publishOn(Optional.ofNullable(this.dispatcherScheduler)).profileProcessingStrategyEvent(this.profilingService, profilingDataProducer2, function).transform(this.processor).profileProcessingStrategyEvent(this.profilingService, profilingDataProducer3, function).publishOn(Optional.ofNullable(this.callbackScheduler)).profileProcessingStrategyEvent(this.profilingService, profilingDataProducer4, function).subscriberContext(context -> {
            return context.put(AbstractProcessingStrategy.PROCESSOR_SCHEDULER_CONTEXT_KEY, this.contextScheduler);
        });
    }
}
