package com.sap.cds.services.messaging.service;

import com.sap.cds.services.ServiceException;
import com.sap.cds.services.messaging.MessagingErrorEventContext;
import com.sap.cds.services.messaging.MessagingService;
import com.sap.cds.services.messaging.TopicMessageEventContext;
import com.sap.cds.services.messaging.utils.CloudEventUtils;
import com.sap.cds.services.messaging.utils.MessagingUtils;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/services/messaging/service/MessagingBrokerQueueListener.class */
/**
 * Dispatches messages received on a broker queue to a {@link MessagingService}.
 *
 * <p>For each incoming message the listener looks up the matching topics of the queue, opens a
 * privileged request context for the tenant of the message and emits a
 * {@link TopicMessageEventContext} on the service. Failures are reported through a
 * {@link MessagingErrorEventContext}, whose result decides whether the message is acknowledged.
 */
public class MessagingBrokerQueueListener {
    private static final Logger logger = LoggerFactory.getLogger(MessagingBrokerQueueListener.class);

    /** Header key from which the correlation id of the request context is taken. */
    private static final String CORRELATION_ID_HEADER = "correlation_id";

    private final MessagingService service;
    private final String queueName;
    private final MessageQueue queue;
    private final CdsRuntime runtime;
    private final boolean isStructured;

    /* loaded from: input_file:com/sap/cds/services/messaging/service/MessagingBrokerQueueListener$MessageAccess.class */
    /**
     * Broker-independent view on a received message, as consumed by
     * {@link MessagingBrokerQueueListener#receivedMessage(MessageAccess)}.
     */
    public interface MessageAccess {
        /**
         * @return the message id; may be {@code null}, in which case the listener tries to derive
         *         the id from the headers of a structured message
         */
        String getId();

        /** @return the tenant that is set on the request context used to process the message */
        String getTenant();

        /** @return the message payload that is passed on as data for non-structured messages */
        String getMessage();

        /** @return the broker topic used to look up the matching topics of the queue */
        String getBrokerTopic();

        /** Acknowledges the message; invoked by the listener once processing has finished. */
        void acknowledge();

        /** @return the data map that is passed on for structured messages */
        Map<String, Object> getDataMap();

        /** @return the headers map; the listener tolerates {@code null} here */
        Map<String, Object> getHeadersMap();

        /**
         * @return additional entries that are copied one-to-one into the emitted event context;
         *         empty by default
         */
        default Map<String, String> getTechnicalHeaders() {
            return Collections.emptyMap();
        }
    }

    /**
     * Creates a listener for one queue.
     *
     * @param messagingService the service on which message and error events are emitted
     * @param str              the name of the queue, returned by {@link #getQueueName()}
     * @param messageQueue     the queue used to resolve the topics of a received message
     * @param cdsRuntime       the runtime used to open the request context for processing
     * @param z                {@code true} to pass data and headers maps to the event context,
     *                         {@code false} to pass the plain message string
     */
    public MessagingBrokerQueueListener(MessagingService messagingService, String str, MessageQueue messageQueue, CdsRuntime cdsRuntime, boolean z) {
        this.service = messagingService;
        this.queueName = str;
        this.queue = messageQueue;
        this.runtime = cdsRuntime;
        this.isStructured = z;
    }

    /** @return the name of the queue this listener was created for */
    public String getQueueName() {
        return this.queueName;
    }

