package org.apache.druid.emitter.kafka;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.druid.emitter.kafka.KafkaEmitterConfig;
import org.apache.druid.java.util.common.MemoryBoundLinkedBlockingQueue;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

/* loaded from: input_file:org/apache/druid/emitter/kafka/KafkaEmitter.class */
public class KafkaEmitter implements Emitter {
    private static Logger log = new Logger(KafkaEmitter.class);
    private static final int DEFAULT_SEND_INTERVAL_SECONDS = 10;
    private static final int DEFAULT_SEND_LOST_INTERVAL_MINUTES = 5;
    private static final int DEFAULT_RETRIES = 3;
    private final AtomicLong metricLost;
    private final AtomicLong alertLost;
    private final AtomicLong requestLost;
    private final AtomicLong segmentMetadataLost;
    private final AtomicLong invalidLost;
    private final KafkaEmitterConfig config;
    private final ObjectMapper jsonMapper;
    private final MemoryBoundLinkedBlockingQueue<String> metricQueue;
    private final MemoryBoundLinkedBlockingQueue<String> alertQueue;
    private final MemoryBoundLinkedBlockingQueue<String> requestQueue;
    private final MemoryBoundLinkedBlockingQueue<String> segmentMetadataQueue;
    private final ScheduledExecutorService scheduler;
    protected int sendInterval = DEFAULT_SEND_INTERVAL_SECONDS;
    private final Producer<String, String> producer = setKafkaProducer();

    public KafkaEmitter(KafkaEmitterConfig kafkaEmitterConfig, ObjectMapper objectMapper) {
        this.config = kafkaEmitterConfig;
        this.jsonMapper = objectMapper;
        long parseLong = Long.parseLong(this.config.getKafkaProducerConfig().getOrDefault("buffer.memory", "33554432"));
        this.metricQueue = new MemoryBoundLinkedBlockingQueue<>(parseLong);
        this.alertQueue = new MemoryBoundLinkedBlockingQueue<>(parseLong);
        this.requestQueue = new MemoryBoundLinkedBlockingQueue<>(parseLong);
        this.segmentMetadataQueue = new MemoryBoundLinkedBlockingQueue<>(parseLong);
        this.scheduler = Executors.newScheduledThreadPool(kafkaEmitterConfig.getEventTypes().size() + 1);
        this.metricLost = new AtomicLong(0L);
        this.alertLost = new AtomicLong(0L);
        this.requestLost = new AtomicLong(0L);
        this.segmentMetadataLost = new AtomicLong(0L);
        this.invalidLost = new AtomicLong(0L);
    }

    private Callback setProducerCallback(AtomicLong atomicLong) {
        return (recordMetadata, exc) -> {
            if (exc != null) {
                log.debug("Event send failed [%s]", new Object[]{exc.getMessage()});
                atomicLong.incrementAndGet();
            }
        };
    }

    @VisibleForTesting
    protected Producer<String, String> setKafkaProducer() {
        ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
        try {
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            Properties properties = new Properties();
            properties.put("bootstrap.servers", this.config.getBootstrapServers());
            properties.put("key.serializer", StringSerializer.class.getName());
            properties.put("value.serializer", StringSerializer.class.getName());
            properties.put("retries", Integer.valueOf(DEFAULT_RETRIES));
            properties.putAll(this.config.getKafkaProducerConfig());
            properties.putAll(this.config.getKafkaProducerSecrets().getConfig());
            KafkaProducer kafkaProducer = new KafkaProducer(properties);
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            return kafkaProducer;
        } catch (Throwable th) {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
            throw th;
        }
    }

