/*
 * Decompiled with CFR 0.152.
 */
package org.mule.runtime.core.processor.strategy;

import java.util.function.Function;
import org.mule.AbstractBenchmark;
import org.mule.runtime.api.artifact.ArtifactType;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.Flow;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.lifecycle.LifecycleUtils;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.api.processor.strategy.ProcessingStrategy;
import org.mule.runtime.core.internal.processor.strategy.DirectProcessingStrategyFactory;
import org.mule.runtime.core.internal.processor.strategy.TransactionAwareStreamEmitterProcessingStrategyFactory;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.infra.Blackhole;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoSink;

/**
 * JMH throughput benchmark that pushes events through two processing strategies, one built by
 * {@link DirectProcessingStrategyFactory} and one built by
 * {@link TransactionAwareStreamEmitterProcessingStrategyFactory}, using each strategy's sink,
 * {@code onPipeline} hook, {@code onProcessor} hook, and all of them combined.
 *
 * <p>Every benchmark method creates an event whose payload is the {@link MonoSink} of a freshly
 * created {@link Mono}, hands the event to the entry point under test and blocks until
 * {@link #baseFlux} signals that sink with the processed event (or with the error).
 */
@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
public class ProcessingStrategyBenchmark extends AbstractBenchmark {

    /** Amount of work passed to {@link Blackhole#consumeCPU(long)} for every event. */
    private static final long CPU_TOKENS_PER_EVENT = 100L;

    private MuleContext muleContext;
    private ProcessingStrategy directPs;
    private ProcessingStrategy emitterPs;
    private Flow flow;

    // Sinks created by each strategy around the bare processor.
    private Sink directSink;
    private Sink emitterSink;

    // Entry points of hand-built fluxes decorated only with onPipeline(...).
    private FluxSink<CoreEvent> directPipeline;
    private FluxSink<CoreEvent> emitterPipeline;

    // Entry points of hand-built fluxes decorated only with onProcessor(...).
    private FluxSink<CoreEvent> directProcessor;
    private FluxSink<CoreEvent> emitterProcessor;

    // Sinks whose chain applies onPipeline(onProcessor(processor)).
    private Sink directAllSink;
    private Sink emitterAllSink;

    /**
     * Creates the Mule context, both processing strategies, the flow and every entry point used
     * by the benchmark methods.
     *
     * @throws MuleException if the context cannot be created or a strategy fails to initialise or start
     */
    @Setup(Level.Trial)
    public void setUp() throws MuleException {
        muleContext = createMuleContextWithServices();

        directPs = new DirectProcessingStrategyFactory().create(null, null, muleContext.getSchedulerService(),
                () -> muleContext.getSchedulerBaseConfig(), "direct_mb", "direct_mb", ArtifactType.APP,
                () -> muleContext.getConfiguration().getShutdownTimeout());
        LifecycleUtils.initialiseIfNeeded(directPs);
        LifecycleUtils.startIfNeeded(directPs);

        emitterPs = new TransactionAwareStreamEmitterProcessingStrategyFactory().create(null, null,
                muleContext.getSchedulerService(), () -> muleContext.getSchedulerBaseConfig(), "emitter_mb",
                "emitter_mb", ArtifactType.APP, () -> muleContext.getConfiguration().getShutdownTimeout());
        LifecycleUtils.initialiseIfNeeded(emitterPs);
        LifecycleUtils.startIfNeeded(emitterPs);

        flow = createFlow(muleContext);

        // The unit of work being measured: burn a fixed amount of CPU per event.
        ReactiveProcessor processor = p -> Flux.from(p).doOnNext(e -> Blackhole.consumeCPU(CPU_TOKENS_PER_EVENT));

        directSink = directPs.createSink(flow, publisher -> baseFlux(publisher, processor));
        emitterSink = emitterPs.createSink(flow, publisher -> baseFlux(publisher, processor));

        // The explicit <CoreEvent> witness is required: without it the emitter lambda is inferred as
        // Consumer<FluxSink<Object>> and cannot be stored in the FluxSink<CoreEvent> fields.
        Flux.<CoreEvent>create(s -> directPipeline = s, FluxSink.OverflowStrategy.ERROR)
                .transform(directPs.onPipeline(publisher -> baseFlux(publisher, processor)))
                .subscribe();
        Flux.<CoreEvent>create(s -> emitterPipeline = s, FluxSink.OverflowStrategy.ERROR)
                .transform(emitterPs.onPipeline(publisher -> baseFlux(publisher, processor)))
                .subscribe();
        Flux.<CoreEvent>create(s -> directProcessor = s, FluxSink.OverflowStrategy.ERROR)
                .transform(directPs.onProcessor(publisher -> baseFlux(publisher, processor)))
                .subscribe();
        Flux.<CoreEvent>create(s -> emitterProcessor = s, FluxSink.OverflowStrategy.ERROR)
                .transform(emitterPs.onProcessor(publisher -> baseFlux(publisher, processor)))
                .subscribe();

        // The decoration is built inside the lambda so it happens when the sink wires its chain,
        // exactly as before.
        directAllSink = directPs.createSink(flow,
                publisher -> baseFlux(publisher, directPs.onPipeline(directPs.onProcessor(processor))));
        emitterAllSink = emitterPs.createSink(flow,
                publisher -> baseFlux(publisher, emitterPs.onPipeline(emitterPs.onProcessor(processor))));
    }

