package io.smallrye.reactive.messaging.pulsar.transactions;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.mutiny.groups.UniSubscribe;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
import io.smallrye.reactive.messaging.providers.extension.MutinyEmitterImpl;
import io.smallrye.reactive.messaging.pulsar.PulsarClientService;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
import java.time.Duration;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl.class */
public class PulsarTransactionsImpl<T> extends MutinyEmitterImpl<T> implements PulsarTransactions<T> {
    // Pulsar client looked up from pulsarClientService using the emitter configuration's name (see constructor).
    private final PulsarClient pulsarClient;
    // Counter adjusted by PulsarTransactionEmitter.execute(); isTransactionInProgress() reports whether it is non-zero.
    private final AtomicInteger txnCount;
    // Service used to obtain the client and, when flushing, the producer registered for each channel.
    private final PulsarClientService pulsarClientService;
    // Shared Uni created from a void item; returned by the default (no-op) before-commit and before-abort callbacks.
    private static final Uni<Void> VOID_UNI = Uni.createFrom().voidItem();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/transactions/PulsarTransactionsImpl$PulsarTransactionEmitter.class */
    public class PulsarTransactionEmitter<R> implements TransactionalEmitter<T> {
        // Callback applied to the current transaction right before it is committed.
        private final Function<Transaction, Uni<Void>> beforeCommit;
        // Callback applied to the item produced by the transactional work, as the last stage of the pipeline.
        private final Function<R, Uni<R>> afterCommit;
        // Callback applied to the current transaction and the triggering failure right before the transaction is aborted.
        private final BiFunction<Transaction, Throwable, Uni<Void>> beforeAbort;
        // Callback used to recover from a failure once the commit/abort stage has run.
        private final Function<Throwable, Uni<R>> afterAbort;
        // Optional transaction timeout; when null, no timeout is set on the transaction builder.
        private final Duration txnTimeout;
        // Set to true by markForAbort(); read when deciding between commit and abort.
        private volatile boolean abort;
        // Transaction currently driven by this emitter; assigned and reset to null in execute().
        private volatile Transaction currentTransaction;
        // Names of the channels passed to getTransaction(String); their producers are flushed by flushProducers().
        private final Set<String> producerChannels;

        /**
         * Creates an emitter with no explicit transaction timeout and the default callbacks:
         * the before-commit and before-abort hooks return {@code VOID_UNI}, the after-commit hook
         * is {@code defaultAfterCommit} and the after-abort hook is {@code defaultAfterAbort}.
         *
         * @param pulsarTransactionsImpl not read by this constructor
         */
        public PulsarTransactionEmitter(PulsarTransactionsImpl pulsarTransactionsImpl) {
            this(null,
                    transaction -> PulsarTransactionsImpl.VOID_UNI,
                    PulsarTransactionsImpl::defaultAfterCommit,
                    (transaction, failure) -> PulsarTransactionsImpl.VOID_UNI,
                    PulsarTransactionsImpl::defaultAfterAbort);
        }

        /**
         * Creates an emitter with the given transaction timeout and the default callbacks:
         * the before-commit and before-abort hooks return {@code VOID_UNI}, the after-commit hook
         * is {@code defaultAfterCommit} and the after-abort hook is {@code defaultAfterAbort}.
         *
         * @param pulsarTransactionsImpl not read by this constructor
         * @param duration transaction timeout, may be {@code null}
         */
        public PulsarTransactionEmitter(PulsarTransactionsImpl pulsarTransactionsImpl, Duration duration) {
            this(duration,
                    transaction -> PulsarTransactionsImpl.VOID_UNI,
                    PulsarTransactionsImpl::defaultAfterCommit,
                    (transaction, failure) -> PulsarTransactionsImpl.VOID_UNI,
                    PulsarTransactionsImpl::defaultAfterAbort);
        }

        /**
         * Creates an emitter with explicit timeout and lifecycle callbacks.
         *
         * @param duration transaction timeout, may be {@code null} to leave the builder untouched
         * @param function callback applied to the transaction before commit
         * @param function2 callback applied to the result item after commit
         * @param biFunction callback applied to the transaction and the failure before abort
         * @param function3 callback applied to the failure after abort
         */
        public PulsarTransactionEmitter(
                Duration duration,
                Function<Transaction, Uni<Void>> function,
                Function<R, Uni<R>> function2,
                BiFunction<Transaction, Throwable, Uni<Void>> biFunction,
                Function<Throwable, Uni<R>> function3) {
            this.txnTimeout = duration;
            this.beforeCommit = function;
            this.afterCommit = function2;
            this.beforeAbort = biFunction;
            this.afterAbort = function3;
            // Starts empty; channel names are added by getTransaction(String).
            this.producerChannels = new HashSet<>();
        }

