package com.sap.cds.feature.messaging.kafka.service;

import com.google.common.annotations.VisibleForTesting;
import com.sap.cds.feature.messaging.kafka.client.KafkaClientFactory;
import com.sap.cds.feature.messaging.kafka.client.KafkaMessagingConsumer;
import com.sap.cds.feature.messaging.kafka.client.KafkaMessagingProducer;
import com.sap.cds.feature.messaging.kafka.utils.KafkaServiceBinding;
import com.sap.cds.feature.messaging.kafka.utils.KafkaUtils;
import com.sap.cds.impl.parser.token.Jsonizer;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.messaging.CloudEventMessageEventContext;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.AbstractMessagingService;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.messaging.utils.CloudEventUtils;
import com.sap.cds.services.request.UserInfo;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import com.sap.cloud.environment.servicebinding.api.ServiceBinding;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/feature/messaging/kafka/service/KafkaTopicMessagingService.class */
public class KafkaTopicMessagingService extends AbstractMessagingService {
    private static final String PARTITION = "partition";
    private static final String OBJECT_KEY = "objectKey";
    private final KafkaClientFactory kafkaClientFactory;
    private static final Logger logger = LoggerFactory.getLogger(KafkaTopicMessagingService.class);
    public static final String TENANT_ID_HEADER = "x-sap-cap-tenant-id";
    public static final String MESSAGE_ID_HEADER = "x-sap-cap-message-id";
    public static final String HEADERS_IN_MESSAGE_HEADER = "x-sap-cap-headers-in-message";
    private static final Set<String> INTERNAL_HEADERS = Set.of(KafkaChannelMessagingService.EFFECTIVE_TOPIC_HEADER, TENANT_ID_HEADER, MESSAGE_ID_HEADER, HEADERS_IN_MESSAGE_HEADER);

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaTopicMessagingService(CdsProperties.Messaging.MessagingServiceConfig messagingServiceConfig, ServiceBinding serviceBinding, CdsRuntime cdsRuntime, KafkaClientFactory kafkaClientFactory) {
        super(messagingServiceConfig, cdsRuntime);
        if (Objects.nonNull(kafkaClientFactory)) {
            this.kafkaClientFactory = kafkaClientFactory;
        } else {
            this.kafkaClientFactory = KafkaClientFactory.create(messagingServiceConfig.getName(), this.queue.getName(), new KafkaServiceBinding(serviceBinding), messagingServiceConfig.getQueue().getConfig(), cdsRuntime);
        }
    }

    public KafkaTopicMessagingService(CdsProperties.Messaging.MessagingServiceConfig messagingServiceConfig, ServiceBinding serviceBinding, CdsRuntime cdsRuntime) {
        this(messagingServiceConfig, serviceBinding, cdsRuntime, null);
    }

    public void init() {
        run(() -> {
            super.init();
            if (this.queue.getTopics().isEmpty() || this.runtime.getEnvironment().getCdsProperties().getEnvironment().getCommand().isEnabled().booleanValue()) {
                return;
            }
            this.kafkaClientFactory.startConsumer();
        });
    }