    /**
     * Stops what {@link #setUp()} started so that a trial does not leave started lifecycle objects
     * behind.
     *
     * @throws MuleException if stopping a processing strategy fails
     */
    @TearDown(Level.Trial)
    public void tearDown() throws MuleException {
        // Strategies first: they were created on top of the context's scheduler service.
        LifecycleUtils.stopIfNeeded(emitterPs);
        LifecycleUtils.stopIfNeeded(directPs);
        if (muleContext != null) {
            muleContext.dispose();
        }
    }

    /**
     * Applies {@code transformFunction} to {@code publisher} and then reports the outcome of every
     * event to the {@link MonoSink} carried as that event's payload.
     *
     * @param publisher the upstream events
     * @param transformFunction the chain under test
     * @return the decorated flux; processed events complete their sink with {@code success}, and
     *         errors handled by {@code onErrorContinue} are forwarded to the sink via {@code error}
     */
    private Flux<CoreEvent> baseFlux(Publisher<CoreEvent> publisher, Function<? super Flux<CoreEvent>, ? extends Publisher<CoreEvent>> transformFunction) {
        return Flux.from(publisher)
                .transform(transformFunction)
                .doOnNext(event -> resultSinkOf(event).success(event))
                .onErrorContinue((error, event) -> resultSinkOf((CoreEvent) event).error(error));
    }

    /**
     * Reads the payload of {@code event} as the {@link MonoSink} the benchmark method is blocked on.
     *
     * @param event an event created by one of the benchmark methods of this class
     * @return the payload, cast to the sink type
     */
    @SuppressWarnings("unchecked") // every benchmark method stores a MonoSink<CoreEvent> as the payload
    private static MonoSink<CoreEvent> resultSinkOf(CoreEvent event) {
        return (MonoSink<CoreEvent>) event.getMessage().getPayload().getValue();
    }

    /** Sends one event through the direct strategy's sink and waits for the result. */
    @Benchmark
    @Threads(Threads.MAX)
    public CoreEvent directSink() {
        return Mono.<CoreEvent>create(resultSink -> this.directSink.accept(createEvent(flow, resultSink))).block();
    }

    /** Sends one event through the emitter strategy's sink and waits for the result. */
    @Benchmark
    @Threads(Threads.MAX)
    public CoreEvent emitterSink() {
        return Mono.<CoreEvent>create(resultSink -> this.emitterSink.accept(createEvent(flow, resultSink))).block();
    }

    /** Sends one event through the flux decorated with the direct strategy's {@code onPipeline}. */
    @Benchmark
    @Threads(1)
    public CoreEvent directPipeline() {
        return Mono.<CoreEvent>create(resultSink -> this.directPipeline.next(createEvent(flow, resultSink))).block();
    }

    /** Sends one event through the flux decorated with the emitter strategy's {@code onPipeline}. */
    @Benchmark
    @Threads(1)
    public CoreEvent emitterPipeline() {
        return Mono.<CoreEvent>create(resultSink -> this.emitterPipeline.next(createEvent(flow, resultSink))).block();
    }

    /** Sends one event through the flux decorated with the direct strategy's {@code onProcessor}. */
    @Benchmark
    @Threads(1)
    public CoreEvent directProcessor() {
        return Mono.<CoreEvent>create(resultSink -> this.directProcessor.next(createEvent(flow, resultSink))).block();
    }

    /** Sends one event through the flux decorated with the emitter strategy's {@code onProcessor}. */
    @Benchmark
    @Threads(1)
    public CoreEvent emitterProcessor() {
        return Mono.<CoreEvent>create(resultSink -> this.emitterProcessor.next(createEvent(flow, resultSink))).block();
    }

    /** Sends one event through the direct strategy's sink with pipeline and processor hooks applied. */
    @Benchmark
    @Threads(Threads.MAX)
    public CoreEvent directAllSink() {
        return Mono.<CoreEvent>create(resultSink -> this.directAllSink.accept(createEvent(flow, resultSink))).block();
    }

    /** Sends one event through the emitter strategy's sink with pipeline and processor hooks applied. */
    @Benchmark
    @Threads(Threads.MAX)
    public CoreEvent emitterAllSink() {
        return Mono.<CoreEvent>create(resultSink -> this.emitterAllSink.accept(createEvent(flow, resultSink))).block();
    }
}

