package io.smallrye.reactive.messaging.pulsar;

import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniCreate;
import io.smallrye.reactive.messaging.OutgoingMessageMetadata;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.providers.helpers.MultiUtils;
import io.smallrye.reactive.messaging.providers.helpers.SenderProcessor;
import io.smallrye.reactive.messaging.pulsar.i18n.PulsarLogging;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarAttributesExtractor;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTrace;
import io.smallrye.reactive.messaging.pulsar.tracing.PulsarTraceTextMapSetter;
import io.smallrye.reactive.messaging.pulsar.transactions.PulsarTransactionMetadata;
import io.smallrye.reactive.messaging.tracing.TracingUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.Flow;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.eclipse.microprofile.reactive.messaging.Message;

/* loaded from: input_file:io/smallrye/reactive/messaging/pulsar/PulsarOutgoingChannel.class */
/**
 * Outgoing side of a Pulsar channel.
 * <p>
 * An instance owns a single Pulsar {@link Producer} and exposes a {@link Flow.Subscriber}; every message received by
 * that subscriber is converted into a {@link TypedMessageBuilder} and sent asynchronously through the producer.
 * Failures observed on the stream are kept (up to {@value #MAX_RECORDED_FAILURES}, oldest dropped first) so that
 * the liveness check can report them.
 *
 * @param <T> the value type accepted by the underlying producer
 */
public class PulsarOutgoingChannel<T> {

    /** Maximum number of stream failures remembered for the liveness report. */
    private static final int MAX_RECORDED_FAILURES = 10;

    private final Producer<T> producer;
    private final SenderProcessor processor;
    private final Flow.Subscriber<? extends Message<?>> subscriber;
    private final String channel;
    private final boolean healthEnabled;
    /** Most recent stream failures; every access is guarded by this instance's monitor. */
    private final List<Throwable> failures = new ArrayList<>();
    private final boolean tracingEnabled;
    private final Instrumenter<PulsarTrace, Void> instrumenter;

    /**
     * Creates the producer, the sending processor and the tracing instrumenter for one outgoing channel.
     * <p>
     * When the resolved producer configuration has no producer name, the channel name is used; when it has no topic
     * name, the configured topic is used and, failing that, the channel name.
     *
     * @param pulsarClient the client used to create the producer
     * @param schema the schema of the produced values
     * @param pulsarConnectorOutgoingConfiguration the channel configuration
     * @param configResolver resolves the producer configuration for this channel
     * @throws PulsarClientException if the producer cannot be created
     */
    public PulsarOutgoingChannel(PulsarClient pulsarClient, Schema<T> schema,
            PulsarConnectorOutgoingConfiguration pulsarConnectorOutgoingConfiguration, ConfigResolver configResolver)
            throws PulsarClientException {
        this.channel = pulsarConnectorOutgoingConfiguration.getChannel();
        this.healthEnabled = pulsarConnectorOutgoingConfiguration.getHealthEnabled();
        this.tracingEnabled = pulsarConnectorOutgoingConfiguration.getTracingEnabled();

        ProducerConfigurationData producerConf = configResolver.getProducerConf(pulsarConnectorOutgoingConfiguration);
        if (producerConf.getProducerName() == null) {
            producerConf.setProducerName(this.channel);
        }
        if (producerConf.getTopicName() == null) {
            producerConf.setTopicName(pulsarConnectorOutgoingConfiguration.getTopic().orElse(this.channel));
        }

        ProducerBuilder<T> builder = pulsarClient.newProducer(schema).loadConf(configResolver.configToMap(producerConf));
        // These settings are applied explicitly on the builder in addition to the configuration map loaded above.
        if (producerConf.getBatcherBuilder() != null) {
            builder.batcherBuilder(producerConf.getBatcherBuilder());
        }
        if (producerConf.getCryptoKeyReader() != null) {
            builder.cryptoKeyReader(producerConf.getCryptoKeyReader());
        }
        for (String encryptionKey : producerConf.getEncryptionKeys()) {
            builder.addEncryptionKey(encryptionKey);
        }
        this.producer = builder.create();
        PulsarLogging.log.createdProducerWithConfig(this.channel, SchemaResolver.getSchemaName(schema), producerConf);

        // A non-positive "max pending messages" setting is passed to the processor as Long.MAX_VALUE.
        long maxPending = pulsarConnectorOutgoingConfiguration.getMaxPendingMessages().intValue();
        this.processor = new SenderProcessor(maxPending <= 0 ? Long.MAX_VALUE : maxPending,
                pulsarConnectorOutgoingConfiguration.getWaitForWriteCompletion(), this::sendMessage);
        this.subscriber = MultiUtils.via(this.processor, multi -> multi.onFailure().invoke(failure -> {
            PulsarLogging.log.unableToDispatch(failure);
            reportFailure(failure);
        }));

        PulsarAttributesExtractor attributesExtractor = new PulsarAttributesExtractor();
        MessagingAttributesGetter<PulsarTrace, Void> attributesGetter = attributesExtractor.getMessagingAttributesGetter();
        // The explicit type witness pins the response type to Void; a chained call has no target type to infer it from.
        this.instrumenter = Instrumenter.<PulsarTrace, Void> builder(GlobalOpenTelemetry.get(),
                "io.smallrye.reactive.messaging",
                MessagingSpanNameExtractor.create(attributesGetter, MessageOperation.PUBLISH))
                .addAttributesExtractor(MessagingAttributesExtractor.create(attributesGetter, MessageOperation.PUBLISH))
                .addAttributesExtractor(attributesExtractor)
                .buildProducerInstrumenter(PulsarTraceTextMapSetter.INSTANCE);
    }

