package com.sap.cds.feature.messaging.kafka.service;

import com.sap.cds.feature.messaging.kafka.client.KafkaClientFactory;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.service.MessageTopic;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.StringUtils;
import com.sap.cds.services.utils.model.CdsAnnotations;
import com.sap.cloud.environment.servicebinding.api.ServiceBinding;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/feature/messaging/kafka/service/KafkaChannelMessagingService.class */
public class KafkaChannelMessagingService extends KafkaTopicMessagingService {
    private static final Logger logger = LoggerFactory.getLogger(KafkaChannelMessagingService.class);
    public static final String EFFECTIVE_TOPIC_HEADER = "x-sap-cap-effective-topic";
    private final String defaultChannel;
    private final Set<String> subscribedChannels;
    private final Map<String, String> topicToChannel;

    KafkaChannelMessagingService(CdsProperties.Messaging.MessagingServiceConfig messagingServiceConfig, ServiceBinding serviceBinding, CdsRuntime cdsRuntime, KafkaClientFactory kafkaClientFactory) {
        super(messagingServiceConfig, serviceBinding, cdsRuntime, kafkaClientFactory);
        this.subscribedChannels = new HashSet();
        this.topicToChannel = new HashMap();
        this.defaultChannel = (String) messagingServiceConfig.getQueue().getConfig().getOrDefault("channel", "cds.default");
        preprocessModel();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public KafkaChannelMessagingService(CdsProperties.Messaging.MessagingServiceConfig messagingServiceConfig, ServiceBinding serviceBinding, CdsRuntime cdsRuntime) {
        this(messagingServiceConfig, serviceBinding, cdsRuntime, null);
    }

    private void preprocessModel() {
        this.runtime.getCdsModel().events().forEach(cdsEvent -> {
            String str = (String) CdsAnnotations.TOPIC.getOrDefault(cdsEvent);
            if (StringUtils.isEmpty(str)) {
                str = cdsEvent.getQualifiedName();
            }
            String str2 = (String) CdsAnnotations.KAFKA_TOPIC.getOrDefault(cdsEvent);
            if (StringUtils.isEmpty(str2)) {
                str2 = this.defaultChannel;
            }
            this.topicToChannel.put(toFullyQualifiedTopicName(str, false), str2);
        });
    }

    @Override // com.sap.cds.feature.messaging.kafka.service.KafkaTopicMessagingService
    public void init() {
        if (!this.queue.getTopics().isEmpty() || this.forceListening) {
            HashSet<String> hashSet = new HashSet();
            hashSet.add(this.defaultChannel);
            for (Map.Entry<String, String> entry : this.topicToChannel.entrySet()) {
                if (this.queue.hasTopic(entry.getKey())) {
                    hashSet.add(entry.getValue());
                }
            }
            for (String str : hashSet) {
                logger.info("Creating channel handler for channel topic '{}'", str);
                on(str, null, this::dispatchToHandler);
            }
        }
        super.init();
    }

    private void dispatchToHandler(EventContext eventContext) {
        String str;
        if (!Boolean.TRUE.equals(eventContext.as(TopicMessageEventContext.class).getIsInbound()) || (str = (String) eventContext.get(EFFECTIVE_TOPIC_HEADER)) == null || !this.queue.hasTopic(str) || this.subscribedChannels.contains(str)) {
            return;
        }
        for (MessageTopic messageTopic : this.queue.findTopic(str)) {
            TopicMessageEventContext create = TopicMessageEventContext.create(messageTopic.getEventName());
            eventContext.keySet().forEach(str2 -> {
                create.put(str2, eventContext.get(str2));
            });
            logger.debug("Dispatching topic '{}' received on channel '{}' to event '{}'", new Object[]{str, eventContext.getEvent(), messageTopic.getEventName()});
            emit(create);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.sap.cds.feature.messaging.kafka.service.KafkaTopicMessagingService
    public void emitTopicMessage(String str, TopicMessageEventContext topicMessageEventContext) {
        topicMessageEventContext.put(EFFECTIVE_TOPIC_HEADER, str);
        super.emitTopicMessage(this.topicToChannel.getOrDefault(str, this.defaultChannel), topicMessageEventContext);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.sap.cds.feature.messaging.kafka.service.KafkaTopicMessagingService
    public void createQueueSubscription(String str, String str2) {
        String orDefault = this.topicToChannel.getOrDefault(str2, this.defaultChannel);
        if (this.subscribedChannels.add(orDefault)) {
            super.createQueueSubscription(str, orDefault);
        }
    }
}
