package com.sap.cds.services.utils.messaging.service;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.sap.cds.feature.config.Properties;
import com.sap.cds.feature.config.pojo.CdsProperties;
import com.sap.cds.reflect.CdsEvent;
import com.sap.cds.reflect.CdsModel;
import com.sap.cds.services.ErrorStatuses;
import com.sap.cds.services.EventContext;
import com.sap.cds.services.ServiceDelegator;
import com.sap.cds.services.handler.Handler;
import com.sap.cds.services.handler.annotations.HandlerOrder;
import com.sap.cds.services.handler.annotations.On;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.outbox.OutboxService;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.OrderConstants;
import com.sap.cds.services.utils.StringUtils;
import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/services/utils/messaging/service/AbstractMessagingService.class */
public abstract class AbstractMessagingService extends ServiceDelegator implements MessagingService {
    private static final Logger logger = LoggerFactory.getLogger(AbstractMessagingService.class);
    private static final ObjectMapper mapper = new ObjectMapper();
    private static final String IS_OUTBOXED = "IS_OUTBOXED";
    private final MessageQueue queue;
    private final boolean outboxed;
    private final CdsModel model;

    protected AbstractMessagingService(CdsProperties.Messaging.MessagingServiceConfig messagingServiceConfig, CdsModel cdsModel) {
        super(messagingServiceConfig.getName());
        this.model = cdsModel;
        this.queue = MessageQueue.create(messagingServiceConfig);
        this.outboxed = messagingServiceConfig.getOutbox().isEnabled().booleanValue();
    }

    public void init() {
        logger.info("Initializing subscriptions of messaging service '{}'", getName());
        String fullyQualifiedQueueName = toFullyQualifiedQueueName(this.queue);
        try {
            if (Properties.getCds().getMessaging().isResetQueues()) {
                try {
                    createQueue(fullyQualifiedQueueName, this.queue.getProperties());
                    removeQueue(fullyQualifiedQueueName);
                    logger.warn("Reset the queue '{}' of service '{}'", fullyQualifiedQueueName, getName());
                } catch (IOException e) {
                    logger.warn("Failed to reset queue '{}' of service '{}'", fullyQualifiedQueueName, getName());
                }
            }
            if (this.queue.getTopics().isEmpty()) {
                logger.warn("There are no queue subscriptions available for the service '{}'", getName());
            } else {
                createQueue(fullyQualifiedQueueName, this.queue.getProperties());
                logger.info("Created queue '{}' for service '{}'", fullyQualifiedQueueName, getName());
                Iterator<MessageTopic> it = this.queue.getTopics().iterator();
                while (it.hasNext()) {
                    String brokerName = it.next().getBrokerName();
                    try {
                        createQueueSubscription(fullyQualifiedQueueName, brokerName);
                        logger.info("Subscribed topic '{}' on queue '{}' for service '{}'", new Object[]{brokerName, fullyQualifiedQueueName, getName()});
                    } catch (IOException e2) {
                        logger.error("Failed to subscribe topic '{}' on queue '{}' for service '{}'", new Object[]{brokerName, fullyQualifiedQueueName, getName(), e2});
                    }
                }
                try {
                    registerQueueListener(fullyQualifiedQueueName, new MessagingBrokerQueueListener(this, fullyQualifiedQueueName, this.queue));
                } catch (IOException | IllegalArgumentException e3) {
                    logger.error("Failed to register the listener to the queue '{}' for service '{}'", new Object[]{fullyQualifiedQueueName, getName(), e3});
                }
            }
            logger.debug("Finished initializing subscriptions of service '{}'", getName());
        } catch (Exception e4) {
            logger.error("Failed to create queue '{}' for service '{}'", new Object[]{fullyQualifiedQueueName, getName(), e4});
        }
    }

    public void emit(EventContext eventContext) {
        if (!this.outboxed || eventContext.get(IS_OUTBOXED) != null) {
            super.emit(eventContext);
            return;
        }
        OutboxService service = eventContext.getServiceCatalog().getService(OutboxService.class, "OutboxService$Default");
        eventContext.put(IS_OUTBOXED, true);
        service.enroll(this, eventContext);
    }

    public void emit(String str, String str2) {
        TopicMessageEventContext create = TopicMessageEventContext.create(str);
        create.setData(str2);
        emit(create);
    }

    public void emit(String str, Map<String, Object> map) {
        emit(str, toJson(map));
    }

    @HandlerOrder(OrderConstants.On.AUTO_COMPLETE)
    @On
    private void autoComplete(EventContext eventContext) {
        if (this.queue.hasEvent(eventContext.getEvent())) {
            eventContext.setCompleted();
        }
    }

    @HandlerOrder(OrderConstants.On.MESSAGING)
    @On
    protected void sendMessageEvent(TopicMessageEventContext topicMessageEventContext) {
        if (Boolean.TRUE.equals(topicMessageEventContext.getIsInbound())) {
            return;
        }
        topicMessageEventContext.getService().emitTopicMessage(toFullyQualifiedTopicName((String) topicMessageEventContext.getModel().findEvent(topicMessageEventContext.getEvent()).map(this::toTopicName).orElse(topicMessageEventContext.getEvent())), topicMessageEventContext.getData());
        topicMessageEventContext.setCompleted();
    }

    private String toJson(Object obj) {
        try {
            return mapper.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new ErrorStatusException(ErrorStatuses.SERVER_ERROR, e);
        }
    }

    public void on(String[] strArr, String[] strArr2, int i, Handler handler) {
        super.on(strArr, strArr2, i, handler);
        checkEvents(strArr);
    }

    private void checkEvents(String[] strArr) {
        Arrays.stream(strArr).filter(str -> {
            return (StringUtils.isEmpty(str) || str.equals("*")) ? false : true;
        }).forEach(str2 -> {
            this.queue.addTopic(new MessageTopic(str2, toFullyQualifiedTopicName((String) this.model.findEvent(str2).map(this::toTopicName).orElse(str2))));
        });
    }

    protected String toTopicName(CdsEvent cdsEvent) {
        return cdsEvent.getQualifiedName().replace('.', '/');
    }

    protected String toFullyQualifiedQueueName(MessageQueue messageQueue) {
        return messageQueue.getName();
    }

    protected String toFullyQualifiedTopicName(String str) {
        return str;
    }

    protected abstract void removeQueue(String str) throws IOException;

    protected abstract void createQueue(String str, Map<String, String> map) throws IOException;

    protected abstract void createQueueSubscription(String str, String str2) throws IOException;

    protected abstract void registerQueueListener(String str, MessagingBrokerQueueListener messagingBrokerQueueListener) throws IOException;

    protected abstract void emitTopicMessage(String str, String str2);
}