    public void start() {
        Set<KafkaEmitterConfig.EventType> eventTypes = this.config.getEventTypes();
        if (eventTypes.contains(KafkaEmitterConfig.EventType.METRICS)) {
            this.scheduler.schedule(this::sendMetricToKafka, this.sendInterval, TimeUnit.SECONDS);
        }
        if (eventTypes.contains(KafkaEmitterConfig.EventType.ALERTS)) {
            this.scheduler.schedule(this::sendAlertToKafka, this.sendInterval, TimeUnit.SECONDS);
        }
        if (eventTypes.contains(KafkaEmitterConfig.EventType.REQUESTS)) {
            this.scheduler.schedule(this::sendRequestToKafka, this.sendInterval, TimeUnit.SECONDS);
        }
        if (eventTypes.contains(KafkaEmitterConfig.EventType.SEGMENT_METADATA)) {
            this.scheduler.schedule(this::sendSegmentMetadataToKafka, this.sendInterval, TimeUnit.SECONDS);
        }
        this.scheduler.scheduleWithFixedDelay(() -> {
            log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], segmentMetadataLost=[%d], invalidLost=[%d]", new Object[]{Long.valueOf(this.metricLost.get()), Long.valueOf(this.alertLost.get()), Long.valueOf(this.requestLost.get()), Long.valueOf(this.segmentMetadataLost.get()), Long.valueOf(this.invalidLost.get())});
        }, 5L, 5L, TimeUnit.MINUTES);
        log.info("Starting Kafka Emitter.", new Object[0]);
    }

    private void sendMetricToKafka() {
        sendToKafka(this.config.getMetricTopic(), this.metricQueue, setProducerCallback(this.metricLost));
    }

    private void sendAlertToKafka() {
        sendToKafka(this.config.getAlertTopic(), this.alertQueue, setProducerCallback(this.alertLost));
    }

    private void sendRequestToKafka() {
        sendToKafka(this.config.getRequestTopic(), this.requestQueue, setProducerCallback(this.requestLost));
    }

    private void sendSegmentMetadataToKafka() {
        sendToKafka(this.config.getSegmentMetadataTopic(), this.segmentMetadataQueue, setProducerCallback(this.segmentMetadataLost));
    }

    private void sendToKafka(String str, MemoryBoundLinkedBlockingQueue<String> memoryBoundLinkedBlockingQueue, Callback callback) {
        while (true) {
            try {
                this.producer.send(new ProducerRecord(str, (String) memoryBoundLinkedBlockingQueue.take().getData()), callback);
            } catch (Throwable th) {
                if ((th instanceof InterruptedException) && th.getMessage() == null) {
                    log.info("Normal exit.", new Object[0]);
                    return;
                } else {
                    log.warn(th, "Exception while getting record from queue or producer send, Events would not be emitted anymore.", new Object[0]);
                    return;
                }
            }
        }
    }

    public void emit(Event event) {
        if (event != null) {
            try {
                MemoryBoundLinkedBlockingQueue.ObjectContainer objectContainer = new MemoryBoundLinkedBlockingQueue.ObjectContainer(this.jsonMapper.writeValueAsString(addExtraDimensionsToEvent(event.toMap())), StringUtils.toUtf8(r0).length);
                Set<KafkaEmitterConfig.EventType> eventTypes = this.config.getEventTypes();
                if (event instanceof ServiceMetricEvent) {
                    if (!eventTypes.contains(KafkaEmitterConfig.EventType.METRICS) || !this.metricQueue.offer(objectContainer)) {
                        this.metricLost.incrementAndGet();
                    }
                } else if (event instanceof AlertEvent) {
                    if (!eventTypes.contains(KafkaEmitterConfig.EventType.ALERTS) || !this.alertQueue.offer(objectContainer)) {
                        this.alertLost.incrementAndGet();
                    }
                } else if (event instanceof RequestLogEvent) {
                    if (!eventTypes.contains(KafkaEmitterConfig.EventType.REQUESTS) || !this.requestQueue.offer(objectContainer)) {
                        this.requestLost.incrementAndGet();
                    }
                } else if (!(event instanceof SegmentMetadataEvent)) {
                    this.invalidLost.incrementAndGet();
                } else if (!eventTypes.contains(KafkaEmitterConfig.EventType.SEGMENT_METADATA) || !this.segmentMetadataQueue.offer(objectContainer)) {
                    this.segmentMetadataLost.incrementAndGet();
                }
            } catch (JsonProcessingException e) {
                this.invalidLost.incrementAndGet();
                log.warn(e, "Exception while serializing event", new Object[0]);
            }
        }
    }

    private EventMap addExtraDimensionsToEvent(EventMap eventMap) {
        if (this.config.getClusterName() != null || this.config.getExtraDimensions() != null) {
            EventMap.Builder asBuilder = eventMap.asBuilder();
            if (this.config.getClusterName() != null) {
                asBuilder.put("clusterName", this.config.getClusterName());
            }
            if (this.config.getExtraDimensions() != null) {
                asBuilder.putAll(this.config.getExtraDimensions());
            }
            eventMap = asBuilder.build();
        }
        return eventMap;
    }

    public void flush() {
        this.producer.flush();
    }

    @LifecycleStop
    public void close() {
        this.scheduler.shutdownNow();
        this.producer.close();
    }

    public long getMetricLostCount() {
        return this.metricLost.get();
    }

    public long getAlertLostCount() {
        return this.alertLost.get();
    }

    public long getRequestLostCount() {
        return this.requestLost.get();
    }

    public long getInvalidLostCount() {
        return this.invalidLost.get();
    }

    public long getSegmentMetadataLostCount() {
        return this.segmentMetadataLost.get();
    }
}