        /**
         * Builds a new Pulsar transaction, applying {@code txnTimeout} when one was configured.
         *
         * @return a Uni completed with the built transaction, or failed with the throwable reported
         *         by the builder's future
         */
        private Uni<Transaction> createTransaction() {
            return Uni.createFrom().emitter(emitter -> {
                TransactionBuilder builder = PulsarTransactionsImpl.this.pulsarClient.newTransaction();
                if (this.txnTimeout != null) {
                    builder = builder.withTransactionTimeout(this.txnTimeout.toMillis(), TimeUnit.MILLISECONDS);
                }
                // Typed callback (no raw BiConsumer cast) so the transaction and failure keep their static types.
                builder.build().whenComplete((transaction, failure) -> {
                    if (failure != null) {
                        emitter.fail(failure);
                    } else {
                        emitter.complete(transaction);
                    }
                });
            });
        }

        /**
         * Flushes the producer of every channel recorded in {@code producerChannels}.
         * Channels for which the client service returns no producer are skipped.
         *
         * @return a Uni derived from the merged flush operations via {@code Multi.toUni()}
         */
        private Uni<Void> flushProducers() {
            // Fully typed pipeline: a raw Multi would erase the producer type and yield an unchecked Uni.
            // NOTE(review): relies on the flush Unis emitting no (null) items so that toUni() resolves on
            // completion of the merged stream rather than on a first item — confirm against Mutiny docs.
            return Multi.createFrom().iterable(this.producerChannels)
                    .map(PulsarTransactionsImpl.this.pulsarClientService::getProducer)
                    .filter(Objects::nonNull)
                    .onItem().transformToUniAndMerge(producer -> Uni.createFrom().completionStage(producer.flushAsync()))
                    .toUni();
        }

        /**
         * Creates a transaction, runs {@code function} inside it and tracks it in the enclosing
         * instance's {@code txnCount} for as long as it is being executed.
         * <p>
         * When a Vert.x context is current at call time, the created transaction is emitted on that context.
         *
         * @param function the transactional work, receiving this emitter
         * @return the Uni produced by {@link #executeInTransaction(Function)}, or a failed Uni when the
         *         transaction could not be created
         */
        Uni<R> execute(Function<TransactionalEmitter<T>, Uni<R>> function) {
            Context callerContext = Vertx.currentContext();
            Uni<Transaction> transactionUni = createTransaction();
            if (callerContext != null) {
                transactionUni = transactionUni.emitOn(task -> callerContext.runOnContext(ignored -> task.run()));
            }
            return transactionUni.chain(transaction -> {
                this.currentTransaction = transaction;
                PulsarTransactionsImpl.this.txnCount.incrementAndGet();
                // The cleanup is attached only after the increment, so a failed transaction creation
                // can no longer decrement a counter that was never incremented.
                return executeInTransaction(function).eventually(() -> {
                    PulsarTransactionsImpl.this.txnCount.decrementAndGet();
                    this.currentTransaction = null;
                });
            });
        }

        /**
         * Runs the transactional work and then commits or aborts the current transaction.
         * <p>
         * Stage order matters and is kept as-is: run the work, flush producers, abort on failure or
         * cancellation, then commit (or abort when marked for abort), and finally apply the
         * after-abort / after-commit callbacks.
         *
         * @param function the transactional work, receiving this emitter
         * @return a Uni with the outcome of the after-commit or after-abort callback
         */
        private Uni<R> executeInTransaction(Function<TransactionalEmitter<T>, Uni<R>> function) {
            return Uni.createFrom().nullItem()
                    // No raw (Uni) cast: the result type R flows through the remaining stages.
                    .chain(() -> function.apply(this))
                    .call(() -> flushProducers())
                    // On failure or cancellation the transaction is aborted.
                    .onFailure().call(failure -> abort(failure))
                    .onCancellation().call(() -> abort(new RuntimeException("Transaction cancelled")))
                    // The abort flag is read lazily, once the work and the flush have succeeded.
                    .call(() -> this.abort ? abort(new RuntimeException("Transaction aborted")) : commit())
                    .onFailure().recoverWithUni(failure -> this.afterAbort.apply(failure))
                    .onItem().transformToUni(result -> this.afterCommit.apply(result));
        }

