package io.smallrye.reactive.messaging.pulsar.ack;

import io.smallrye.common.annotation.Identifier;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.reactive.messaging.pulsar.PulsarAckHandler;
import io.smallrye.reactive.messaging.pulsar.PulsarConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.pulsar.PulsarIncomingMessage;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionMetadata;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Objects;
import java.util.Optional;
import org.apache.pulsar.client.api.Consumer;

/* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/ack/PulsarMessageAck.class */
public class PulsarMessageAck implements PulsarAckHandler {
    public static final String STRATEGY_NAME = "ack";
    private final Consumer<?> consumer;

    @ApplicationScoped
    @Identifier(PulsarMessageAck.STRATEGY_NAME)
    /* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/ack/PulsarMessageAck$Factory.class */
    public static class Factory implements PulsarAckHandler.Factory {
        @Override // io.smallrye.reactive.messaging.pulsar.PulsarAckHandler.Factory
        public PulsarMessageAck create(Consumer<?> consumer, PulsarConnectorIncomingConfiguration pulsarConnectorIncomingConfiguration) {
            return new PulsarMessageAck(consumer);
        }

        @Override // io.smallrye.reactive.messaging.pulsar.PulsarAckHandler.Factory
        public /* bridge */ /* synthetic */ PulsarAckHandler create(Consumer consumer, PulsarConnectorIncomingConfiguration pulsarConnectorIncomingConfiguration) {
            return create((Consumer<?>) consumer, pulsarConnectorIncomingConfiguration);
        }
    }

    public PulsarMessageAck(Consumer<?> consumer) {
        this.consumer = consumer;
    }

    @Override // io.smallrye.reactive.messaging.pulsar.PulsarAckHandler
    public Uni<Void> handle(PulsarIncomingMessage<?> pulsarIncomingMessage) {
        UniOnFailure onFailure = Uni.createFrom().completionStage(() -> {
            Optional metadata = pulsarIncomingMessage.getMetadata(PulsarTransactionMetadata.class);
            return metadata.isPresent() ? this.consumer.acknowledgeAsync(pulsarIncomingMessage.getMessageId(), ((PulsarTransactionMetadata) metadata.get()).getTransaction()) : this.consumer.acknowledgeAsync(pulsarIncomingMessage.getMessageId());
        }).onFailure();
        PulsarLogging pulsarLogging = PulsarLogging.log;
        Objects.requireNonNull(pulsarLogging);
        Uni invoke = onFailure.invoke(pulsarLogging::unableToAcknowledgeMessage);
        Objects.requireNonNull(pulsarIncomingMessage);
        return invoke.emitOn(pulsarIncomingMessage::runOnMessageContext);
    }
}
