/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.kafka.eventhandling.producer;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.axonframework.common.Assert;
import org.axonframework.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.kafka.eventhandling.producer.ProducerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ProducerFactory} that hands out Kafka {@link Producer} instances whose {@code close()} does not
 * immediately close the underlying {@link KafkaProducer}.
 * <ul>
 * <li>When the {@link ConfirmationMode} is not transactional, a single producer is created lazily and shared by
 * all callers. Calling {@code close()} on it is a no-op; it is closed by {@link #shutDown()}.</li>
 * <li>When the {@link ConfirmationMode} is transactional, producers are taken from a bounded cache, or created
 * with a {@code transactional.id} of {@code transactionIdPrefix + counter} when the cache is empty. Calling
 * {@code close()} returns the producer to the cache, or really closes it when the cache is full.</li>
 * </ul>
 * Instances are created through {@link #builder(Map)}.
 *
 * @param <K> key type of the records sent by the created producers
 * @param <V> value type of the records sent by the created producers
 */
public class DefaultProducerFactory<K, V> implements ProducerFactory<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultProducerFactory.class);

    private final int closeTimeout;
    private final TimeUnit unit;
    /** Idle transactional producers waiting to be reused. */
    private final BlockingQueue<CloseLazyProducer<K, V>> cache;
    private final Map<String, Object> configs;
    private final ConfirmationMode confirmationMode;
    private final String transactionIdPrefix;
    /** Counter appended to {@link #transactionIdPrefix} to build each new {@code transactional.id}. */
    private final AtomicInteger transactionIdSuffix;
    /** Lazily created producer shared by all callers in non-transactional mode. */
    private volatile CloseLazyProducer<K, V> producer;

    private DefaultProducerFactory(Builder<K, V> builder) {
        this.closeTimeout = builder.closeTimeout;
        this.unit = builder.unit;
        this.cache = new ArrayBlockingQueue<>(builder.producerCacheSize);
        this.configs = new HashMap<>(builder.configs);
        this.confirmationMode = builder.confirmationMode;
        this.transactionIdPrefix = builder.transactionIdPrefix;
        this.transactionIdSuffix = new AtomicInteger();
    }

    /**
     * Returns a producer matching the configured {@link ConfirmationMode}: a cached or freshly created
     * transactional producer, or the shared non-transactional producer (created on first use).
     *
     * @return a producer; never {@code null}
     */
    @Override
    public Producer<K, V> createProducer() {
        if (this.confirmationMode.isTransactional()) {
            return this.createTransactionalProducer();
        }
        // Work on a local copy so that a concurrent shutDown() nulling the field cannot make us return null.
        CloseLazyProducer<K, V> shared = this.producer;
        if (shared == null) {
            synchronized (this) {
                shared = this.producer;
                if (shared == null) {
                    // No cache for the shared producer: close() on it must never pool or close the delegate,
                    // because every caller keeps receiving this same instance.
                    shared = new CloseLazyProducer<>(
                            this.createKafkaProducer(this.configs), null, this.closeTimeout, this.unit);
                    this.producer = shared;
                }
            }
        }
        return shared;
    }

    @Override
    public ConfirmationMode confirmationMode() {
        return this.confirmationMode;
    }

    /**
     * Closes the shared producer (if one was created) and every producer currently sitting in the cache, each
     * with the configured close timeout.
     * <p>
     * A failure to close a cached producer is logged and does not stop the remaining ones from being closed.
     * An exception thrown while closing the shared producer is propagated, but only after the cache has been
     * drained.
     */
    @Override
    public void shutDown() {
        CloseLazyProducer<K, V> shared;
        // Detach under the same monitor createProducer() uses, so a producer created concurrently is not lost.
        synchronized (this) {
            shared = this.producer;
            this.producer = null;
        }
        try {
            if (shared != null) {
                shared.delegate.close(this.closeTimeout, this.unit);
            }
        } finally {
            CloseLazyProducer<K, V> cached;
            while ((cached = this.cache.poll()) != null) {
                try {
                    cached.delegate.close(this.closeTimeout, this.unit);
                } catch (Exception e) {
                    logger.error("Exception closing producer", e);
                }
            }
        }
    }

    /**
     * Returns an idle producer from the cache or, when none is available, creates a new one with a fresh
     * {@code transactional.id} and initializes its transactions.
     * <p>
     * If {@code initTransactions()} fails, the newly created Kafka producer is closed before the exception is
     * rethrown, so it is not leaked.
     */
    private Producer<K, V> createTransactionalProducer() {
        CloseLazyProducer<K, V> cached = this.cache.poll();
        if (cached != null) {
            return cached;
        }
        Map<String, Object> txConfigs = new HashMap<>(this.configs);
        txConfigs.put("transactional.id", this.transactionIdPrefix + this.transactionIdSuffix.getAndIncrement());
        CloseLazyProducer<K, V> created = new CloseLazyProducer<>(
                this.createKafkaProducer(txConfigs), this.cache, this.closeTimeout, this.unit);
        try {
            created.initTransactions();
        } catch (RuntimeException e) {
            try {
                created.delegate.close(this.closeTimeout, this.unit);
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        return created;
    }

    /**
     * Creates a builder for a {@link DefaultProducerFactory}.
     *
     * @param configs Kafka producer configuration; copied by the builder, may not be {@code null}
     * @param <K>     key type of the records
     * @param <V>     value type of the records
     * @return a new builder
     */
    public static <K, V> Builder<K, V> builder(Map<String, Object> configs) {
        return new Builder<>(configs);
    }

    private Producer<K, V> createKafkaProducer(Map<String, Object> configs) {
        return new KafkaProducer<>(configs);
    }

    /**
     * Builder for {@link DefaultProducerFactory}. Defaults: cache size {@code 10}, close timeout {@code 30}
     * {@link TimeUnit#SECONDS}, confirmation mode {@link ConfirmationMode#NONE}.
     *
     * @param <K> key type of the records
     * @param <V> value type of the records
     */
    public static final class Builder<K, V> {
        private final Map<String, Object> configs;
        private String transactionIdPrefix;
        private int producerCacheSize = 10;
        private int closeTimeout = 30;
        private TimeUnit unit = TimeUnit.SECONDS;
        private ConfirmationMode confirmationMode = ConfirmationMode.NONE;

        private Builder(Map<String, Object> configs) {
            Assert.notNull(configs, () -> "'configs' may not be null");
            this.configs = Collections.unmodifiableMap(new HashMap<>(configs));
        }

        /**
         * Sets the capacity of the cache of idle transactional producers.
         *
         * @param producerCacheSize the capacity, must be greater than zero
         * @return this builder
         */
        public Builder<K, V> withProducerCacheSize(int producerCacheSize) {
            Assert.isTrue(producerCacheSize > 0, () -> "'producerCacheSize' should be > 0");
            this.producerCacheSize = producerCacheSize;
            return this;
        }

        /**
         * Sets the timeout used when the underlying Kafka producers are closed.
         *
         * @param timeout the timeout, must be greater than zero
         * @param unit    the unit of {@code timeout}, may not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withCloseTimeout(int timeout, TimeUnit unit) {
            Assert.isTrue(timeout > 0, () -> "'closeTimeout' should be > 0");
            Assert.notNull(unit, () -> "'timeUnit' may not be null");
            this.closeTimeout = timeout;
            this.unit = unit;
            return this;
        }

        /**
         * Sets the confirmation mode.
         * <p>
         * NOTE(review): selecting a transactional mode here without also calling
         * {@link #withTransactionalIdPrefix(String)} leaves the prefix {@code null}, so the generated
         * {@code transactional.id} values would start with the text {@code "null"}.
         *
         * @param confirmationMode the mode, may not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withConfirmationMode(ConfirmationMode confirmationMode) {
            Assert.notNull(confirmationMode, () -> "'confirmationMode' may not be null");
            this.confirmationMode = confirmationMode;
            return this;
        }

        /**
         * Sets the prefix of the generated {@code transactional.id} values and switches the confirmation mode to
         * {@link ConfirmationMode#TRANSACTIONAL}.
         *
         * @param transactionIdPrefix the prefix, may not be {@code null}
         * @return this builder
         */
        public Builder<K, V> withTransactionalIdPrefix(String transactionIdPrefix) {
            Assert.notNull(transactionIdPrefix, () -> "'transactionIdPrefix' cannot be null");
            this.transactionIdPrefix = transactionIdPrefix;
            return this.withConfirmationMode(ConfirmationMode.TRANSACTIONAL);
        }

        /**
         * Builds the factory from the current builder state.
         *
         * @return a new {@link DefaultProducerFactory}
         */
        public ProducerFactory<K, V> build() {
            return new DefaultProducerFactory<>(this);
        }
    }

    /**
     * {@link Producer} wrapper that forwards every operation to a delegate, except {@code close}:
     * with a cache, closing offers this wrapper to the cache and only closes the delegate when the cache
     * rejects it; without a cache ({@code null}), closing does nothing and the owning factory closes the
     * delegate directly.
     */
    private static final class CloseLazyProducer<K, V> implements Producer<K, V> {
        private final Producer<K, V> delegate;
        /** Cache to return to on close, or {@code null} for the shared (non-pooled) producer. */
        private final BlockingQueue<CloseLazyProducer<K, V>> cache;
        private final int closeTimeout;
        private final TimeUnit unit;

        CloseLazyProducer(Producer<K, V> delegate, BlockingQueue<CloseLazyProducer<K, V>> cache,
                          int closeTimeout, TimeUnit unit) {
            this.delegate = delegate;
            this.cache = cache;
            this.closeTimeout = closeTimeout;
            this.unit = unit;
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
            return this.delegate.send(record);
        }

        @Override
        public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
            return this.delegate.send(record, callback);
        }

        @Override
        public void flush() {
            this.delegate.flush();
        }

        @Override
        public List<PartitionInfo> partitionsFor(String topic) {
            return this.delegate.partitionsFor(topic);
        }

        @Override
        public Map<MetricName, ? extends Metric> metrics() {
            return this.delegate.metrics();
        }

        @Override
        public void initTransactions() {
            this.delegate.initTransactions();
        }

        @Override
        public void beginTransaction() throws ProducerFencedException {
            this.delegate.beginTransaction();
        }

        @Override
        public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
                                             String consumerGroupId) throws ProducerFencedException {
            this.delegate.sendOffsetsToTransaction(offsets, consumerGroupId);
        }

        @Override
        public void commitTransaction() throws ProducerFencedException {
            this.delegate.commitTransaction();
        }

        @Override
        public void abortTransaction() throws ProducerFencedException {
            this.delegate.abortTransaction();
        }

        /** Same as {@link #close(long, TimeUnit)} with the timeout this wrapper was created with. */
        @Override
        public void close() {
            this.close(this.closeTimeout, this.unit);
        }

        /**
         * Returns this producer to the cache; the delegate is closed with the given timeout only when the cache
         * does not accept it. Does nothing when this wrapper has no cache.
         */
        @Override
        public void close(long timeout, TimeUnit unit) {
            if (this.cache == null) {
                // Shared producer: still handed out by the factory, so it must stay open until shutDown().
                return;
            }
            if (!this.cache.offer(this)) {
                this.delegate.close(timeout, unit);
            }
        }

        @Override
        public String toString() {
            return "CloseLazyProducer [delegate=" + this.delegate + "]";
        }
    }
}

