package org.springframework.pulsar.core;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import java.time.Duration;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerStats;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/* loaded from: input_file:org/springframework/pulsar/core/CachingPulsarProducerFactory.class */
public class CachingPulsarProducerFactory<T> extends DefaultPulsarProducerFactory<T> implements DisposableBean {
    private final LogAccessor logger;
    private final Cache<ProducerCacheKey<T>, Producer<T>> producerCache;

    /* loaded from: input_file:org/springframework/pulsar/core/CachingPulsarProducerFactory$ProducerCacheKey.class */
    /**
     * Cache key that identifies a producer by topic, schema hash, encryption keys and
     * producer-builder customizers.
     *
     * <p>The {@link Schema} instance is retained so a producer can be created from the key and
     * so it shows up in {@link #toString()}; {@link #equals(Object)} and {@link #hashCode()}
     * use the {@link SchemaHash} derived from it rather than the schema object itself.
     *
     * @param <T> the message payload type of the producer
     */
    static class ProducerCacheKey<T> {
        private final Schema<T> schema;
        private final SchemaHash schemaHash;
        private final String topic;

        @Nullable
        private final Set<String> encryptionKeys;

        @Nullable
        private final List<ProducerBuilderCustomizer<T>> customizers;

        /**
         * Creates a key.
         *
         * @param schema the producer schema, must not be {@code null}
         * @param str the topic name, must not be {@code null}
         * @param set the encryption key names, or {@code null} if none; stored as passed
         * @param list the producer builder customizers, or {@code null} if none; a snapshot
         * of the list is stored
         * @throws IllegalArgumentException if {@code schema} or {@code str} is {@code null}
         */
        ProducerCacheKey(Schema<T> schema, String str, @Nullable Set<String> set, @Nullable List<ProducerBuilderCustomizer<T>> list) {
            Assert.notNull(schema, () -> "'schema' must be non-null");
            Assert.notNull(str, () -> "'topic' must be non-null");
            this.schema = schema;
            this.schemaHash = SchemaHash.of(this.schema);
            this.topic = str;
            this.encryptionKeys = set;
            // Snapshot the caller-owned list: this object is used as a hash key, so its hash
            // must not change if the caller later mutates the list it passed in. The copy
            // compares equal to the original by content and keeps any null elements.
            this.customizers = (list != null) ? list.stream().toList() : null;
        }

        /**
         * Two keys are equal when topic, schema hash, encryption keys and customizers are
         * all equal. The {@code schema} field itself does not take part in the comparison.
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ProducerCacheKey<?> other = (ProducerCacheKey<?>) obj;
            return this.topic.equals(other.topic)
                    && this.schemaHash.equals(other.schemaHash)
                    && Objects.equals(this.encryptionKeys, other.encryptionKeys)
                    && Objects.equals(this.customizers, other.customizers);
        }

        /** Combines the same four components that {@link #equals(Object)} compares. */
        @Override
        public int hashCode() {
            return this.topic.hashCode()
                    + this.schemaHash.hashCode()
                    + Objects.hashCode(this.encryptionKeys)
                    + Objects.hashCode(this.customizers);
        }

