package com.linecorp.armeria.server.logging.structured.kafka;

import com.linecorp.armeria.common.logging.RequestLog;
import com.linecorp.armeria.server.HttpService;
import com.linecorp.armeria.server.logging.structured.StructuredLogBuilder;
import com.linecorp.armeria.server.logging.structured.StructuredLoggingService;
import java.util.Objects;
import java.util.Properties;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linecorp/armeria/server/logging/structured/kafka/KafkaStructuredLoggingService.class */
/**
 * A {@link StructuredLoggingService} that publishes each structured log entry to a Kafka topic
 * through a {@link Producer}.
 *
 * <p>The producer is either supplied by the caller, in which case this service never closes it,
 * or created internally from a bootstrap-servers string, in which case it is closed by
 * {@link #close()}.
 *
 * @param <L> the type of the structured log entry, used as the Kafka record value
 */
public class KafkaStructuredLoggingService<L> extends StructuredLoggingService<L> {
    private static final Logger logger = LoggerFactory.getLogger(KafkaStructuredLoggingService.class);

    /** Producer every log record is sent through. */
    private final Producer<byte[], L> producer;

    /** Name of the Kafka topic the records are sent to. */
    private final String topic;

    /** Computes the record key for each log entry; never {@code null} (a default is installed). */
    private final KeySelector<L> keySelector;

    /** {@code true} only when the producer was created by this class and must be closed by it. */
    private final boolean needToCloseProducer;

    /**
     * Chooses the Kafka record key for a structured log entry.
     *
     * @param <E> the type of the structured log entry
     */
    @FunctionalInterface
    public interface KeySelector<E> {
        /**
         * Selects the key of the record that will carry {@code e}.
         *
         * @param requestLog the request log the entry was built from
         * @param e the structured log entry
         * @return the record key, or {@code null} to send the record without a key
         */
        @Nullable
        byte[] selectKey(RequestLog requestLog, E e);
    }

    /**
     * Creates a decorator that sends structured logs through a caller-supplied producer.
     * The producer stays owned by the caller: services created by the decorator never close it.
     *
     * @param producer the producer to send records through
     * @param str the name of the Kafka topic to send records to
     * @param structuredLogBuilder builds the structured log entry passed to the superclass
     * @param keySelector selects the record key, or {@code null} to send every record without a key
     * @param <L> the type of the structured log entry
     * @return a function that wraps an {@link HttpService} with this logging service
     * @throws NullPointerException if {@code producer} or {@code str} is {@code null}
     */
    public static <L> Function<? super HttpService, StructuredLoggingService<L>> newDecorator(Producer<byte[], L> producer, String str, StructuredLogBuilder<L> structuredLogBuilder, @Nullable KeySelector<L> keySelector) {
        // Fail fast here instead of on every application of the returned decorator.
        Objects.requireNonNull(producer, "producer");
        Objects.requireNonNull(str, "topic");
        return delegate -> new KafkaStructuredLoggingService<>(
                delegate, structuredLogBuilder, producer, str, keySelector, false);
    }

    /**
     * Creates a decorator that sends structured logs through a caller-supplied producer,
     * sending every record without a key.
     *
     * @param producer the producer to send records through
     * @param str the name of the Kafka topic to send records to
     * @param structuredLogBuilder builds the structured log entry passed to the superclass
     * @param <L> the type of the structured log entry
     * @return a function that wraps an {@link HttpService} with this logging service
     * @throws NullPointerException if {@code producer} or {@code str} is {@code null}
     */
    public static <L> Function<? super HttpService, StructuredLoggingService<L>> newDecorator(Producer<byte[], L> producer, String str, StructuredLogBuilder<L> structuredLogBuilder) {
        return newDecorator(producer, str, structuredLogBuilder, null);
    }