        /**
         * Applies the before-commit callback to the current transaction and then commits it.
         *
         * @return a Uni following the completion stage returned by the transaction's {@code commit()}
         */
        private Uni<Void> commit() {
            Uni<Void> preCommit = this.beforeCommit.apply(this.currentTransaction);
            return preCommit.chain(ignored -> {
                // Read the transaction once the callback has completed; a missing transaction raises an NPE here.
                final Transaction txn = Objects.requireNonNull(this.currentTransaction);
                return Uni.createFrom().completionStage(txn::commit);
            });
        }

        /**
         * Applies the before-abort callback to the current transaction and {@code th}, then aborts
         * the transaction. When the emitter is marked for abort, the resulting Uni additionally
         * fails with {@code th} once the abort has gone through.
         *
         * @param th the reason for aborting
         * @return a Uni following the abort, possibly re-emitting {@code th} as a failure
         */
        private Uni<Void> abort(Throwable th) {
            Uni<Void> preAbort = this.beforeAbort.apply(this.currentTransaction, th);
            Uni<Void> aborted = preAbort.chain(ignored -> {
                final Transaction txn = Objects.requireNonNull(this.currentTransaction);
                return Uni.createFrom().completionStage(txn::abort);
            });
            return aborted.plug(pending -> {
                if (!this.abort) {
                    return pending;
                }
                return pending.chain(() -> Uni.createFrom().failure(th));
            });
        }

        /**
         * Sends the given message through the enclosing {@code PulsarTransactionsImpl}, associating it
         * with this emitter.
         *
         * @param m the message to send
         */
        @Override
        public <M extends Message<? extends T>> void send(M m) {
            // The message is passed as-is: casting it to PulsarTransactionEmitter matches no overload
            // and could only fail with a ClassCastException.
            PulsarTransactionsImpl.this.send(this, m);
        }

        /**
         * Wraps the payload with {@code Message.of} and sends it via the message-based {@code send}.
         *
         * @param t the payload to send
         */
        @Override
        public void send(T t) {
            // No cast on the wrapped message: it is a Message, not a PulsarTransactionEmitter.
            send(Message.of(t));
        }

        /**
         * Flags this emitter so that the transaction is aborted instead of committed.
         */
        @Override
        public void markForAbort() {
            abort = true;
        }

        /**
         * @return {@code true} once {@link #markForAbort()} has been called on this emitter
         */
        @Override
        public boolean isMarkedForAbort() {
            final boolean marked = abort;
            return marked;
        }

        /**
         * Returns the transaction currently driven by this emitter and, when a channel name is given,
         * records that channel so its producer is included in the flush.
         *
         * @param str the channel name, or {@code null} to only read the transaction
         * @return the current transaction; {@code null} when none is assigned
         */
        @Override
        public Transaction getTransaction(String str) {
            if (str == null) {
                return currentTransaction;
            }
            producerChannels.add(str);
            return currentTransaction;
        }
    }

    /**
     * Creates the transactional emitter implementation.
     *
     * @param emitterConfiguration emitter configuration; its name is used to look up the Pulsar client
     * @param j value forwarded unchanged to the superclass constructor
     * @param pulsarClientService service providing the Pulsar client and producers
     */
    public PulsarTransactionsImpl(
            EmitterConfiguration emitterConfiguration,
            long j,
            PulsarClientService pulsarClientService) {
        super(emitterConfiguration, j);
        this.pulsarClientService = pulsarClientService;
        this.pulsarClient = pulsarClientService.getClient(emitterConfiguration.name());
        this.txnCount = new AtomicInteger();
    }

    /**
     * Runs {@code function} in a new transaction, using the default callbacks and no explicit timeout.
     *
     * @param function the transactional work
     * @return the Uni produced by the emitter's {@code execute}
     */
    @Override
    public <R> Uni<R> withTransaction(Function<TransactionalEmitter<T>, Uni<R>> function) {
        // Parameterized emitter: a raw PulsarTransactionEmitter would return a raw Uni (unchecked conversion).
        PulsarTransactionEmitter<R> emitter = new PulsarTransactionEmitter<R>(this);
        return emitter.execute(function);
    }

    /**
     * Runs {@code function} in a new transaction with the given timeout, using the default callbacks.
     *
     * @param duration transaction timeout, may be {@code null}
     * @param function the transactional work
     * @return the Uni produced by the emitter's {@code execute}
     */
    @Override
    public <R> Uni<R> withTransaction(Duration duration, Function<TransactionalEmitter<T>, Uni<R>> function) {
        // Parameterized emitter: a raw PulsarTransactionEmitter would return a raw Uni (unchecked conversion).
        PulsarTransactionEmitter<R> emitter = new PulsarTransactionEmitter<R>(this, duration);
        return emitter.execute(function);
    }