    public void stop() {
        try {
            this.kafkaClientFactory.closeProducer();
            if (!this.queue.getTopics().isEmpty()) {
                this.kafkaClientFactory.closeConsumer();
            }
            super.stop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    protected void removeQueue(String str) {
    }

    protected void createQueue(String str, Map<String, Object> map) {
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public String toFullyQualifiedTopicName(String str, boolean z) {
        return str.replace('/', '_');
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createQueueSubscription(String str, String str2) {
        this.kafkaClientFactory.getOrCreateTopicAdminClient().createTopicIfNotExisting(str2);
        consumer().subscribe(str2);
    }

    protected void registerQueueListener(String str, MessagingBrokerQueueListener messagingBrokerQueueListener) {
        consumer().setMessageConsumer(messagingBrokerQueueListener);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emitTopicMessage(String str, TopicMessageEventContext topicMessageEventContext) {
        String data;
        this.kafkaClientFactory.getOrCreateTopicAdminClient().createTopicIfNotExisting(str);
        try {
            Integer retrievePartition = retrievePartition(topicMessageEventContext);
            String retrieveObjectKey = retrieveObjectKey(topicMessageEventContext);
            List<Header> buildHeaders = buildHeaders(topicMessageEventContext);
            if (topicMessageEventContext.getDataMap() != null) {
                data = CloudEventUtils.toJson(topicMessageEventContext.getDataMap());
            } else {
                data = topicMessageEventContext.getData();
                buildHeaders.add(createRecordHeader(HEADERS_IN_MESSAGE_HEADER, "true"));
            }
            producer().publish(str, retrievePartition, retrieveObjectKey, data, buildHeaders);
        } catch (InterruptedException e) {
            logger.error("Error while emitting a message for topic '{}', thread interrupted.", str, e);
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e2) {
            throw new ErrorStatusException(CdsErrorStatuses.EVENT_EMITTING_FAILED, new Object[]{str, e2});
        }
    }

    private Integer retrievePartition(TopicMessageEventContext topicMessageEventContext) {
        return (Integer) topicMessageEventContext.get(PARTITION);
    }

    private String retrieveObjectKey(TopicMessageEventContext topicMessageEventContext) {
        String str = (String) topicMessageEventContext.get(OBJECT_KEY);
        if (str != null) {
            return str;
        }
        return (String) topicMessageEventContext.getModel().findEvent((String) topicMessageEventContext.get("cds.eventName")).filter(cdsEvent -> {
            return cdsEvent.keyElements().findAny().isPresent();
        }).map(cdsEvent2 -> {
            CloudEventMessageEventContext cloudEventMessageContext = CloudEventUtils.toCloudEventMessageContext(topicMessageEventContext, cdsEvent2.getName());
            if (cloudEventMessageContext.getData() == null) {
                return null;
            }
            Map data = cloudEventMessageContext.getData();
            return Jsonizer.json((Map) cdsEvent2.keyElements().collect(TreeMap::new, (treeMap, cdsElement) -> {
                treeMap.put(cdsElement.getName(), data.get(cdsElement.getName()));
            }, (v0, v1) -> {
                v0.putAll(v1);
            }));
        }).orElse(null);
    }

    private List<Header> buildHeaders(TopicMessageEventContext topicMessageEventContext) {
        ArrayList arrayList = new ArrayList();
        if (topicMessageEventContext.getHeadersMap() != null) {
            Stream map = topicMessageEventContext.getHeadersMap().entrySet().stream().filter(this::isNotInternalHeader).map(this::createRecordHeader);
            Objects.requireNonNull(arrayList);
            map.forEach((v1) -> {
                r1.add(v1);
            });
        }
        String str = (String) topicMessageEventContext.get(KafkaChannelMessagingService.EFFECTIVE_TOPIC_HEADER);
        if (str != null) {
            arrayList.add(createRecordHeader(KafkaChannelMessagingService.EFFECTIVE_TOPIC_HEADER, str));
        }
        UserInfo userInfo = topicMessageEventContext.getUserInfo();
        if (!StringUtils.isEmpty(userInfo.getTenant())) {
            arrayList.add(createRecordHeader(TENANT_ID_HEADER, userInfo.getTenant()));
        }
        arrayList.add(createRecordHeader(MESSAGE_ID_HEADER, UUID.randomUUID().toString()));
        return arrayList;
    }

    private boolean isNotInternalHeader(Map.Entry<String, Object> entry) {
        return !INTERNAL_HEADERS.contains(entry.getKey());
    }

    private Header createRecordHeader(Map.Entry<String, Object> entry) {
        return createRecordHeader(entry.getKey(), entry.getValue().toString());
    }

    private Header createRecordHeader(String str, String str2) {
        return new RecordHeader(str, KafkaUtils.toBytes(str2));
    }

    @VisibleForTesting
    KafkaMessagingProducer producer() {
        return this.kafkaClientFactory.getOrCreateProducer();
    }

    @VisibleForTesting
    KafkaMessagingConsumer consumer() {
        return this.kafkaClientFactory.getOrCreateConsumer();
    }

    private void run(Runnable runnable) {
        Thread thread = new Thread(runnable, getName() + " - Initializer");
        thread.setDaemon(true);
        thread.start();
    }
}
