package com.sap.cds.services.messaging.jms;

import com.sap.cds.impl.util.Pair;
import com.sap.cds.services.messaging.service.MessagingBrokerQueueListener;
import com.sap.cds.services.messaging.utils.MessagingUtils;
import com.sap.cds.services.utils.CdsErrorStatuses;
import com.sap.cds.services.utils.ErrorStatusException;
import com.sap.cds.services.utils.StringUtils;
import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.JMSException;
import jakarta.jms.Message;
import jakarta.jms.MessageConsumer;
import jakarta.jms.Session;
import jakarta.jms.TextMessage;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/sap/cds/services/messaging/jms/MessageQueueReader.class */
public class MessageQueueReader {
    private static final Logger logger = LoggerFactory.getLogger(MessageQueueReader.class);
    private final String queueName;
    private final MessagingBrokerQueueListener listener;
    private final TopicAccessor topicAccessor;

    /* loaded from: input_file:com/sap/cds/services/messaging/jms/MessageQueueReader$MessageQueueReaderThread.class */
    private class MessageQueueReaderThread extends Thread {
        private final Connection connection;
        private Session session;
        private MessageConsumer consumer;

        public MessageQueueReaderThread(Connection connection) throws JMSException {
            super(MessageQueueReader.this.queueName + " - Listener");
            this.connection = connection;
            initSession();
        }

        private void initSession() throws JMSException {
            Session session = this.session;
            try {
                this.session = this.connection.createSession(2);
                this.consumer = this.session.createConsumer(this.session.createQueue(MessageQueueReader.this.queueName));
                if (session != null) {
                    session.close();
                }
            } catch (Throwable th) {
                if (session != null) {
                    session.close();
                }
                throw th;
            }
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    Message receive = this.consumer.receive();
                    if (receive == null) {
                        MessageQueueReader.logger.debug("The message consumer of the queue reader '{}' was closed", MessageQueueReader.this.queueName);
                        return;
                    }
                    ReceivedMessage receivedMessage = null;
                    try {
                        receivedMessage = new ReceivedMessage(receive, MessageQueueReader.this.topicAccessor);
                        MessageQueueReader.this.listener.receivedMessage(receivedMessage);
                    } catch (Throwable th) {
                        if (receivedMessage != null) {
                            Logger logger = MessageQueueReader.logger;
                            Object[] objArr = new Object[3];
                            objArr[0] = MessageQueueReader.this.queueName;
                            objArr[1] = !StringUtils.isEmpty(receivedMessage.getBrokerTopic()) ? receivedMessage.getBrokerTopic() : "???";
                            objArr[2] = th;
                            logger.error("The received message of the queue '{}' and topic '{}' could not be handled", objArr);
                        } else {
                            MessageQueueReader.logger.error("The received message of the queue '{}' could not be handled", MessageQueueReader.this.queueName, th);
                        }
                        initSession();
                    }
                } catch (JMSException e) {
                    MessageQueueReader.logger.error("The queue reader '{}' was interrupted", MessageQueueReader.this.queueName, e);
                    return;
                }
                MessageQueueReader.logger.error("The queue reader '{}' was interrupted", MessageQueueReader.this.queueName, e);
                return;
            }
        }
    }

    /* loaded from: input_file:com/sap/cds/services/messaging/jms/MessageQueueReader$ReceivedMessage.class */
    private static class ReceivedMessage implements MessagingBrokerQueueListener.MessageAccess {
        private final Message jmsMessage;
        private final String message;
        private final String topic;
        private final String id;
        private Map<String, Object> dataMap;
        private Map<String, Object> headersMap;

        public ReceivedMessage(Message message, TopicAccessor topicAccessor) throws JMSException {
            this.id = message.getJMSMessageID();
            this.jmsMessage = message;
            if (message instanceof TextMessage) {
                this.message = ((TextMessage) message).getText();
                this.topic = topicAccessor.getFromTopic(message);
            } else {
                if (!(message instanceof BytesMessage)) {
                    throw new JMSException("Unknown event message format: " + message.getClass().getName());
                }
                BytesMessage bytesMessage = (BytesMessage) message;
                byte[] bArr = new byte[(int) bytesMessage.getBodyLength()];
                bytesMessage.readBytes(bArr);
                bytesMessage.reset();
                this.message = new String(bArr, StandardCharsets.UTF_8);
                this.topic = topicAccessor.getFromTopic(message);
            }
        }

        @Override // com.sap.cds.services.messaging.service.MessagingBrokerQueueListener.MessageAccess
        public String getMessage() {
            return this.message;
        }

        @Override // com.sap.cds.services.messaging.service.MessagingBrokerQueueListener.MessageAccess
        public Map<String, Object> getDataMap() {
            if (this.dataMap == null) {
                populateMaps();
            }
            return this.dataMap;
        }

        @Override // com.sap.cds.services.messaging.service.MessagingBrokerQueueListener.MessageAccess
        public Map<String, Object> getHeadersMap() {
            if (this.headersMap == null) {
                populateMaps();
            }
            return this.headersMap;
        }

        private void populateMaps() {
            Pair<Map<String, Object>, Map<String, Object>> structuredMessage = MessagingUtils.toStructuredMessage(this.message);
            this.dataMap = (Map) structuredMessage.left;
            this.headersMap = (Map) structuredMessage.right;
        }

        @Override // com.sap.cds.services.messaging.service.MessagingBrokerQueueListener.MessageAccess
        public String getId() {
            return this.id;
        }

        @Override // com.sap.cds.services.messaging.service.MessagingBrokerQueueListener.MessageAccess
        public String getTenant() {
            return null;
        }

        @Override // com.sap.cds.services.messaging.service.MessagingBrokerQueueListener.MessageAccess
        public String getBrokerTopic() {
            return this.topic;
        }

        @Override // com.sap.cds.services.messaging.service.MessagingBrokerQueueListener.MessageAccess
        public void acknowledge() {
            try {
                this.jmsMessage.acknowledge();
            } catch (JMSException e) {
                throw new ErrorStatusException(CdsErrorStatuses.ACKNOWLEDGMENT_FAILED, new Object[]{this.topic, e});
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public MessageQueueReader(String str, MessagingBrokerQueueListener messagingBrokerQueueListener, TopicAccessor topicAccessor) {
        this.topicAccessor = topicAccessor;
        this.queueName = str;
        this.listener = messagingBrokerQueueListener;
    }

    public void startListening(Connection connection) throws JMSException {
        new MessageQueueReaderThread(connection).start();
    }
}
