package org.mule.runtime.core.internal.processor.strategy;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.transaction.Transaction;
import org.mule.runtime.core.api.transaction.TransactionCoordination;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:org/mule/runtime/core/internal/processor/strategy/AbstractCachedThreadReactorSinkProvider.class */
/**
 * Base {@link ReactorSinkProvider} that caches the {@link FluxSink} instances it hands out.
 *
 * <p>A sink is cached per calling {@link Thread}, or per {@link Transaction} when the current
 * {@link TransactionCoordination} reports a running nested transaction. Both caches hold their
 * keys weakly and expire entries after a period without access; whenever an entry is removed
 * from a cache, its sink is completed by the cache's removal listener.
 *
 * <p>Subclasses supply the actual sink through {@link #createSink()}.
 */
public abstract class AbstractCachedThreadReactorSinkProvider implements ReactorSinkProvider {

    /** Minutes without access after which a per-thread sink is expired from the cache. */
    private static final int THREAD_CACHE_TIME_LIMIT_IN_MINUTES = 60;

    /** Minutes without access after which a per-transaction sink is expired from the cache. */
    private static final int TRANSACTION_CACHE_TIME_LIMIT_IN_MINUTES = 10;

    /**
     * Sinks keyed by the thread that requested them.
     *
     * <p>The explicit type witness on {@code removalListener} is required: in the middle of a
     * call chain the builder is still a {@code Caffeine<Object, Object>}, so without the witness
     * the lambda parameters would be inferred as {@code Object} and {@code complete()} would not
     * resolve.
     */
    private final Cache<Thread, FluxSink<CoreEvent>> sinks = Caffeine.newBuilder()
            .weakKeys()
            .<Thread, FluxSink<CoreEvent>>removalListener((thread, fluxSink, removalCause) -> fluxSink.complete())
            .expireAfterAccess(THREAD_CACHE_TIME_LIMIT_IN_MINUTES, TimeUnit.MINUTES)
            .build();

    /** Sinks keyed by nested transaction; see {@link #sinks} for why the type witness is needed. */
    private final Cache<Transaction, FluxSink<CoreEvent>> sinksNestedTx = Caffeine.newBuilder()
            .weakKeys()
            .<Transaction, FluxSink<CoreEvent>>removalListener((transaction, fluxSink, removalCause) -> fluxSink.complete())
            .expireAfterAccess(TRANSACTION_CACHE_TIME_LIMIT_IN_MINUTES, TimeUnit.MINUTES)
            .build();

    /**
     * Completes every sink currently held in either cache.
     *
     * <p>The entries themselves are left in the caches; use {@link #invalidateAll()} to drop them.
     */
    @Override // org.mule.runtime.api.lifecycle.Disposable
    public void dispose() {
        this.sinks.asMap().values().forEach(FluxSink::complete);
        this.sinksNestedTx.asMap().values().forEach(FluxSink::complete);
    }

    /**
     * Invalidates all entries of both the per-thread and the per-transaction cache.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public void invalidateAll() {
        this.sinks.invalidateAll();
        this.sinksNestedTx.invalidateAll();
    }

    /**
     * Returns the cached sink for the current execution context, creating it through
     * {@link #createSink()} on first use.
     *
     * @return the sink bound to the current transaction when a nested transaction is running,
     *         otherwise the sink bound to the current thread
     */
    @Override // org.mule.runtime.core.internal.processor.strategy.ReactorSinkProvider
    public FluxSink<CoreEvent> getSink() {
        TransactionCoordination transactionCoordination = TransactionCoordination.getInstance();
        if (transactionCoordination.runningNestedTransaction()) {
            // Nested transactions get their own sink, keyed by the transaction rather than the thread.
            return this.sinksNestedTx.get(transactionCoordination.getTransaction(), transaction -> createSink());
        }
        return this.sinks.get(Thread.currentThread(), thread -> createSink());
    }

    /**
     * Creates a new sink to be cached and returned by {@link #getSink()}.
     *
     * @return a newly created sink
     */
    protected abstract FluxSink<CoreEvent> createSink();
}