    /**
     * Runs {@code function} in a new transaction bound to {@code message}, without an explicit timeout.
     * Delegates to the overload taking a timeout, passing {@code null} for it.
     *
     * @param message the incoming message tied to the transaction
     * @param function the transactional work
     * @return the Uni produced by the delegate overload
     */
    @Override
    public <R> Uni<R> withTransaction(Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> function) {
        final Duration noTimeout = null;
        return withTransaction(noTimeout, message, function);
    }

    /**
     * Runs {@code function} in a new transaction bound to {@code message}.
     * <ul>
     * <li>Before commit, the message is acknowledged.</li>
     * <li>Before abort, the message is negatively acknowledged with the failure, unless the failure's
     * cause is a {@code TransactionConflictException}.</li>
     * <li>When the message is a {@code MetadataInjectableMessage}, the transaction metadata is injected
     * into it before the work runs.</li>
     * </ul>
     *
     * @param duration transaction timeout, may be {@code null}
     * @param message the incoming message tied to the transaction
     * @param function the transactional work
     * @return the Uni produced by the emitter's {@code execute}
     */
    @Override
    public <R> Uni<R> withTransaction(Duration duration, Message<?> message, Function<TransactionalEmitter<T>, Uni<R>> function) {
        // Parameterized emitter so the callbacks keep their declared types (Transaction, Throwable, R).
        PulsarTransactionEmitter<R> emitter = new PulsarTransactionEmitter<R>(
                duration,
                transaction -> Uni.createFrom().completionStage(message.ack()),
                PulsarTransactionsImpl::defaultAfterCommit,
                (transaction, failure) -> {
                    if (failure.getCause() instanceof PulsarClientException.TransactionConflictException) {
                        return VOID_UNI;
                    }
                    // Supplier form: nack is only invoked when the returned Uni is subscribed.
                    return Uni.createFrom().completionStage(() -> message.nack(failure));
                },
                PulsarTransactionsImpl::defaultAfterAbort);
        return emitter.execute(transactionalEmitter -> {
            if (message instanceof MetadataInjectableMessage) {
                ((MetadataInjectableMessage) message).injectMetadata(new PulsarTransactionMetadata(transactionalEmitter.getTransaction(null)));
            }
            return function.apply(transactionalEmitter);
        });
    }

    /**
     * Sends {@code m} with the emitter's transaction attached as {@code PulsarTransactionMetadata}.
     * The send result is ignored on success; a failure is reported through
     * {@code PulsarLogging.log.unableToDispatch}.
     *
     * @param transactionalEmitter emitter providing the transaction for this emitter's channel name
     * @param m the message to send
     */
    @Override
    public <M extends Message<? extends T>> void send(TransactionalEmitter<?> transactionalEmitter, M m) {
        // Typed subscription: raw UniSubscribe/Consumer locals would erase the callback parameter types.
        sendMessage(m.addMetadata(new PulsarTransactionMetadata(transactionalEmitter.getTransaction(this.name))))
                .subscribe().with(ignored -> {
                }, PulsarLogging.log::unableToDispatch);
    }

    /**
     * Wraps the payload with {@code Message.of} and sends it via the message-based {@code send}.
     *
     * @param transactionalEmitter emitter providing the transaction
     * @param t the payload to send
     */
    @Override
    public void send(TransactionalEmitter<?> transactionalEmitter, T t) {
        // No cast on the wrapped message: it is a Message, not a TransactionalEmitter.
        send(transactionalEmitter, Message.of(t));
    }

    /**
     * @return {@code true} when the transaction counter is currently non-zero
     */
    @Override
    public boolean isTransactionInProgress() {
        final int tracked = txnCount.get();
        return tracked != 0;
    }

    /**
     * Default after-commit callback: passes the result through unchanged.
     *
     * @param r the result of the transactional work
     * @return a Uni created from the item {@code r}
     */
    public static <R> Uni<R> defaultAfterCommit(R r) {
        final Uni<R> committed = Uni.createFrom().item(r);
        return committed;
    }

    /**
     * Default after-abort callback: propagates the failure unchanged.
     *
     * @param th the failure that led to the abort
     * @return a Uni created from the failure {@code th}
     */
    public static <R> Uni<R> defaultAfterAbort(Throwable th) {
        final Uni<R> failed = Uni.createFrom().failure(th);
        return failed;
    }
}
