/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.listener.ConsumerRecordRecoverer;
import org.springframework.kafka.listener.ListenerUtils;
import org.springframework.kafka.support.serializer.DeserializationException;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;

/**
 * A {@link ConsumerRecordRecoverer} that republishes a failed record to another
 * topic/partition (by default {@code <original topic>.DLT}, same partition),
 * adding headers that describe the original record and the exception.
 *
 * <p>One or more {@link KafkaTemplate}s may be supplied. With a single template it
 * is always used; with several, the template is chosen by the runtime type of the
 * outgoing record value (first map key assignable from that type). All templates
 * must agree on whether they are transactional.
 */
public class DeadLetterPublishingRecoverer implements ConsumerRecordRecoverer {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(DeadLetterPublishingRecoverer.class));

    /** Default resolver: same partition on the topic named {@code <original topic>.DLT}. */
    private static final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> DEFAULT_DESTINATION_RESOLVER =
            (cr, e) -> new TopicPartition(cr.topic() + ".DLT", cr.partition());

    /** The only template when exactly one was supplied; {@code null} otherwise. */
    private final KafkaTemplate<Object, Object> template;

    /** All supplied templates, keyed by the value type they should publish. */
    private final Map<Class<?>, KafkaTemplate<?, ?>> templates;

    /** Transactional setting shared by every supplied template. */
    private final boolean transactional;

    private final BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver;

    /**
     * Create an instance with a single template and the default destination resolver.
     * @param template the template used for publishing.
     */
    public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template) {
        this(template, DEFAULT_DESTINATION_RESOLVER);
    }

    /**
     * Create an instance with a single template and a custom destination resolver.
     * @param template the template used for publishing.
     * @param destinationResolver resolves the target topic/partition for a failed record.
     */
    public DeadLetterPublishingRecoverer(KafkaTemplate<? extends Object, ? extends Object> template, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        this(Collections.singletonMap(Object.class, template), destinationResolver);
    }

    /**
     * Create an instance with several templates and the default destination resolver.
     * @param templates the templates, keyed by the value type each one publishes.
     */
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Object, ? extends Object>> templates) {
        this(templates, DEFAULT_DESTINATION_RESOLVER);
    }

    /**
     * Create an instance with several templates and a custom destination resolver.
     * Template lookup walks the map in its iteration order, so a map with a
     * predictable order should be supplied when more than one key may match.
     * @param templates the templates, keyed by the value type each one publishes;
     * must not be empty and all must have the same transactional setting.
     * @param destinationResolver resolves the target topic/partition for a failed
     * record; must not be null.
     */
    @SuppressWarnings("unchecked")
    public DeadLetterPublishingRecoverer(Map<Class<?>, KafkaTemplate<? extends Object, ? extends Object>> templates, BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        Assert.isTrue(!ObjectUtils.isEmpty(templates), "At least one template is required");
        Assert.notNull(destinationResolver, "The destinationResolver cannot be null");
        KafkaTemplate<?, ?> first = templates.values().iterator().next();
        // Short-circuit lookups when there is nothing to choose between.
        this.template = templates.size() == 1 ? (KafkaTemplate<Object, Object>) first : null;
        this.templates = templates;
        this.transactional = first.isTransactional();
        final boolean tx = this.transactional;
        Assert.isTrue(templates.values().stream().allMatch(t -> t.isTransactional() == tx),
                "All templates must have the same setting for transactional");
        this.destinationResolver = destinationResolver;
    }

    /**
     * Publish the failed record to the resolved destination. If a deserialization
     * exception is found in the record headers (value first, then key), the raw
     * data it carries is handed to {@link #createProducerRecord}.
     * @param record the failed record.
     * @param exception the exception that caused the failure.
     */
    @Override
    public void accept(ConsumerRecord<?, ?> record, Exception exception) {
        TopicPartition tp = this.destinationResolver.apply(record, exception);
        RecordHeaders headers = new RecordHeaders(record.headers().toArray());
        enhanceHeaders(headers, record, exception);
        boolean isKey = false;
        DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(record, "springDeserializerExceptionValue", LOGGER);
        if (deserEx == null) {
            deserEx = ListenerUtils.getExceptionFromHeader(record, "springDeserializerExceptionKey", LOGGER);
            // Set even when no key exception exists; in that case data is null and
            // createProducerRecord falls back to the record's own key and value.
            isKey = true;
        }
        byte[] data = deserEx == null ? null : deserEx.getData();
        ProducerRecord<Object, Object> outRecord = createProducerRecord(record, tp, headers, data, isKey);
        KafkaTemplate<Object, Object> kafkaTemplate = findTemplateForValue(outRecord.value());
        if (this.transactional && !kafkaTemplate.inTransaction() && !kafkaTemplate.isAllowNonTransactional()) {
            // A transactional template with no active transaction: start a local one.
            kafkaTemplate.executeInTransaction(t -> {
                publish(outRecord, t);
                return null;
            });
        }
        else {
            publish(outRecord, kafkaTemplate);
        }
    }

    /**
     * Select the template for an outgoing value.
     * <ul>
     * <li>A single configured template is always returned.</li>
     * <li>For a {@code null} value (e.g. a tombstone), the template keyed by
     * {@code Void.class} is used if present, otherwise the first template in the
     * map's iteration order. Previously this case threw a NullPointerException.</li>
     * <li>Otherwise the first key assignable from the value's class wins; if none
     * matches, a warning is logged and the last template is used.</li>
     * </ul>
     * @param value the outgoing record value; may be null.
     * @return the template to publish with.
     */
    @SuppressWarnings("unchecked")
    private KafkaTemplate<Object, Object> findTemplateForValue(@Nullable Object value) {
        if (this.template != null) {
            return this.template;
        }
        if (value == null) {
            KafkaTemplate<?, ?> forNull = this.templates.get(Void.class);
            if (forNull == null) {
                forNull = this.templates.values().iterator().next();
            }
            return (KafkaTemplate<Object, Object>) forNull;
        }
        Optional<Class<?>> key = this.templates.keySet().stream()
                .filter(k -> k.isAssignableFrom(value.getClass()))
                .findFirst();
        if (key.isPresent()) {
            return (KafkaTemplate<Object, Object>) this.templates.get(key.get());
        }
        LOGGER.warn(() -> "Failed to find a template for " + value.getClass() + " attempting to use the last entry");
        // The constructor rejects an empty map, so a last entry is expected to exist.
        return (KafkaTemplate<Object, Object>) this.templates.values().stream()
                .reduce((first, second) -> second)
                .get();
    }

    /**
     * Build the outgoing record. Subclasses may override to customize it.
     * When {@code data} is non-null it replaces the key (if {@code isKey}) or the
     * value (if not); otherwise the original key and value are used. A negative
     * partition in {@code topicPartition} is passed on as a {@code null} partition.
     * @param record the failed record.
     * @param topicPartition the resolved destination.
     * @param headers the headers for the outgoing record.
     * @param data raw bytes taken from a deserialization exception, or null.
     * @param isKey whether {@code data} stands in for the key rather than the value.
     * @return the record to publish.
     */
    protected ProducerRecord<Object, Object> createProducerRecord(ConsumerRecord<?, ?> record, TopicPartition topicPartition, RecordHeaders headers, @Nullable byte[] data, boolean isKey) {
        Integer partition = topicPartition.partition() < 0 ? null : Integer.valueOf(topicPartition.partition());
        Object key = isKey && data != null ? data : record.key();
        Object value = data == null || isKey ? record.value() : data;
        return new ProducerRecord<>(topicPartition.topic(), partition, key, value, headers);
    }

    /**
     * Send the record and log the outcome. Failures, whether thrown by
     * {@code send} or reported through the callback, are logged and not rethrown.
     * @param outRecord the record to publish.
     * @param kafkaTemplate the operations used to send it.
     */
    protected void publish(ProducerRecord<Object, Object> outRecord, KafkaOperations<Object, Object> kafkaTemplate) {
        try {
            kafkaTemplate.send(outRecord).addCallback(
                    result -> LOGGER.debug(() -> "Successful dead-letter publication: " + result),
                    ex -> LOGGER.error(ex, () -> "Dead-letter publication failed for: " + outRecord));
        }
        catch (Exception e) {
            LOGGER.error(e, () -> "Dead-letter publication failed for: " + outRecord);
        }
    }

    /**
     * Add headers recording the original topic, partition, offset, timestamp and
     * timestamp type, plus the exception class name, message (when non-null) and
     * stack trace. Strings are UTF-8 encoded; numbers are written with
     * {@link ByteBuffer} (4 bytes for the partition, 8 for offset and timestamp).
     */
    private void enhanceHeaders(RecordHeaders kafkaHeaders, ConsumerRecord<?, ?> record, Exception exception) {
        kafkaHeaders.add(new RecordHeader("kafka_dlt-original-topic", record.topic().getBytes(StandardCharsets.UTF_8)));
        kafkaHeaders.add(new RecordHeader("kafka_dlt-original-partition", ByteBuffer.allocate(4).putInt(record.partition()).array()));
        kafkaHeaders.add(new RecordHeader("kafka_dlt-original-offset", ByteBuffer.allocate(8).putLong(record.offset()).array()));
        kafkaHeaders.add(new RecordHeader("kafka_dlt-original-timestamp", ByteBuffer.allocate(8).putLong(record.timestamp()).array()));
        kafkaHeaders.add(new RecordHeader("kafka_dlt-original-timestamp-type", record.timestampType().toString().getBytes(StandardCharsets.UTF_8)));
        kafkaHeaders.add(new RecordHeader("kafka_dlt-exception-fqcn", exception.getClass().getName().getBytes(StandardCharsets.UTF_8)));
        String message = exception.getMessage();
        if (message != null) {
            kafkaHeaders.add(new RecordHeader("kafka_dlt-exception-message", message.getBytes(StandardCharsets.UTF_8)));
        }
        kafkaHeaders.add(new RecordHeader("kafka_dlt-exception-stacktrace", getStackTraceAsString(exception).getBytes(StandardCharsets.UTF_8)));
    }

    /** Render the throwable's stack trace, as printed by {@code printStackTrace}, into a String. */
    private String getStackTraceAsString(Throwable cause) {
        StringWriter stringWriter = new StringWriter();
        PrintWriter printWriter = new PrintWriter((Writer) stringWriter, true);
        cause.printStackTrace(printWriter);
        return stringWriter.getBuffer().toString();
    }
}