    /**
     * Sends one message through the producer.
     * <p>
     * On success the returned message id is recorded with {@link OutgoingMessageMetadata#setResultOnMessage} and the
     * message is acknowledged; on failure the message is nacked with the failure.
     *
     * @param message the message to send
     * @return a {@link Uni} completing with the outcome of the ack or nack
     */
    private Uni<Void> sendMessage(Message<?> message) {
        return Uni.createFrom().item(message)
                .onItem().transform(m -> toMessageBuilder(m, this.producer))
                .onItem().transformToUni(messageBuilder -> Uni.createFrom().completionStage(messageBuilder::sendAsync))
                .onItemOrFailure().transformToUni((messageId, failure) -> {
                    if (failure != null) {
                        return Uni.createFrom().completionStage(message.nack(failure));
                    }
                    OutgoingMessageMetadata.setResultOnMessage(message, messageId);
                    return Uni.createFrom().completionStage(message.ack());
                });
    }

    /**
     * Creates an empty message builder, transactional when a transaction is available.
     * <p>
     * A transaction carried by the message's {@link PulsarTransactionMetadata} takes precedence over the given one.
     *
     * @param message the message being sent
     * @param transaction the fallback transaction, may be {@code null}
     * @return a new message builder from the producer
     */
    private TypedMessageBuilder<T> createMessageBuilder(Message<?> message, Transaction transaction) {
        Transaction effective = message.getMetadata(PulsarTransactionMetadata.class)
                .map(PulsarTransactionMetadata::getTransaction)
                .orElse(transaction);
        return effective != null ? this.producer.newMessage(effective) : this.producer.newMessage();
    }

    /**
     * Builds the Pulsar message for the given reactive messaging message.
     * <p>
     * Settings from {@link PulsarOutgoingMessageMetadata} (when present) are applied first; when the payload is an
     * {@link OutgoingMessage}, its settings are applied afterwards and its value becomes the message value.
     * Otherwise the payload itself is the message value. When tracing is enabled, an outgoing trace is started for
     * the message.
     *
     * @param message the message to convert
     * @param producer the producer whose topic is reported in the trace
     * @return the populated message builder, ready to be sent
     */
    @SuppressWarnings("unchecked") // the payload type is only known at runtime; the producer's schema validates it
    private TypedMessageBuilder<T> toMessageBuilder(Message<?> message, Producer<T> producer) {
        TypedMessageBuilder<T> messageBuilder;
        Optional<PulsarOutgoingMessageMetadata> metadata = message.getMetadata(PulsarOutgoingMessageMetadata.class);
        if (metadata.isPresent()) {
            PulsarOutgoingMessageMetadata outgoingMetadata = metadata.get();
            if (this.tracingEnabled) {
                // Tracing runs before the properties are copied to the builder below.
                TracingUtils.traceOutgoing(this.instrumenter, message, new PulsarTrace.Builder()
                        .withProperties(outgoingMetadata.getProperties())
                        .withSequenceId(outgoingMetadata.getSequenceId())
                        .withTopic(producer.getTopic())
                        .build());
            }
            messageBuilder = createMessageBuilder(message, outgoingMetadata.getTransaction());
            applyMetadata(messageBuilder, outgoingMetadata);
        } else {
            messageBuilder = createMessageBuilder(message, null);
            if (this.tracingEnabled) {
                // No user metadata: trace with a fresh map and attach that map to the message afterwards.
                HashMap<String, String> traceProperties = new HashMap<>();
                TracingUtils.traceOutgoing(this.instrumenter, message, new PulsarTrace.Builder()
                        .withProperties(traceProperties)
                        .withTopic(producer.getTopic())
                        .build());
                messageBuilder.properties(traceProperties);
            }
        }

        Object payload = message.getPayload();
        if (!(payload instanceof OutgoingMessage)) {
            return messageBuilder.value((T) payload);
        }
        return applyOutgoingMessage(messageBuilder, (OutgoingMessage<?>) payload);
    }