    /**
     * Creates a decorator backed by a {@link KafkaProducer} that this method constructs from
     * the default configuration for the given bootstrap servers.
     *
     * <p>The producer is created once, when this method is called, and is shared by every
     * service the returned function creates. Each of those services closes the shared producer
     * in {@link #close()}.
     *
     * @param str the {@code bootstrap.servers} value identifying the Kafka cluster
     * @param str2 the name of the Kafka topic to send records to
     * @param structuredLogBuilder builds the structured log entry passed to the superclass
     * @param keySelector selects the record key, or {@code null} to send every record without a key
     * @param <L> the type of the structured log entry
     * @return a function that wraps an {@link HttpService} with this logging service
     * @throws NullPointerException if {@code str} or {@code str2} is {@code null}
     */
    public static <L> Function<? super HttpService, StructuredLoggingService<L>> newDecorator(String str, String str2, StructuredLogBuilder<L> structuredLogBuilder, @Nullable KeySelector<L> keySelector) {
        // Validate before the producer exists: a null topic would otherwise be rejected only by
        // the constructor, after a producer that nothing closes has already been created.
        Objects.requireNonNull(str, "bootstrapServers");
        Objects.requireNonNull(str2, "topic");
        final Producer<byte[], L> sharedProducer = new KafkaProducer<>(newDefaultConfig(str));
        return delegate -> new KafkaStructuredLoggingService<>(
                delegate, structuredLogBuilder, sharedProducer, str2, keySelector, true);
    }

    /**
     * Creates a decorator backed by an internally constructed {@link KafkaProducer},
     * sending every record without a key.
     *
     * @param str the {@code bootstrap.servers} value identifying the Kafka cluster
     * @param str2 the name of the Kafka topic to send records to
     * @param structuredLogBuilder builds the structured log entry passed to the superclass
     * @param <L> the type of the structured log entry
     * @return a function that wraps an {@link HttpService} with this logging service
     * @throws NullPointerException if {@code str} or {@code str2} is {@code null}
     */
    public static <L> Function<? super HttpService, StructuredLoggingService<L>> newDecorator(String str, String str2, StructuredLogBuilder<L> structuredLogBuilder) {
        return newDecorator(str, str2, structuredLogBuilder, null);
    }

    /**
     * Builds the producer configuration used when this class creates its own producer:
     * the given bootstrap servers, this class's simple name as {@code client.id},
     * {@code acks=all} and {@code retries=3}.
     *
     * <p>NOTE(review): no key/value serializer is configured here; confirm that a
     * {@link KafkaProducer} can actually be constructed from this configuration.
     *
     * @param str the {@code bootstrap.servers} value
     * @return a new, independent {@link Properties} instance
     */
    private static Properties newDefaultConfig(String str) {
        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", str);
        config.setProperty("client.id", KafkaStructuredLoggingService.class.getSimpleName());
        config.setProperty("acks", "all");
        config.setProperty("retries", "3");
        return config;
    }

    /**
     * Creates a new instance.
     *
     * @param httpService the service being decorated, passed to the superclass
     * @param structuredLogBuilder the log builder, passed to the superclass
     * @param producer the producer to send records through
     * @param str the name of the Kafka topic to send records to
     * @param keySelector selects the record key; when {@code null}, a selector that always
     *                    returns {@code null} is used
     * @param z whether {@link #close()} must close {@code producer}
     * @throws NullPointerException if {@code producer} or {@code str} is {@code null}
     */
    KafkaStructuredLoggingService(HttpService httpService, StructuredLogBuilder<L> structuredLogBuilder, Producer<byte[], L> producer, String str, @Nullable KeySelector<L> keySelector, boolean z) {
        super(httpService, structuredLogBuilder);
        this.producer = Objects.requireNonNull(producer, "producer");
        this.topic = Objects.requireNonNull(str, "topic");
        // Installing a default keeps writeLog free of null checks.
        this.keySelector = keySelector != null ? keySelector : (requestLog, entry) -> null;
        this.needToCloseProducer = z;
    }

    /**
     * Sends one structured log entry to the configured topic, keyed by the key selector.
     * If the producer reports an exception to the send callback, it is logged at WARN level
     * together with the record; it is not rethrown.
     *
     * @param requestLog the request log the entry was built from
     * @param l the structured log entry to send as the record value
     */
    // NOTE(review): presumably overrides a hook declared by StructuredLoggingService; add
    // @Override once confirmed against the superclass.
    protected void writeLog(RequestLog requestLog, L l) {
        final byte[] key = this.keySelector.selectKey(requestLog, l);
        final ProducerRecord<byte[], L> producerRecord = new ProducerRecord<>(this.topic, key, l);
        this.producer.send(producerRecord, (recordMetadata, exc) -> {
            if (exc != null) {
                logger.warn("failed to send service log to Kafka {}", producerRecord, exc);
            }
        });
    }

    /**
     * Closes the producer, but only when it was created by this class; a caller-supplied
     * producer is left open.
     */
    // NOTE(review): presumably overrides a hook declared by StructuredLoggingService; add
    // @Override once confirmed against the superclass.
    protected void close() {
        if (this.needToCloseProducer) {
            this.producer.close();
        }
    }
}
