package org.springframework.pulsar.core;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.interceptor.ProducerInterceptor;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.pulsar.core.PulsarOperations;
import org.springframework.pulsar.observation.DefaultPulsarTemplateObservationConvention;
import org.springframework.pulsar.observation.PulsarMessageSenderContext;
import org.springframework.pulsar.observation.PulsarTemplateObservation;
import org.springframework.pulsar.observation.PulsarTemplateObservationConvention;
import org.springframework.util.CollectionUtils;

/* loaded from: input_file:org/springframework/pulsar/core/PulsarTemplate.class */
public class PulsarTemplate<T> implements PulsarOperations<T>, BeanNameAware {
    private final LogAccessor logger;
    private final PulsarProducerFactory<T> producerFactory;

    @Nullable
    private final List<ProducerInterceptor> interceptors;

    @Nullable
    private final ObservationRegistry observationRegistry;

    @Nullable
    private final PulsarTemplateObservationConvention observationConvention;
    private String beanName;

    @Nullable
    private Schema<T> schema;

    /* loaded from: input_file:org/springframework/pulsar/core/PulsarTemplate$SendMessageBuilderImpl.class */
    public static class SendMessageBuilderImpl<T> implements PulsarOperations.SendMessageBuilder<T> {
        private final PulsarTemplate<T> template;
        private final T message;

        @Nullable
        private String topic;

        @Nullable
        private Collection<String> encryptionKeys;

        @Nullable
        private TypedMessageBuilderCustomizer<T> messageCustomizer;

        @Nullable
        private ProducerBuilderCustomizer<T> producerCustomizer;

        SendMessageBuilderImpl(PulsarTemplate<T> pulsarTemplate, T t) {
            this.template = pulsarTemplate;
            this.message = t;
        }

        @Override // org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder
        public PulsarOperations.SendMessageBuilder<T> withTopic(String str) {
            this.topic = str;
            return this;
        }

        @Override // org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder
        public PulsarOperations.SendMessageBuilder<T> withEncryptionKeys(Collection<String> collection) {
            this.encryptionKeys = collection;
            return this;
        }

        @Override // org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder
        public PulsarOperations.SendMessageBuilder<T> withMessageCustomizer(TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer) {
            this.messageCustomizer = typedMessageBuilderCustomizer;
            return this;
        }

        @Override // org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder
        public PulsarOperations.SendMessageBuilder<T> withProducerCustomizer(ProducerBuilderCustomizer<T> producerBuilderCustomizer) {
            this.producerCustomizer = producerBuilderCustomizer;
            return this;
        }

        @Override // org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder
        public MessageId send() throws PulsarClientException {
            return this.template.doSend(this.topic, this.encryptionKeys, this.message, this.messageCustomizer, this.producerCustomizer);
        }

        @Override // org.springframework.pulsar.core.PulsarOperations.SendMessageBuilder
        public CompletableFuture<MessageId> sendAsync() throws PulsarClientException {
            return this.template.doSendAsync(this.topic, this.encryptionKeys, this.message, this.messageCustomizer, this.producerCustomizer);
        }
    }

    public PulsarTemplate(PulsarProducerFactory<T> pulsarProducerFactory) {
        this(pulsarProducerFactory, null);
    }

    public PulsarTemplate(PulsarProducerFactory<T> pulsarProducerFactory, @Nullable List<ProducerInterceptor> list) {
        this(pulsarProducerFactory, list, null, null);
    }

    public PulsarTemplate(PulsarProducerFactory<T> pulsarProducerFactory, @Nullable List<ProducerInterceptor> list, @Nullable ObservationRegistry observationRegistry, @Nullable PulsarTemplateObservationConvention pulsarTemplateObservationConvention) {
        this.logger = new LogAccessor(getClass());
        this.beanName = "";
        this.producerFactory = pulsarProducerFactory;
        this.interceptors = list;
        this.observationRegistry = observationRegistry;
        this.observationConvention = pulsarTemplateObservationConvention;
    }

    @Override // org.springframework.pulsar.core.PulsarOperations
    public MessageId send(T t) throws PulsarClientException {
        return doSend(null, null, t, null, null);
    }

    @Override // org.springframework.pulsar.core.PulsarOperations
    public MessageId send(@Nullable String str, T t) throws PulsarClientException {
        return doSend(str, null, t, null, null);
    }

