package com.sap.cds.feature.messaging.kafka.client;

import com.google.common.annotations.VisibleForTesting;
import com.sap.cds.feature.messaging.kafka.service.KafkaChannelMessagingService;
import com.sap.cds.feature.messaging.kafka.service.KafkaTopicMessagingService;
import com.sap.cds.feature.messaging.kafka.utils.KafkaServiceBinding;
import com.sap.cds.feature.messaging.kafka.utils.KafkaUtils;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.messaging.utils.CloudEventUtils;
import com.sap.cds.services.runtime.CdsRuntime;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/feature/messaging/kafka/client/KafkaMessagingConsumer.class */
public class KafkaMessagingConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMessagingConsumer.class);

    @VisibleForTesting
    protected CdsRuntime runtime;
    private final KafkaConsumer<String, String> consumer;
    private final String name;
    private final AtomicBoolean isRunning = new AtomicBoolean(false);
    private volatile CountDownLatch consumerPaused;
    private MessagingBrokerQueueListener messageConsumer;

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:com/sap/cds/feature/messaging/kafka/client/KafkaMessagingConsumer$ReceivedMessage.class */
    public static class ReceivedMessage implements MessagingBrokerQueueListener.MessageAccess {
        private final KafkaConsumer<String, String> consumer;
        private final String recordMessage;
        private final String topic;
        private final TopicPartition partition;
        private final long offset;
        private final Map<String, String> technicalHeaders = new HashMap();
        private final Map<String, String> recordHeaders = new HashMap();
        private String message;
        private Map<String, Object> headersMap;
        private Map<String, Object> dataMap;
        private String id;
        private String tenantId;
        private boolean headersInMessage;
        boolean acknowledged;

        ReceivedMessage(ConsumerRecord<String, String> consumerRecord, KafkaConsumer<String, String> kafkaConsumer) {
            this.consumer = kafkaConsumer;
            this.recordMessage = (String) consumerRecord.value();
            this.topic = consumerRecord.topic();
            this.partition = new TopicPartition(this.topic, consumerRecord.partition());
            this.offset = consumerRecord.offset() + 1;
            consumerRecord.headers().forEach(header -> {
                if (Objects.equals(header.key(), KafkaTopicMessagingService.MESSAGE_ID_HEADER)) {
                    this.id = KafkaUtils.toString(header.value());
                    return;
                }
                if (Objects.equals(header.key(), KafkaTopicMessagingService.TENANT_ID_HEADER)) {
                    this.tenantId = KafkaUtils.toString(header.value());
                    return;
                }
                if (Objects.equals(header.key(), KafkaChannelMessagingService.EFFECTIVE_TOPIC_HEADER)) {
                    this.technicalHeaders.put(header.key(), KafkaUtils.toString(header.value()));
                } else if (Objects.equals(header.key(), KafkaTopicMessagingService.HEADERS_IN_MESSAGE_HEADER)) {
                    this.headersInMessage = true;
                } else {
                    this.recordHeaders.put(header.key(), KafkaUtils.toString(header.value()));
                }
            });
        }

        public String getId() {
            return this.id;
        }

        public String getMessage() {
            if (this.message == null) {
                Map map = CloudEventUtils.toMap(this.recordMessage);
                if (this.headersInMessage || map == null) {
                    this.message = this.recordMessage;
                } else {
                    HashMap hashMap = new HashMap(this.recordHeaders);
                    hashMap.put("data", map);
                    this.message = CloudEventUtils.toJson(hashMap);
                }
            }
            return this.message;
        }

        public Map<String, Object> getDataMap() {
            if (this.dataMap == null) {
                populateMaps();
            }
            return this.dataMap;
        }

        public Map<String, Object> getHeadersMap() {
            if (this.headersMap == null) {
                populateMaps();
            }
            return this.headersMap;
        }

        private void populateMaps() {
            this.headersMap = new HashMap(this.recordHeaders);
            Map<String, Object> map = CloudEventUtils.toMap(this.recordMessage);
            if (map == null) {
                this.dataMap = new HashMap();
                this.dataMap.put("message", this.recordMessage);
            } else if (!this.headersInMessage || !(map.get("data") instanceof Map)) {
                this.dataMap = map;
            } else {
                this.dataMap = (Map) map.remove("data");
                this.headersMap.putAll(map);
            }
        }

        public String getBrokerTopic() {
            return this.topic;
        }

        public void acknowledge() {
            this.acknowledged = true;
        }

        boolean isAcknowledged() {
            return this.acknowledged;
        }

        void commitOffset() {
            this.consumer.commitSync(Collections.singletonMap(this.partition, new OffsetAndMetadata(this.offset)));
        }

        void rollbackOffset() {
            this.consumer.seek(this.partition, this.offset - 1);
        }

        public Map<String, String> getTechnicalHeaders() {
            return this.technicalHeaders;
        }

        public String getTenantId() {
            return this.tenantId;
        }
    }

    public KafkaMessagingConsumer(String str, String str2, KafkaServiceBinding kafkaServiceBinding, Map<String, Object> map, CdsRuntime cdsRuntime) {
        this.runtime = cdsRuntime;
        this.name = str;
        this.consumer = createKafkaConsumer(kafkaServiceBinding, str2, map);
    }

    @VisibleForTesting
    public KafkaConsumer<String, String> getConsumer() {
        return this.consumer;
    }

    public void setMessageConsumer(MessagingBrokerQueueListener messagingBrokerQueueListener) {
        this.messageConsumer = messagingBrokerQueueListener;
    }

    public void subscribe(String str) {
        HashSet hashSet = new HashSet(this.consumer.subscription());
        hashSet.add(str);
        this.consumer.subscribe(hashSet);
    }

    public void start() {
        if (this.isRunning.get()) {
            return;
        }
        this.isRunning.set(true);
        this.consumerPaused = new CountDownLatch(1);
        runConsumer();
    }

    public void close() throws InterruptedException {
        pause();
        this.consumer.close();
    }

    private void pause() throws InterruptedException {
        this.isRunning.set(false);
        synchronized (this.consumerPaused) {
            boolean z = false;
            while (!z) {
                z = this.consumerPaused.await(100L, TimeUnit.MILLISECONDS);
            }
        }
    }

    private void runConsumer() {
        Thread thread = new Thread(() -> {
            while (this.isRunning.get()) {
                try {
                    ConsumerRecords poll = this.consumer.poll(Duration.of(3L, ChronoUnit.SECONDS));
                    if (!poll.isEmpty()) {
                        poll.forEach(consumerRecord -> {
                            dispatchConsumerRecord(consumerRecord, this.consumer);
                        });
                    }
                } catch (Exception e) {
                    logger.error("Error while consuming record from Kafka.", e);
                }
            }
            this.consumerPaused.countDown();
        }, "kafka-consumer-" + this.name);
        thread.setDaemon(true);
        thread.start();
    }

    @VisibleForTesting
    protected void wait(int i) {
        try {
            Thread.sleep(i * 1000);
        } catch (InterruptedException e) {
        }
    }

    @VisibleForTesting
    void dispatchConsumerRecord(ConsumerRecord<String, String> consumerRecord, KafkaConsumer<String, String> kafkaConsumer) {
        ReceivedMessage receivedMessage = new ReceivedMessage(consumerRecord, kafkaConsumer);
        try {
            this.runtime.requestContext().systemUser(receivedMessage.getTenantId()).run(requestContext -> {
                this.messageConsumer.receivedMessage(receivedMessage);
            });
            if (receivedMessage.isAcknowledged()) {
                receivedMessage.commitOffset();
            } else {
                receivedMessage.rollbackOffset();
                wait(3);
            }
        } catch (Throwable th) {
            if (receivedMessage.isAcknowledged()) {
                receivedMessage.commitOffset();
            } else {
                receivedMessage.rollbackOffset();
                wait(3);
            }
            throw th;
        }
    }

    @VisibleForTesting
    KafkaConsumer<String, String> createKafkaConsumer(KafkaServiceBinding kafkaServiceBinding, String str, Map<String, Object> map) {
        return new KafkaConsumer<>(KafkaUtils.createMessageConsumerProperties(kafkaServiceBinding, str, map));
    }
}
