/*
 * Decompiled with CFR 0.152.
 */
package org.axonframework.kafka.eventhandling.producer;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.axonframework.common.Registration;
import org.axonframework.eventhandling.EventMessage;
import org.axonframework.kafka.eventhandling.KafkaMessageConverter;
import org.axonframework.kafka.eventhandling.producer.ConfirmationMode;
import org.axonframework.kafka.eventhandling.producer.KafkaPublisherConfiguration;
import org.axonframework.kafka.eventhandling.producer.ProducerFactory;
import org.axonframework.messaging.EventPublicationFailedException;
import org.axonframework.messaging.SubscribableMessageSource;
import org.axonframework.messaging.unitofwork.CurrentUnitOfWork;
import org.axonframework.messaging.unitofwork.UnitOfWork;
import org.axonframework.monitoring.MessageMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaPublisher<K, V> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaPublisher.class);
    private final SubscribableMessageSource<EventMessage<?>> messageSource;
    private final ProducerFactory<K, V> producerFactory;
    private final KafkaMessageConverter<K, V> messageConverter;
    private final MessageMonitor<? super EventMessage<?>> messageMonitor;
    private final String topic;
    private final long publisherAckTimeout;
    private Registration eventBusRegistration;

    public KafkaPublisher(KafkaPublisherConfiguration<K, V> config) {
        this.messageSource = config.getMessageSource();
        this.producerFactory = config.getProducerFactory();
        this.messageConverter = config.getMessageConverter();
        this.messageMonitor = config.getMessageMonitor();
        this.topic = config.getTopic();
        this.publisherAckTimeout = config.getPublisherAckTimeout();
    }

    public void start() {
        this.eventBusRegistration = this.messageSource.subscribe(this::send);
    }

    public void shutDown() {
        if (this.eventBusRegistration != null) {
            this.eventBusRegistration.cancel();
            this.eventBusRegistration = null;
        }
        this.producerFactory.shutDown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void send(List<? extends EventMessage<?>> events) {
        Map monitorCallbacks = this.messageMonitor.onMessagesIngested(events);
        Producer<K, V> producer = this.producerFactory.createProducer();
        ConfirmationMode cm = this.producerFactory.confirmationMode();
        try {
            if (cm.isTransactional()) {
                this.tryBeginTxn(producer);
            }
            Map<Future<RecordMetadata>, EventMessage<?>> publishStatuses = this.publishToKafka(events, producer);
            if (CurrentUnitOfWork.isStarted()) {
                this.handleActiveUnitOfWork(producer, publishStatuses, monitorCallbacks, cm);
            } else if (cm.isTransactional()) {
                this.tryCommit(producer, monitorCallbacks);
            } else if (cm.isWaitForAck()) {
                this.waitForPublishAck(publishStatuses, monitorCallbacks);
            }
        }
        finally {
            if (!CurrentUnitOfWork.isStarted()) {
                this.tryClose(producer);
            }
        }
    }

    private Map<Future<RecordMetadata>, ? super EventMessage<?>> publishToKafka(List<? extends EventMessage<?>> events, Producer<K, V> producer) {
        HashMap results = new HashMap();
        events.forEach(event -> results.put(producer.send(this.messageConverter.createKafkaMessage((EventMessage<?>)event, this.topic)), event));
        return results;
    }

    private void handleActiveUnitOfWork(Producer<K, V> producer, Map<Future<RecordMetadata>, ? super EventMessage<?>> futures, Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> monitorCallbacks, ConfirmationMode confirmationMode) {
        UnitOfWork uow = CurrentUnitOfWork.get();
        uow.afterCommit(u -> this.completeKafkaWork(monitorCallbacks, producer, confirmationMode, futures));
        uow.onRollback(u -> this.rollbackKafkaWork(producer, confirmationMode));
    }

    private void completeKafkaWork(Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> monitorCallbackMap, Producer<K, V> producer, ConfirmationMode confirmationMode, Map<Future<RecordMetadata>, ? super EventMessage<?>> futures) {
        if (confirmationMode.isTransactional()) {
            this.tryCommit(producer, monitorCallbackMap);
        } else if (confirmationMode.isWaitForAck()) {
            this.waitForPublishAck(futures, monitorCallbackMap);
        }
        this.tryClose(producer);
    }

    private void rollbackKafkaWork(Producer<K, V> producer, ConfirmationMode confirmationMode) {
        if (confirmationMode.isTransactional()) {
            this.tryRollback(producer);
        }
        this.tryClose(producer);
    }

    private void waitForPublishAck(Map<Future<RecordMetadata>, ? super EventMessage<?>> futures, Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> monitorCallbacks) {
        long deadline = System.currentTimeMillis() + this.publisherAckTimeout;
        futures.forEach((k, v) -> {
            try {
                k.get(Math.max(0L, deadline - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
                if (monitorCallbacks.containsKey(v)) {
                    ((MessageMonitor.MonitorCallback)monitorCallbacks.get(v)).reportSuccess();
                }
            }
            catch (InterruptedException | ExecutionException | TimeoutException ex) {
                ((MessageMonitor.MonitorCallback)monitorCallbacks.get(v)).reportFailure((Throwable)ex);
                logger.warn("Encountered error while waiting for event publication", (Throwable)ex);
            }
        });
    }

    private void tryBeginTxn(Producer<?, ?> producer) {
        try {
            producer.beginTransaction();
        }
        catch (ProducerFencedException e) {
            logger.warn("Unable to begin transaction", (Throwable)e);
            throw new EventPublicationFailedException("Event publication failed: Exception occurred while starting kafka transaction", (Throwable)e);
        }
    }

    private void tryCommit(Producer<?, ?> producer, Map<? super EventMessage<?>, MessageMonitor.MonitorCallback> monitorCallbacks) {
        try {
            producer.commitTransaction();
            monitorCallbacks.forEach((k, v) -> v.reportSuccess());
        }
        catch (ProducerFencedException e) {
            logger.warn("Unable to commit transaction", (Throwable)e);
            monitorCallbacks.forEach((k, v) -> v.reportFailure((Throwable)e));
            throw new EventPublicationFailedException("Event publication failed: Exception occurred while committing kafka transaction", (Throwable)e);
        }
    }

    private void tryClose(Producer<?, ?> producer) {
        try {
            producer.close();
        }
        catch (Exception e) {
            logger.debug("Unable to close producer.", (Throwable)e);
        }
    }

    private void tryRollback(Producer<?, ?> producer) {
        try {
            producer.abortTransaction();
        }
        catch (Exception e) {
            logger.warn("Unable to abort transaction", (Throwable)e);
        }
    }
}

