package com.linecorp.armeria.server.logging.kafka;

import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.server.logging.AccessLogWriter;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linecorp/armeria/server/logging/kafka/KafkaAccessLogWriter.class */
/**
 * An {@link AccessLogWriter} that publishes one Kafka record per {@link RequestLog}.
 *
 * <p>The record value (and, optionally, the record key) is derived from the {@link RequestLog}
 * by caller-supplied extractor functions and sent to a fixed topic through the given
 * {@link Producer}.
 *
 * @param <K> the type of the Kafka record key
 * @param <V> the type of the Kafka record value
 */
public final class KafkaAccessLogWriter<K, V> implements AccessLogWriter {
    private static final Logger logger = LoggerFactory.getLogger(KafkaAccessLogWriter.class);

    private final Producer<K, V> producer;
    private final String topic;
    private final Function<? super RequestLog, ? extends K> keyExtractor;
    private final Function<? super RequestLog, ? extends V> valueExtractor;

    /**
     * Creates a writer whose records always carry a {@code null} key.
     *
     * @param producer the Kafka producer used to send records
     * @param str the name of the Kafka topic the records are sent to
     * @param function the function that converts a {@link RequestLog} into a record value;
     *                 a {@code null} result makes {@link #log(RequestLog)} skip that log
     * @throws NullPointerException if any argument is {@code null}
     */
    public KafkaAccessLogWriter(Producer<K, V> producer, String str, Function<? super RequestLog, ? extends V> function) {
        this(producer, str, requestLog -> null, function);
    }

    /**
     * Creates a writer that derives both the record key and the record value from the
     * {@link RequestLog}.
     *
     * @param producer the Kafka producer used to send records
     * @param str the name of the Kafka topic the records are sent to
     * @param function the function that converts a {@link RequestLog} into a record key
     * @param function2 the function that converts a {@link RequestLog} into a record value;
     *                  a {@code null} result makes {@link #log(RequestLog)} skip that log
     * @throws NullPointerException if any argument is {@code null}
     */
    public KafkaAccessLogWriter(Producer<K, V> producer, String str, Function<? super RequestLog, ? extends K> function, Function<? super RequestLog, ? extends V> function2) {
        // The names passed to requireNonNull become the NullPointerException messages.
        this.producer = Objects.requireNonNull(producer, "producer");
        this.topic = Objects.requireNonNull(str, "topic");
        this.keyExtractor = Objects.requireNonNull(function, "keyExtractor");
        this.valueExtractor = Objects.requireNonNull(function2, "valueExtractor");
    }

    /**
     * Converts the given {@link RequestLog} into a Kafka record and hands it to the producer.
     *
     * <p>If the value extractor returns {@code null}, nothing is sent and the key extractor is
     * not invoked. A failure reported to the send callback is logged at WARN level together
     * with the record; it is not rethrown.
     *
     * @param requestLog the log to publish
     */
    public void log(RequestLog requestLog) {
        final V value = valueExtractor.apply(requestLog);
        if (value == null) {
            // A null value means "do not publish this log".
            return;
        }

        final K key = keyExtractor.apply(requestLog);
        final ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, value);
        producer.send(producerRecord, (metadata, cause) -> {
            if (cause != null) {
                logger.warn("Failed to send a record to Kafka: {}", producerRecord, cause);
            }
        });
    }

    /**
     * Closes the underlying {@link Producer} asynchronously.
     *
     * <p>The close call is submitted via {@link CompletableFuture#runAsync(Runnable)}, i.e. it
     * runs on the default asynchronous executor rather than on the calling thread.
     *
     * @return a future that completes once {@code producer.close()} has returned, or completes
     *         exceptionally if it throws
     */
    public CompletableFuture<Void> shutdown() {
        // 'producer' is final and null-checked in the constructor, so no extra guard is needed.
        return CompletableFuture.runAsync(producer::close);
    }
}