    /**
     * Processes a received message by emitting it as an event for every matching topic.
     *
     * <p>The message is acknowledged in all cases except when an error event was emitted and its
     * result was {@code false}. Processing stops at the first failing topic.
     *
     * @param messageAccess the received message
     * @throws ErrorStatusException with {@link CdsErrorStatuses#EVENT_PROCESSING_FAILED} if the
     *         request context could not be run or the event emission failed
     */
    public void receivedMessage(MessageAccess messageAccess) {
        logger.debug("Received message on service '{}' from topic '{}' and queue '{}'.", this.service.getName(), messageAccess.getBrokerTopic(), this.queueName);
        List<MessageTopic> topics = this.queue.findTopic(messageAccess.getBrokerTopic());
        // Stays true unless an error handler explicitly asks not to acknowledge the message.
        AtomicBoolean acknowledge = new AtomicBoolean(true);
        // Set once the error event was emitted inside the request context, so that the outer
        // catch block does not emit it a second time for the same failure.
        AtomicBoolean errorHandlerCalled = new AtomicBoolean(false);
        try {
            for (MessageTopic messageTopic : topics) {
                try {
                    this.runtime.requestContext().privilegedUser().modifyUser(user -> {
                        user.setTenant(messageAccess.getTenant());
                    }).modifyParameters(parameters -> {
                        Map<String, Object> headers = messageAccess.getHeadersMap();
                        if (headers != null && headers.containsKey(CORRELATION_ID_HEADER)) {
                            parameters.setCorrelationId((String) headers.get(CORRELATION_ID_HEADER));
                        }
                    }).run(requestContext -> {
                        try {
                            TopicMessageEventContext context = getContext(messageAccess, messageTopic.getEventName());
                            logger.debug("The message 'id:{}' from topic '{}' on service '{}' is going to be emitted as a service event '{}'", context.getMessageId(), messageTopic.getBrokerName(), this.service.getName(), messageTopic.getEventName());
                            this.service.emit(context);
                        } catch (Throwable th) {
                            errorHandlerCalled.set(true);
                            performErrorHandling(th, acknowledge, messageAccess);
                            throw th;
                        }
                    });
                } catch (Throwable th) {
                    Throwable cause = th;
                    if (!errorHandlerCalled.get()) {
                        // The failure happened before the event could be emitted, i.e. while
                        // setting up the request context.
                        logger.debug("The tenant request context for '{}' cannot be created", messageAccess.getTenant());
                        if (MessagingUtils.isUnknownTenant(cause)) {
                            cause = new ErrorStatusException(CdsErrorStatuses.TENANT_NOT_EXISTS, new Object[]{messageAccess.getTenant(), cause});
                        }
                        performErrorHandling(cause, acknowledge, messageAccess);
                    }
                    throw new ErrorStatusException(CdsErrorStatuses.EVENT_PROCESSING_FAILED, new Object[]{messageTopic.getEventName(), this.service.getName(), this.queueName, cause});
                }
            }
        } finally {
            if (acknowledge.get()) {
                messageAccess.acknowledge();
            }
        }
    }

    /**
     * Emits a {@link MessagingErrorEventContext} for a processing failure and records whether the
     * message may still be acknowledged.
     *
     * @param th            the failure; wrapped into a {@link ServiceException} if it is not one
     * @param acknowledge   flag that is cleared when the result of the error event is {@code false}
     * @param messageAccess the message whose processing failed
     */
    private void performErrorHandling(Throwable th, AtomicBoolean acknowledge, MessageAccess messageAccess) {
        MessagingErrorEventContext errorContext = MessagingErrorEventContext.create();
        ServiceException exception = th instanceof ServiceException ? (ServiceException) th : new ServiceException(th);
        errorContext.setException(exception);
        errorContext.setTenant(messageAccess.getTenant());
        errorContext.setMessageHeaders(messageAccess.getHeadersMap());
        errorContext.setMessageData(messageAccess.getDataMap());
        this.service.emit(errorContext);
        if (!errorContext.getResult()) {
            acknowledge.set(false);
        }
    }

    /**
     * Builds the inbound event context for a received message.
     *
     * @param messageAccess the received message
     * @param str           the name of the event to create the context for
     * @return the populated event context, marked as inbound
     */
    private TopicMessageEventContext getContext(MessageAccess messageAccess, String str) {
        TopicMessageEventContext context = TopicMessageEventContext.create(str);
        for (Map.Entry<String, String> technicalHeader : messageAccess.getTechnicalHeaders().entrySet()) {
            context.put(technicalHeader.getKey(), technicalHeader.getValue());
        }
        if (this.isStructured) {
            context.setDataMap(messageAccess.getDataMap());
            context.setHeadersMap(messageAccess.getHeadersMap());
        } else {
            context.setData(messageAccess.getMessage());
        }
        String messageId = messageAccess.getId();
        if (messageId == null && this.isStructured) {
            // Fall back to the id header of a structured message. The headers map may be null
            // (receivedMessage guards against that as well), so it must be checked here too.
            Map<String, Object> headers = messageAccess.getHeadersMap();
            if (headers != null && headers.containsKey(CloudEventUtils.KEY_ID)) {
                messageId = (String) headers.get(CloudEventUtils.KEY_ID);
            }
        }
        context.setMessageId(messageId);
        context.setIsInbound(true);
        return context;
    }
}
