package com.sap.cds.services.messaging.service;

import com.sap.cds.services.EventContext;
import com.sap.cds.services.ServiceDelegator;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.handler.Handler;
import com.sap.cds.services.handler.annotations.Before;
import com.sap.cds.services.handler.annotations.HandlerOrder;
import com.sap.cds.services.handler.annotations.On;
import com.sap.cds.services.messaging.MessagingErrorEventContext;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.utils.CloudEventUtils;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.CorrelationIdUtils;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiPredicate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/services/messaging/service/AbstractMessagingService.class */
public abstract class AbstractMessagingService extends ServiceDelegator implements MessagingService {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingService.class);
    protected static final String FORMAT_CLOUDEVENTS = "cloudevents";
    public static final String CONTEXT_PARAMETERS_KEY = "cds.context.parameters";
    protected final CdsProperties.Messaging.MessagingServiceConfig serviceConfig;
    protected final CdsRuntime runtime;
    protected final MessageQueue queue;
    protected final boolean forceListening;
    private final boolean isStructured;

    protected AbstractMessagingService(CdsProperties.Messaging.MessagingServiceConfig messagingServiceConfig, CdsRuntime cdsRuntime) {
        super(messagingServiceConfig.getName());
        this.serviceConfig = messagingServiceConfig;
        this.runtime = cdsRuntime;
        this.forceListening = messagingServiceConfig.getQueue().isForceListening();
        this.isStructured = messagingServiceConfig.isStructured();
        this.queue = MessageQueue.create(messagingServiceConfig, getTopicMatcher(), cdsRuntime.getEnvironment().getApplicationInfo());
    }

    public void init() {
        if (this.runtime.getEnvironment().getCdsProperties().getEnvironment().getCommand().isEnabled().booleanValue()) {
            return;
        }
        createOrUpdateQueuesAndSubscriptions();
    }

    public void stop() {
    }

    protected boolean createOrUpdateQueuesAndSubscriptions() {
        logger.info("Initializing subscriptions of messaging service '{}'", getName());
        String fullyQualifiedQueueName = toFullyQualifiedQueueName(this.queue);
        try {
            try {
                if (this.runtime.getEnvironment().getCdsProperties().getMessaging().isResetQueues()) {
                    try {
                        createQueue(fullyQualifiedQueueName, this.queue.getProperties());
                        removeQueue(fullyQualifiedQueueName);
                        logger.warn("Reset the queue '{}' of service '{}'", fullyQualifiedQueueName, getName());
                    } catch (IOException e) {
                        logger.warn("Failed to reset queue '{}' of service '{}'", fullyQualifiedQueueName, getName());
                    }
                }
                if (this.queue.getTopics().isEmpty() && !this.forceListening) {
                    logger.warn("There are no queue subscriptions available for the service '{}'", getName());
                    logger.debug("Finished initializing subscriptions of service '{}'", getName());
                    return false;
                }
                createQueue(fullyQualifiedQueueName, this.queue.getProperties());
                logger.info("Created queue '{}' for service '{}'", fullyQualifiedQueueName, getName());
                Iterator<MessageTopic> it = this.queue.getTopics().iterator();
                while (it.hasNext()) {
                    String brokerName = it.next().getBrokerName();
                    try {
                        createQueueSubscription(fullyQualifiedQueueName, brokerName);
                        logger.info("Subscribed topic '{}' on queue '{}' for service '{}'", new Object[]{brokerName, fullyQualifiedQueueName, getName()});
                    } catch (IOException e2) {
                        logger.error("Failed to subscribe topic '{}' on queue '{}' for service '{}'", new Object[]{brokerName, fullyQualifiedQueueName, getName(), e2});
                    }
                }
                try {
                    registerQueueListener(fullyQualifiedQueueName, new MessagingBrokerQueueListener(this, fullyQualifiedQueueName, this.queue, this.runtime, this.isStructured));
                } catch (IOException | IllegalArgumentException e3) {
                    logger.error("Failed to register the listener to the queue '{}' for service '{}'", new Object[]{fullyQualifiedQueueName, getName(), e3});
                }
                logger.debug("Finished initializing subscriptions of service '{}'", getName());
                return true;
            } catch (Throwable th) {
                logger.debug("Finished initializing subscriptions of service '{}'", getName());
                throw th;
            }
        } catch (Exception e4) {
            logger.error("Failed to create queue '{}' for service '{}'", new Object[]{fullyQualifiedQueueName, getName(), e4});
            logger.debug("Finished initializing subscriptions of service '{}'", getName());
            return false;
        }
    }

    protected boolean isCloudEventsFormat() {
        return this.serviceConfig.getFormat() != null && this.serviceConfig.getFormat().trim().equalsIgnoreCase(FORMAT_CLOUDEVENTS);
    }

    public void emit(String str, String str2) {
        emit(str, str2, null, null);
    }

    public void emit(String str, Map<String, Object> map) {
        emit(str, null, map, null);
    }

    public void emit(String str, Map<String, Object> map, Map<String, Object> map2) {
        emit(str, null, map, map2);
    }

    private void emit(String str, String str2, Map<String, Object> map, Map<String, Object> map2) {
        Map<String, Object> map3;
        TopicMessageEventContext create = TopicMessageEventContext.create(str);
        retrieveContextParameters(map2).ifPresent(map4 -> {
            Objects.requireNonNull(create);
            map4.forEach(create::put);
        });
        if (this.isStructured) {
            if (str2 != null) {
                create.setDataMap(new HashMap(Map.of("message", str2)));
                create.setHeadersMap(new HashMap());
            } else {
                create.setDataMap(map);
                create.setHeadersMap(map2 != null ? map2 : new HashMap<>());
            }
        } else if (str2 != null) {
            create.setData(str2);
        } else {
            if (map2 != null) {
                map3 = new HashMap(map2);
                map3.put(CloudEventUtils.KEY_DATA, new HashMap(map));
            } else {
                map3 = map;
            }
            create.setData(CloudEventUtils.toJson(map3));
        }
        emit(create);
    }

    private Optional<Map<String, Object>> retrieveContextParameters(Map<String, Object> map) {
        return map != null ? Optional.ofNullable((Map) map.remove(CONTEXT_PARAMETERS_KEY)) : Optional.empty();
    }

    @HandlerOrder(2147483646)
    @On
    private void autoComplete(EventContext eventContext) {
        if (this.queue.hasEvent(eventContext.getEvent()) || this.forceListening) {
            eventContext.setCompleted();
        }
    }

    @HandlerOrder(Integer.MIN_VALUE)
    @Before
    protected void validateEventContext(TopicMessageEventContext topicMessageEventContext) {
        if (Boolean.TRUE.equals(topicMessageEventContext.getIsInbound())) {
            return;
        }
        if (topicMessageEventContext.getData() == null && topicMessageEventContext.getDataMap() == null && topicMessageEventContext.getHeadersMap() == null) {
            throw new ErrorStatusException(CdsErrorStatuses.NO_MESSAGE_PROVIDED, new Object[0]);
        }
        if (topicMessageEventContext.getDataMap() == null && topicMessageEventContext.getHeadersMap() != null) {
            topicMessageEventContext.setDataMap(new HashMap());
        } else if (topicMessageEventContext.getDataMap() != null && topicMessageEventContext.getHeadersMap() == null) {
            topicMessageEventContext.setHeadersMap(new HashMap());
        }
        if (topicMessageEventContext.getHeadersMap() == null || !CorrelationIdUtils.mdcHasEntry()) {
            return;
        }
        topicMessageEventContext.getHeadersMap().put("correlation_id", CorrelationIdUtils.getFromMDC());
    }

    @HandlerOrder(11000)
    @Before
    protected void cloudEventsFormatter(TopicMessageEventContext topicMessageEventContext) {
        if (Boolean.TRUE.equals(topicMessageEventContext.getIsInbound()) || !isCloudEventsFormat()) {
            return;
        }
        if (topicMessageEventContext.getHeadersMap() != null) {
            topicMessageEventContext.setHeadersMap(CloudEventUtils.toCloudEvent((Map<String, Object>) topicMessageEventContext.getHeadersMap(), topicMessageEventContext.getEvent(), this.serviceConfig.getPublishPrefix()));
        } else {
            topicMessageEventContext.setData(CloudEventUtils.toCloudEvent(topicMessageEventContext.getData(), topicMessageEventContext.getEvent(), this.serviceConfig.getPublishPrefix()));
        }
    }

    @HandlerOrder(-9900)
    @On
    protected void sendMessageEvent(TopicMessageEventContext topicMessageEventContext) {
        if (Boolean.TRUE.equals(topicMessageEventContext.getIsInbound())) {
            return;
        }
        AbstractMessagingService service = topicMessageEventContext.getService();
        String fullyQualifiedTopicName = toFullyQualifiedTopicName(topicMessageEventContext.getEvent(), false);
        logger.debug("The service event '{}' is going to be emitted on service '{}' to topic '{}'", new Object[]{topicMessageEventContext.getEvent(), getName(), fullyQualifiedTopicName});
        service.emitTopicMessage(fullyQualifiedTopicName, topicMessageEventContext);
        topicMessageEventContext.setCompleted();
    }

    @HandlerOrder(11000)
    @On
    protected void defaultErrorHandler(MessagingErrorEventContext messagingErrorEventContext) {
        if (!messagingErrorEventContext.getException().getErrorStatus().equals(CdsErrorStatuses.TENANT_NOT_EXISTS)) {
            messagingErrorEventContext.setResult(false);
        } else {
            logger.debug("Message of none-existing tenant '{}' acknowledged by default error handler.", messagingErrorEventContext.getTenant());
            messagingErrorEventContext.setResult(true);
        }
    }

    public void on(String[] strArr, String[] strArr2, int i, Handler handler) {
        super.on(strArr, strArr2, i, handler);
        Arrays.stream(strArr).filter(str -> {
            return (StringUtils.isEmpty(str) || str.equals("*") || "MESSAGING_ERROR".equals(str)) ? false : true;
        }).forEach(str2 -> {
            this.queue.addTopic(new MessageTopic(str2, toFullyQualifiedTopicName(str2, true)));
        });
    }

    protected String toFullyQualifiedQueueName(MessageQueue messageQueue) {
        return messageQueue.getName();
    }

    protected String toFullyQualifiedTopicName(String str, boolean z) {
        if (z) {
            if (this.serviceConfig.getSubscribePrefix() != null) {
                return this.serviceConfig.getSubscribePrefix() + str;
            }
        } else if (this.serviceConfig.getPublishPrefix() != null) {
            return this.serviceConfig.getPublishPrefix() + str;
        }
        return str;
    }

    protected BiPredicate<MessageTopic, String> getTopicMatcher() {
        return (messageTopic, str) -> {
            return Objects.equals(messageTopic.getBrokerName(), str);
        };
    }

    protected abstract void removeQueue(String str) throws IOException;

    protected abstract void createQueue(String str, Map<String, Object> map) throws IOException;

    protected abstract void createQueueSubscription(String str, String str2) throws IOException;

    protected abstract void registerQueueListener(String str, MessagingBrokerQueueListener messagingBrokerQueueListener) throws IOException;

    protected abstract void emitTopicMessage(String str, TopicMessageEventContext topicMessageEventContext);
}
