package org.mule.runtime.core.internal.processor.strategy;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.function.Consumer;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.util.MuleSystemProperties;
import org.mule.runtime.core.api.construct.BackPressureReason;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Sink;
import org.mule.runtime.core.internal.rx.FluxSinkRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/StreamPerThreadSink.class */
/**
 * {@link Sink} that keeps a separate reactive stream for every thread that pushes events into it.
 *
 * <p>The first time a thread calls {@link #accept(CoreEvent)} a new stream is built: each event is first handed to
 * the configured event consumer and then sent through the configured {@link ReactiveProcessor}. The
 * {@link FluxSink} feeding that stream is cached under the calling thread (the cache holds its keys weakly) and is
 * reused for later events from the same thread. A stream removes itself from the cache when it completes or fails.
 *
 * <p>{@link #dispose()} completes every cached stream and waits, up to the configured shutdown timeout, for all of
 * them to remove themselves.
 */
public class StreamPerThreadSink implements Sink, Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger(StreamPerThreadSink.class);

    private final ReactiveProcessor processor;
    private final Consumer<CoreEvent> eventConsumer;
    private final FlowConstruct flowConstruct;

    /** Set once {@link #dispose()} starts; from then on {@link #accept(CoreEvent)} rejects events. */
    private volatile boolean disposing = false;

    /** One {@link FluxSink} per thread that has pushed events; entries are removed when their stream terminates. */
    private final Cache<Thread, FluxSink<CoreEvent>> sinks = Caffeine.newBuilder().weakKeys().build();

    /**
     * Creates a sink with no streams; streams are created lazily, one per calling thread.
     *
     * @param reactiveProcessor processor every per-thread stream is transformed with
     * @param consumer callback invoked with each event before it reaches the processor
     * @param flowConstruct flow this sink belongs to; supplies the name used in log/error messages and the
     *        shutdown timeout used by {@link #dispose()}
     */
    public StreamPerThreadSink(ReactiveProcessor reactiveProcessor, Consumer<CoreEvent> consumer, FlowConstruct flowConstruct) {
        this.processor = reactiveProcessor;
        this.eventConsumer = consumer;
        this.flowConstruct = flowConstruct;
    }

    /**
     * Pushes the event into the stream that belongs to the calling thread, creating that stream if needed.
     *
     * @param coreEvent event to push
     * @throws IllegalStateException if {@link #dispose()} has already been invoked
     */
    @Override
    public void accept(CoreEvent coreEvent) {
        if (this.disposing) {
            throw new IllegalStateException("Already disposed");
        }
        final Thread currentThread = Thread.currentThread();
        this.sinks.get(currentThread, thread -> createSink(currentThread)).next(coreEvent);
    }

    /**
     * Delegates to {@link #accept(CoreEvent)}.
     *
     * @param coreEvent event to push
     * @return always {@code null}; this implementation never reports a back-pressure reason
     * @throws IllegalStateException if {@link #dispose()} has already been invoked
     */
    @Override
    public BackPressureReason emit(CoreEvent coreEvent) {
        accept(coreEvent);
        return null;
    }

    /**
     * Completes every cached stream and spins until all of them have removed themselves from the cache, the
     * flow's shutdown timeout has elapsed, or the calling thread is interrupted.
     *
     * <p>If streams are still registered after the wait, the outcome depends on the
     * {@link MuleSystemProperties#MULE_LIFECYCLE_FAIL_ON_FIRST_DISPOSE_ERROR} system property: when it is set an
     * {@link IllegalStateException} is thrown; otherwise a warning is logged and the remaining entries are
     * dropped from the cache.
     *
     * @throws IllegalStateException if streams remain and the system property above is set
     */
    @Override
    public void dispose() {
        this.disposing = true;
        this.sinks.asMap().values().forEach(FluxSink::complete);

        final long shutdownTimeout = this.flowConstruct.getMuleContext().getConfiguration().getShutdownTimeout();
        final long startMillis = System.currentTimeMillis();
        // Busy-wait (yielding) until each stream's completion callback has removed its cache entry.
        while (!this.sinks.asMap().isEmpty()
                && System.currentTimeMillis() <= shutdownTimeout + startMillis
                && !Thread.currentThread().isInterrupted()) {
            Thread.yield();
        }

        // Check emptiness before the interruption flag: when every stream has completed there is nothing to
        // report, even if the disposing thread happens to be interrupted.
        if (this.sinks.asMap().isEmpty()) {
            return;
        }

        final String flowName = this.flowConstruct.getName();
        if (Thread.currentThread().isInterrupted()) {
            if (failOnDisposeError()) {
                throw new IllegalStateException(String.format("TX Subscribers of ProcessingStrategy for flow '%s' not completed before thread interruption", flowName));
            }
            LOGGER.warn("TX Subscribers of ProcessingStrategy for flow '{}' not completed before thread interruption", flowName);
        } else {
            if (failOnDisposeError()) {
                throw new IllegalStateException(String.format("TX Subscribers of ProcessingStrategy for flow '%s' not completed in %d ms", flowName, shutdownTimeout));
            }
            LOGGER.warn("TX Subscribers of ProcessingStrategy for flow '{}' not completed in {} ms", flowName, shutdownTimeout);
        }
        this.sinks.invalidateAll();
    }

    /**
     * Builds and subscribes the stream for one thread and returns the sink that feeds it.
     *
     * @param owner thread whose cache entry is invalidated when the stream completes or fails
     * @return sink used to push events into the newly subscribed stream
     */
    private FluxSink<CoreEvent> createSink(Thread owner) {
        final FluxSinkRecorder<CoreEvent> recorder = new FluxSinkRecorder<>();
        recorder.flux()
                .doOnNext(event -> this.eventConsumer.accept(event))
                .transform(this.processor)
                // No onNext handler: emitted events are handled upstream, only termination matters here.
                .subscribe(null, error -> {
                    LOGGER.error("Exception reached PS subscriber for flow '{}'", this.flowConstruct.getName(), error);
                    this.sinks.invalidate(owner);
                }, () -> this.sinks.invalidate(owner));
        return recorder.getFluxSink();
    }

    /** Tells whether the system property that turns dispose problems into exceptions is set (to any value). */
    private static boolean failOnDisposeError() {
        return System.getProperty(MuleSystemProperties.MULE_LIFECYCLE_FAIL_ON_FIRST_DISPOSE_ERROR) != null;
    }
}