    /** Copies every setting present in the outgoing metadata onto the message builder. */
    private void applyMetadata(TypedMessageBuilder<T> messageBuilder, PulsarOutgoingMessageMetadata metadata) {
        if (metadata.hasKey()) {
            if (metadata.getKeyBytes() != null) {
                messageBuilder.keyBytes(metadata.getKeyBytes());
            } else {
                messageBuilder.key(metadata.getKey());
            }
        }
        if (metadata.getOrderingKey() != null) {
            messageBuilder.orderingKey(metadata.getOrderingKey());
        }
        if (metadata.getProperties() != null) {
            messageBuilder.properties(metadata.getProperties());
        }
        if (metadata.getReplicatedClusters() != null) {
            messageBuilder.replicationClusters(metadata.getReplicatedClusters());
        }
        // Only an explicit TRUE disables replication; a mere non-null check would also disable it for FALSE.
        if (Boolean.TRUE.equals(metadata.getReplicationDisabled())) {
            messageBuilder.disableReplication();
        }
        if (metadata.getEventTime() != null) {
            messageBuilder.eventTime(metadata.getEventTime().longValue());
        }
        if (metadata.getSequenceId() != null) {
            messageBuilder.sequenceId(metadata.getSequenceId().longValue());
        }
        if (metadata.getDeliverAt() != null) {
            messageBuilder.deliverAt(metadata.getDeliverAt().longValue());
        }
    }

    /** Copies the settings and the value of an {@link OutgoingMessage} payload onto the message builder. */
    @SuppressWarnings("unchecked") // the wrapped value type is only known at runtime
    private TypedMessageBuilder<T> applyOutgoingMessage(TypedMessageBuilder<T> messageBuilder,
            OutgoingMessage<?> outgoing) {
        if (outgoing.hasKey()) {
            if (outgoing.getKeyBytes() != null) {
                messageBuilder.keyBytes(outgoing.getKeyBytes());
            } else {
                messageBuilder.key(outgoing.getKey());
            }
        }
        if (outgoing.getProperties() != null) {
            messageBuilder.properties(outgoing.getProperties());
        }
        if (outgoing.getOrderingKey() != null) {
            messageBuilder.orderingKey(outgoing.getOrderingKey());
        }
        if (outgoing.getSequenceId() != null) {
            messageBuilder.sequenceId(outgoing.getSequenceId().longValue());
        }
        if (outgoing.getEventTime() != null) {
            messageBuilder.eventTime(outgoing.getEventTime().longValue());
        }
        if (outgoing.getDeliverAt() != null) {
            messageBuilder.deliverAt(outgoing.getDeliverAt().longValue());
        }
        if (outgoing.getReplicationDisabled()) {
            messageBuilder.disableReplication();
        }
        if (outgoing.getReplicatedClusters() != null) {
            messageBuilder.replicationClusters(outgoing.getReplicatedClusters());
        }
        return messageBuilder.value((T) outgoing.getValue());
    }

    /**
     * @return the subscriber that sends every received message through the producer
     */
    public Flow.Subscriber<? extends Message<?>> getSubscriber() {
        return this.subscriber;
    }

    /**
     * @return the name of the channel
     */
    public String getChannel() {
        return this.channel;
    }

    /**
     * @return the underlying Pulsar producer
     */
    public Producer<T> getProducer() {
        return this.producer;
    }

    /**
     * Cancels the sending processor and closes the producer. A failure to close the producer is logged, not
     * rethrown.
     */
    public void close() {
        this.processor.cancel();
        try {
            this.producer.close();
        } catch (PulsarClientException e) {
            PulsarLogging.log.unableToCloseProducer(e);
        }
    }

    /**
     * Records a stream failure, dropping the oldest one once {@value #MAX_RECORDED_FAILURES} are stored.
     *
     * @param th the failure to remember
     */
    private synchronized void reportFailure(Throwable th) {
        if (this.failures.size() == MAX_RECORDED_FAILURES) {
            this.failures.remove(0);
        }
        this.failures.add(th);
    }

    /**
     * Startup check: reports whether the producer is connected. Does nothing when health reporting is disabled.
     *
     * @param healthReportBuilder the report to contribute to
     */
    public void isStarted(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (this.healthEnabled) {
            healthReportBuilder.add(this.channel, this.producer.isConnected());
        }
    }

    /**
     * Readiness check: same outcome as {@link #isStarted(HealthReport.HealthReportBuilder)}.
     *
     * @param healthReportBuilder the report to contribute to
     */
    public void isReady(HealthReport.HealthReportBuilder healthReportBuilder) {
        isStarted(healthReportBuilder);
    }

    /**
     * Liveness check: healthy while no stream failure has been recorded; otherwise unhealthy, with the recorded
     * failure messages concatenated as the report message. Does nothing when health reporting is disabled.
     *
     * @param healthReportBuilder the report to contribute to
     */
    public void isAlive(HealthReport.HealthReportBuilder healthReportBuilder) {
        if (!this.healthEnabled) {
            return;
        }
        List<Throwable> recorded;
        // Copy under the same monitor as reportFailure so the list is not read while it is being modified.
        synchronized (this) {
            recorded = new ArrayList<>(this.failures);
        }
        if (recorded.isEmpty()) {
            healthReportBuilder.add(this.channel, true);
        } else {
            healthReportBuilder.add(this.channel, false,
                    recorded.stream().map(Throwable::getMessage).collect(Collectors.joining()));
        }
    }
}