    @Override // org.springframework.pulsar.core.PulsarOperations
    public CompletableFuture<MessageId> sendAsync(T t) throws PulsarClientException {
        return doSendAsync(null, null, t, null, null);
    }

    @Override // org.springframework.pulsar.core.PulsarOperations
    public CompletableFuture<MessageId> sendAsync(@Nullable String str, T t) throws PulsarClientException {
        return doSendAsync(str, null, t, null, null);
    }

    @Override // org.springframework.pulsar.core.PulsarOperations
    public PulsarOperations.SendMessageBuilder<T> newMessage(T t) {
        return new SendMessageBuilderImpl(this, t);
    }

    public void setBeanName(String str) {
        this.beanName = str;
    }

    public void setSchema(Schema<T> schema) {
        this.schema = schema;
    }

    private MessageId doSend(@Nullable String str, @Nullable Collection<String> collection, T t, @Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer<T> producerBuilderCustomizer) throws PulsarClientException {
        try {
            return doSendAsync(str, collection, t, typedMessageBuilderCustomizer, producerBuilderCustomizer).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
    }

    private CompletableFuture<MessageId> doSendAsync(@Nullable String str, @Nullable Collection<String> collection, T t, @Nullable TypedMessageBuilderCustomizer<T> typedMessageBuilderCustomizer, @Nullable ProducerBuilderCustomizer<T> producerBuilderCustomizer) throws PulsarClientException {
        String resolveTopicName = ProducerUtils.resolveTopicName(str, this.producerFactory);
        this.logger.trace(() -> {
            return String.format("Sending msg to '%s' topic", resolveTopicName);
        });
        PulsarMessageSenderContext newContext = PulsarMessageSenderContext.newContext(resolveTopicName, this.beanName);
        Observation newObservation = newObservation(newContext);
        try {
            newObservation.start();
            Producer<T> prepareProducerForSend = prepareProducerForSend(str, t, collection, producerBuilderCustomizer);
            TypedMessageBuilder<T> value = prepareProducerForSend.newMessage().value(t);
            if (typedMessageBuilderCustomizer != null) {
                typedMessageBuilderCustomizer.customize(value);
            }
            Map<String, String> properties = newContext.properties();
            Objects.requireNonNull(value);
            properties.forEach(value::property);
            return value.sendAsync().whenComplete((BiConsumer) (messageId, th) -> {
                if (th == null) {
                    this.logger.trace(() -> {
                        return String.format("Sent msg to '%s' topic", resolveTopicName);
                    });
                    newObservation.stop();
                } else {
                    this.logger.error(th, () -> {
                        return String.format("Failed to send msg to '%s' topic", resolveTopicName);
                    });
                    newObservation.error(th);
                    newObservation.stop();
                }
                ProducerUtils.closeProducerAsync(prepareProducerForSend, this.logger);
            });
        } catch (RuntimeException e) {
            newObservation.error(e);
            newObservation.stop();
            throw e;
        }
    }

    private Observation newObservation(PulsarMessageSenderContext pulsarMessageSenderContext) {
        return this.observationRegistry == null ? Observation.NOOP : PulsarTemplateObservation.TEMPLATE_OBSERVATION.observation(this.observationConvention, DefaultPulsarTemplateObservationConvention.INSTANCE, () -> {
            return pulsarMessageSenderContext;
        }, this.observationRegistry);
    }

    private Producer<T> prepareProducerForSend(@Nullable String str, T t, @Nullable Collection<String> collection, @Nullable ProducerBuilderCustomizer<T> producerBuilderCustomizer) throws PulsarClientException {
        Schema<T> schema = this.schema != null ? this.schema : SchemaUtils.getSchema(t);
        ArrayList arrayList = new ArrayList();
        if (!CollectionUtils.isEmpty(this.interceptors)) {
            arrayList.add(producerBuilder -> {
                List<ProducerInterceptor> list = this.interceptors;
                Objects.requireNonNull(producerBuilder);
                list.forEach(producerInterceptor -> {
                    producerBuilder.intercept(new ProducerInterceptor[]{producerInterceptor});
                });
            });
        }
        if (producerBuilderCustomizer != null) {
            arrayList.add(producerBuilderCustomizer);
        }
        return this.producerFactory.createProducer(schema, str, collection, arrayList);
    }
}
