/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.amqp.inbound;

import com.rabbitmq.client.Channel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;
import org.jspecify.annotations.Nullable;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.batch.BatchingStrategy;
import org.springframework.amqp.rabbit.batch.SimpleBatchingStrategy;
import org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.MessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareBatchMessageListener;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.retry.MessageBatchRecoverer;
import org.springframework.amqp.rabbit.retry.MessageRecoverer;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.core.AttributeAccessor;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.amqp.support.AmqpHeaderMapper;
import org.springframework.integration.amqp.support.AmqpMessageHeaderErrorMessageStrategy;
import org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper;
import org.springframework.integration.amqp.support.EndpointUtils;
import org.springframework.integration.context.OrderlyShutdownCapable;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.integration.support.ErrorMessageUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryOperations;
import org.springframework.retry.support.RetrySynchronizationManager;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.util.Assert;

public class AmqpInboundChannelAdapter
extends MessageProducerSupport
implements OrderlyShutdownCapable {
    public static final String CONSOLIDATED_HEADERS = "amqp_batchedHeaders";
    private static final ThreadLocal<@Nullable AttributeAccessor> ATTRIBUTES_HOLDER = new ThreadLocal();
    private final MessageListenerContainer messageListenerContainer;
    private final @Nullable AbstractMessageListenerContainer abstractListenerContainer;
    private MessageConverter messageConverter = new SimpleMessageConverter();
    private AmqpHeaderMapper headerMapper = DefaultAmqpHeaderMapper.inboundMapper();
    private @Nullable RetryTemplate retryTemplate;
    private @Nullable RecoveryCallback<?> recoveryCallback;
    private @Nullable MessageRecoverer messageRecoverer;
    private BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(0, 0, 0L);
    private boolean bindSourceMessage;
    private BatchMode batchMode = BatchMode.MESSAGES;
    private String headerNameForBatchedHeaders = "amqp_batchedHeaders";

    public AmqpInboundChannelAdapter(MessageListenerContainer listenerContainer) {
        AbstractMessageListenerContainer abstractMessageListenerContainer;
        Assert.notNull((Object)listenerContainer, (String)"listenerContainer must not be null");
        Assert.isNull((Object)listenerContainer.getMessageListener(), (String)"The listenerContainer provided to an AMQP inbound Channel Adapter must not have a MessageListener configured since the adapter configure its own listener implementation.");
        this.messageListenerContainer = listenerContainer;
        this.messageListenerContainer.setAutoStartup(false);
        this.setErrorMessageStrategy(new AmqpMessageHeaderErrorMessageStrategy());
        this.abstractListenerContainer = listenerContainer instanceof AbstractMessageListenerContainer ? (abstractMessageListenerContainer = (AbstractMessageListenerContainer)listenerContainer) : null;
    }

    public void setMessageConverter(MessageConverter messageConverter) {
        Assert.notNull((Object)messageConverter, (String)"messageConverter must not be null");
        this.messageConverter = messageConverter;
    }

    public void setHeaderMapper(AmqpHeaderMapper headerMapper) {
        Assert.notNull((Object)headerMapper, (String)"headerMapper must not be null");
        this.headerMapper = headerMapper;
    }

    public void setRetryTemplate(RetryTemplate retryTemplate) {
        this.retryTemplate = retryTemplate;
    }

    public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
        this.recoveryCallback = recoveryCallback;
    }

    public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
        this.messageRecoverer = messageRecoverer;
    }

    public void setBatchingStrategy(BatchingStrategy batchingStrategy) {
        Assert.notNull((Object)batchingStrategy, (String)"'batchingStrategy' cannot be null");
        this.batchingStrategy = batchingStrategy;
    }

    public void setBindSourceMessage(boolean bindSourceMessage) {
        this.bindSourceMessage = bindSourceMessage;
    }

    public void setBatchMode(BatchMode batchMode) {
        Assert.notNull((Object)((Object)batchMode), (String)"'batchMode' cannot be null");
        this.batchMode = batchMode;
    }

    public void setHeaderNameForBatchedHeaders(String headerNameForBatchedHeaders) {
        Assert.hasText((String)headerNameForBatchedHeaders, (String)"'headerNameForBatchedHeaders' must not be empty");
        this.headerNameForBatchedHeaders = headerNameForBatchedHeaders;
    }

    public String getComponentType() {
        return "amqp:inbound-channel-adapter";
    }

    protected void onInit() {
        if (this.retryTemplate != null) {
            Assert.state((this.getErrorChannel() == null ? 1 : 0) != 0, (String)"Cannot have an 'errorChannel' property when a 'RetryTemplate' is provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to send an error message when retries are exhausted");
            this.setupRecoveryCallbackIfAny();
        }
        Listener messageListener = this.messageListenerContainer.isConsumerBatchEnabled() ? new BatchListener() : new Listener();
        this.messageListenerContainer.setupMessageListener((MessageListener)messageListener);
        this.messageListenerContainer.afterPropertiesSet();
        super.onInit();
    }

    private void setupRecoveryCallbackIfAny() {
        MessageRecoverer messageRecovererToUse = this.messageRecoverer;
        Assert.state((this.recoveryCallback == null || messageRecovererToUse == null ? 1 : 0) != 0, (String)"Only one of 'recoveryCallback' or 'messageRecoverer' may be provided, but not both");
        if (messageRecovererToUse != null) {
            if (this.messageListenerContainer.isConsumerBatchEnabled()) {
                Assert.isInstanceOf(MessageBatchRecoverer.class, (Object)messageRecovererToUse, (String)"The 'messageRecoverer' must be an instance of MessageBatchRecoverer when consumer configured for batch mode");
                this.recoveryCallback = context -> {
                    List messagesToRecover = (List)context.getAttribute("amqp_raw_message");
                    if (messagesToRecover != null) {
                        ((MessageBatchRecoverer)messageRecovererToUse).recover(messagesToRecover, context.getLastThrowable());
                    }
                    return null;
                };
            } else {
                this.recoveryCallback = context -> {
                    org.springframework.amqp.core.Message messageToRecover = (org.springframework.amqp.core.Message)context.getAttribute("amqp_raw_message");
                    if (messageToRecover != null) {
                        messageRecovererToUse.recover(messageToRecover, context.getLastThrowable());
                    }
                    return null;
                };
            }
        }
    }

    protected void doStart() {
        this.messageListenerContainer.start();
    }

    protected void doStop() {
        this.messageListenerContainer.stop();
    }

    public int beforeShutdown() {
        this.stop();
        return 0;
    }

    public int afterShutdown() {
        return 0;
    }

    private void setAttributesIfNecessary(Object amqpMessage, @Nullable Message<?> message) {
        boolean needAttributes;
        boolean needHolder = this.getErrorChannel() != null && this.retryTemplate == null;
        boolean bl = needAttributes = needHolder || this.retryTemplate != null;
        if (needHolder) {
            ATTRIBUTES_HOLDER.set(ErrorMessageUtils.getAttributeAccessor(null, null));
        }
        if (needAttributes) {
            RetryContext attributes;
            Object object = attributes = this.retryTemplate != null ? RetrySynchronizationManager.getContext() : ATTRIBUTES_HOLDER.get();
            if (attributes != null) {
                attributes.setAttribute("inputMessage", message);
                attributes.setAttribute("amqp_raw_message", amqpMessage);
            }
        }
    }

    protected AttributeAccessor getErrorMessageAttributes(@Nullable Message<?> message) {
        AttributeAccessor attributes = ATTRIBUTES_HOLDER.get();
        if (attributes == null) {
            return super.getErrorMessageAttributes(message);
        }
        return attributes;
    }

    public static enum BatchMode {
        MESSAGES,
        EXTRACT_PAYLOADS,
        EXTRACT_PAYLOADS_WITH_HEADERS;

    }

    protected class BatchListener
    extends Listener
    implements ChannelAwareBatchMessageListener {
        private final boolean batchModeMessages;

        protected BatchListener() {
            this.batchModeMessages = BatchMode.MESSAGES.equals((Object)AmqpInboundChannelAdapter.this.batchMode);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onMessageBatch(List<org.springframework.amqp.core.Message> messages, @Nullable Channel channel) {
            List<Object> converted;
            ArrayList<Map<String, Object>> headers = null;
            if (this.batchModeMessages) {
                converted = this.convertMessages(messages, channel);
            } else {
                converted = this.convertPayloads(messages, channel);
                if (BatchMode.EXTRACT_PAYLOADS_WITH_HEADERS.equals((Object)AmqpInboundChannelAdapter.this.batchMode)) {
                    ArrayList<Map<String, Object>> listHeaders = new ArrayList<Map<String, Object>>();
                    messages.forEach(msg -> listHeaders.add(AmqpInboundChannelAdapter.this.headerMapper.toHeadersFromRequest(msg.getMessageProperties())));
                    headers = listHeaders;
                }
            }
            if (converted != null) {
                Message<Object> message = this.createMessageFromPayload(converted, channel, new HashMap<String, Object>(), messages.get(messages.size() - 1).getMessageProperties().getDeliveryTag(), headers);
                try {
                    if (this.retryOps == null) {
                        AmqpInboundChannelAdapter.this.setAttributesIfNecessary(messages, message);
                        AmqpInboundChannelAdapter.this.sendMessage(message);
                    } else {
                        this.retryOps.execute(context -> {
                            AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt((Message)message);
                            if (deliveryAttempt != null) {
                                deliveryAttempt.incrementAndGet();
                            }
                            if (this.batchModeMessages) {
                                List payloads = (List)message.getPayload();
                                payloads.forEach(payload -> Objects.requireNonNull(StaticMessageHeaderAccessor.getDeliveryAttempt((Message)payload)).incrementAndGet());
                            }
                            AmqpInboundChannelAdapter.this.setAttributesIfNecessary(messages, message);
                            AmqpInboundChannelAdapter.this.sendMessage(message);
                            return null;
                        }, this.recoverer);
                    }
                }
                finally {
                    if (this.retryOps == null) {
                        ATTRIBUTES_HOLDER.remove();
                    }
                }
            }
        }

        private @Nullable List<Message<?>> convertMessages(List<org.springframework.amqp.core.Message> messages, @Nullable Channel channel) {
            ArrayList converted = new ArrayList();
            try {
                messages.forEach(message -> converted.add(this.createMessageFromAmqp((org.springframework.amqp.core.Message)message, channel)));
                return converted;
            }
            catch (MessageConversionException e) {
                MessageChannel errorChannel = AmqpInboundChannelAdapter.this.getErrorChannel();
                if (errorChannel == null) {
                    throw e;
                }
                AmqpInboundChannelAdapter.this.setAttributesIfNecessary(messages, null);
                AmqpInboundChannelAdapter.this.getMessagingTemplate().send((Object)errorChannel, (Message)AmqpInboundChannelAdapter.this.buildErrorMessage(null, (Exception)EndpointUtils.errorMessagePayload(messages, channel, this.manualAcks, (Exception)((Object)e))));
                return null;
            }
        }

        private @Nullable List<?> convertPayloads(List<org.springframework.amqp.core.Message> messages, @Nullable Channel channel) {
            ArrayList converted = new ArrayList();
            try {
                messages.forEach(message -> converted.add(this.converter.fromMessage(message)));
                return converted;
            }
            catch (MessageConversionException e) {
                MessageChannel errorChannel = AmqpInboundChannelAdapter.this.getErrorChannel();
                if (errorChannel == null) {
                    throw e;
                }
                AmqpInboundChannelAdapter.this.setAttributesIfNecessary(messages, null);
                AmqpInboundChannelAdapter.this.getMessagingTemplate().send((Object)errorChannel, (Message)AmqpInboundChannelAdapter.this.buildErrorMessage(null, (Exception)EndpointUtils.errorMessagePayload(messages, channel, this.manualAcks, (Exception)((Object)e))));
                return null;
            }
        }
    }

    protected class Listener
    implements ChannelAwareMessageListener {
        protected final MessageConverter converter;
        protected final boolean manualAcks;
        protected final @Nullable RetryOperations retryOps;
        protected final @Nullable RecoveryCallback<?> recoverer;

        protected Listener() {
            this.converter = AmqpInboundChannelAdapter.this.messageConverter;
            this.manualAcks = AmqpInboundChannelAdapter.this.abstractListenerContainer != null && AcknowledgeMode.MANUAL == AmqpInboundChannelAdapter.this.abstractListenerContainer.getAcknowledgeMode();
            this.retryOps = AmqpInboundChannelAdapter.this.retryTemplate;
            this.recoverer = AmqpInboundChannelAdapter.this.recoveryCallback;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onMessage(org.springframework.amqp.core.Message message, @Nullable Channel channel) {
            block9: {
                try {
                    if (this.retryOps == null) {
                        this.createAndSend(message, channel);
                    } else {
                        Message<Object> toSend = this.createMessageFromAmqp(message, channel);
                        this.retryOps.execute(context -> {
                            AtomicInteger deliveryAttempt = StaticMessageHeaderAccessor.getDeliveryAttempt((Message)toSend);
                            if (deliveryAttempt != null) {
                                deliveryAttempt.incrementAndGet();
                            }
                            AmqpInboundChannelAdapter.this.setAttributesIfNecessary(message, toSend);
                            AmqpInboundChannelAdapter.this.sendMessage(toSend);
                            return null;
                        }, this.recoverer);
                    }
                }
                catch (MessageConversionException e) {
                    MessageChannel errorChannel = AmqpInboundChannelAdapter.this.getErrorChannel();
                    if (errorChannel != null) {
                        AmqpInboundChannelAdapter.this.setAttributesIfNecessary(message, null);
                        AmqpInboundChannelAdapter.this.getMessagingTemplate().send((Object)errorChannel, (Message)AmqpInboundChannelAdapter.this.buildErrorMessage(null, (Exception)EndpointUtils.errorMessagePayload(message, channel, this.manualAcks, (Exception)((Object)e))));
                        break block9;
                    }
                    throw e;
                }
                finally {
                    if (this.retryOps == null) {
                        ATTRIBUTES_HOLDER.remove();
                    }
                }
            }
        }

        private void createAndSend(org.springframework.amqp.core.Message message, @Nullable Channel channel) {
            Message<Object> messagingMessage = this.createMessageFromAmqp(message, channel);
            AmqpInboundChannelAdapter.this.setAttributesIfNecessary(message, messagingMessage);
            AmqpInboundChannelAdapter.this.sendMessage(messagingMessage);
        }

        /*
         * Issues handling annotations - annotations may be inaccurate
         */
        protected Message<Object> createMessageFromAmqp(org.springframework.amqp.core.Message message, @Nullable Channel channel) {
            Object payload = this.convertPayload(message);
            @Nullable Map headers = AmqpInboundChannelAdapter.this.headerMapper.toHeadersFromRequest(message.getMessageProperties());
            if (AmqpInboundChannelAdapter.this.bindSourceMessage) {
                headers.put("sourceData", message);
            }
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            return this.createMessageFromPayload(payload, channel, headers, deliveryTag, null);
        }

        protected Object convertPayload(org.springframework.amqp.core.Message message) {
            ArrayList payload;
            if (AmqpInboundChannelAdapter.this.batchingStrategy.canDebatch(message.getMessageProperties())) {
                ArrayList payloads = new ArrayList();
                AmqpInboundChannelAdapter.this.batchingStrategy.deBatch(message, fragment -> payloads.add(this.converter.fromMessage(fragment)));
                payload = payloads;
            } else {
                payload = this.converter.fromMessage(message);
            }
            return payload;
        }

        protected Message<Object> createMessageFromPayload(Object payload, @Nullable Channel channel, Map<String, @Nullable Object> headers, long deliveryTag, @Nullable List<Map<String, Object>> listHeaders) {
            if (this.manualAcks) {
                headers.put("amqp_deliveryTag", deliveryTag);
                headers.put("amqp_channel", channel);
            }
            if (AmqpInboundChannelAdapter.this.retryTemplate != null) {
                headers.put("deliveryAttempt", new AtomicInteger());
            }
            if (listHeaders != null) {
                headers.put(AmqpInboundChannelAdapter.this.headerNameForBatchedHeaders, listHeaders);
            }
            return AmqpInboundChannelAdapter.this.getMessageBuilderFactory().withPayload(payload).copyHeaders(headers).build();
        }
    }
}