        @Override
        public String toString() {
            return "ProducerCacheKey{schema=" + this.schema + ", topic='" + this.topic + "', encryptionKeys=" + this.encryptionKeys + ", customizers=" + this.customizers + "}";
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/springframework/pulsar/core/CachingPulsarProducerFactory$ProducerWithCloseCallback.class */
    public static class ProducerWithCloseCallback<T> implements Producer<T> {
        private final Producer<T> producer;
        private final Consumer<Producer<T>> closeCallback;

        ProducerWithCloseCallback(Producer<T> producer, Consumer<Producer<T>> consumer) {
            this.producer = producer;
            this.closeCallback = consumer;
        }

        public Producer<T> getActualProducer() {
            return this.producer;
        }

        public String getTopic() {
            return this.producer.getTopic();
        }

        public String getProducerName() {
            return this.producer.getProducerName();
        }

        public MessageId send(T t) throws PulsarClientException {
            return this.producer.send(t);
        }

        public CompletableFuture<MessageId> sendAsync(T t) {
            return this.producer.sendAsync(t);
        }

        public void flush() throws PulsarClientException {
            this.producer.flush();
        }

        public CompletableFuture<Void> flushAsync() {
            return this.producer.flushAsync();
        }

        public TypedMessageBuilder<T> newMessage() {
            return this.producer.newMessage();
        }

        public <V> TypedMessageBuilder<V> newMessage(Schema<V> schema) {
            return this.producer.newMessage(schema);
        }

        public TypedMessageBuilder<T> newMessage(Transaction transaction) {
            return this.producer.newMessage(transaction);
        }

        public long getLastSequenceId() {
            return this.producer.getLastSequenceId();
        }

        public ProducerStats getStats() {
            return this.producer.getStats();
        }

        public void close() throws PulsarClientException {
            this.closeCallback.accept(this.producer);
        }

        public CompletableFuture<Void> closeAsync() {
            this.closeCallback.accept(this.producer);
            return CompletableFuture.completedFuture(null);
        }

        public boolean isConnected() {
            return this.producer.isConnected();
        }

        public long getLastDisconnectedTimestamp() {
            return this.producer.getLastDisconnectedTimestamp();
        }

        public int getNumOfPartitions() {
            return this.producer.getNumOfPartitions();
        }
    }

    /**
     * Creates a producer factory whose producers are kept in a Caffeine cache.
     *
     * <p>When the cache evicts an entry, the eviction is logged at debug level and the evicted
     * producer is handed to {@code closeProducer}.
     *
     * @param pulsarClient passed through to the superclass constructor
     * @param map passed through to the superclass constructor
     * @param topicResolver passed through to the superclass constructor
     * @param duration the value given to the cache's {@code expireAfterAccess} setting
     * @param l the value given to the cache's {@code maximumSize} setting; must not be
     * {@code null} (it is unboxed)
     * @param num the value given to the cache's {@code initialCapacity} setting; must not be
     * {@code null} (it is unboxed)
     */
    public CachingPulsarProducerFactory(PulsarClient pulsarClient, Map<String, Object> map, TopicResolver topicResolver, Duration duration, Long l, Integer num) {
        super(pulsarClient, map, topicResolver);
        this.logger = new LogAccessor(getClass());
        this.producerCache = Caffeine.newBuilder()
                .expireAfterAccess(duration)
                .maximumSize(l.longValue())
                .initialCapacity(num.intValue())
                .scheduler(Scheduler.systemScheduler())
                // Explicit type arguments: this call sits in the middle of the chain and has no
                // target type, so without them the listener's key/value parameters would be
                // inferred as Object and closeProducer(producer) would not type-check.
                .<ProducerCacheKey<T>, Producer<T>>evictionListener((producerCacheKey, producer, removalCause) -> {
                    this.logger.debug(() -> "Producer %s evicted from cache due to %s"
                            .formatted(ProducerUtils.formatProducer(producer), removalCause));
                    closeProducer(producer);
                })
                .build();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Returns the cached producer for the given schema, resolved topic, encryption keys and
     * customizers, creating and caching one if no entry exists for that combination.
     *
     * @param schema the producer schema, must not be {@code null}
     * @param str the topic name handed to {@code resolveTopicName}, may be {@code null}
     * @param collection the encryption key names, or {@code null}; copied into a new
     * {@link HashSet} before being used in the cache key
     * @param list the producer builder customizers, or {@code null}
     * @return the cached (or newly created and cached) producer
     * @throws NullPointerException if {@code schema} is {@code null}
     */
    @Override
    public Producer<T> doCreateProducer(Schema<T> schema, @Nullable String str, @Nullable Collection<String> collection, @Nullable List<ProducerBuilderCustomizer<T>> list) {
        Objects.requireNonNull(schema, "Schema must be specified");
        String resolvedTopic = resolveTopicName(str);
        // Copying into a Set gives the key an order-insensitive view of the encryption keys.
        Set<String> keyNames = (collection != null) ? new HashSet<>(collection) : null;
        ProducerCacheKey<T> cacheKey = new ProducerCacheKey<>(schema, resolvedTopic, keyNames, list);
        return this.producerCache.get(cacheKey,
                key -> createCacheableProducer(key.schema, key.topic, key.encryptionKeys, list));
    }

    /**
     * Creates a producer through the superclass and wraps it so that client-side close calls
     * are only logged at trace level and never reach the real producer.
     *
     * @param schema the producer schema
     * @param str the topic name
     * @param collection the encryption key names, or {@code null}
     * @param list the producer builder customizers, or {@code null}
     * @return the real producer wrapped in a {@link ProducerWithCloseCallback}
     * @throws RuntimeException wrapping the {@link PulsarClientException} if the superclass
     * fails to create the producer
     */
    private Producer<T> createCacheableProducer(Schema<T> schema, String str, @Nullable Collection<String> collection, @Nullable List<ProducerBuilderCustomizer<T>> list) {
        try {
            Producer<T> actualProducer = super.doCreateProducer(schema, str, collection, list);
            // The callback deliberately does nothing except log: the producer stays open.
            return new ProducerWithCloseCallback<>(actualProducer,
                    closedProducer -> this.logger.trace(() -> "Client closed producer %s but will skip actual closing"
                            .formatted(ProducerUtils.formatProducer(actualProducer))));
        } catch (PulsarClientException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Empties the producer cache: every entry is invalidated and its producer is handed to
     * {@code closeProducer}.
     *
     * <p>The producers are closed here explicitly, right after each entry is invalidated.
     */
    @Override
    public void destroy() {
        this.producerCache.asMap().forEach((cacheKey, cachedProducer) -> {
            this.producerCache.invalidate(cacheKey);
            closeProducer(cachedProducer);
        });
    }

    /**
     * Closes the real producer behind a cached wrapper.
     *
     * <p>Calling {@code close()} on a {@link ProducerWithCloseCallback} only triggers its
     * callback, so the wrapped producer is extracted and closed via
     * {@code ProducerUtils.closeProducerAsync}. If {@code producer} is not such a wrapper, or
     * the wrapper holds no producer, a warning is logged and nothing is closed.
     *
     * @param producer the cached producer to close
     */
    private void closeProducer(Producer<T> producer) {
        Producer<T> actualProducer = (producer instanceof ProducerWithCloseCallback<T> wrapper)
                ? wrapper.getActualProducer()
                : null;
        if (actualProducer == null) {
            this.logger.warn(() -> "Unable to get actual producer for %s - will skip closing it"
                    .formatted(ProducerUtils.formatProducer(producer)));
            return;
        }
        ProducerUtils.closeProducerAsync(actualProducer, this.logger);
    }
}
