/*
 * Decompiled with CFR 0.152.
 */
package org.iris_events.consumer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Recoverable;
import com.rabbitmq.client.RecoveryListener;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.iris_events.annotations.ExchangeType;
import org.iris_events.consumer.DeliverCallbackProvider;
import org.iris_events.consumer.ExchangeDeclarator;
import org.iris_events.consumer.QueueDeclarator;
import org.iris_events.context.IrisContext;
import org.iris_events.runtime.ExchangeNameProvider;
import org.iris_events.runtime.QueueNameProvider;
import org.iris_events.runtime.channel.ChannelService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Declares the RabbitMQ topology (queue, exchange(s), bindings) described by an
 * {@link IrisContext} and starts consuming from the resulting queue.
 *
 * <p>The consumer registers itself as a {@link RecoveryListener} on channels that are
 * {@link Recoverable} and re-runs {@link #initChannel()} after a recovery. When the broker
 * reports a consumer shutdown, the current channel is dropped from the {@link ChannelService}
 * and a new one is initialised under a fresh channel id.
 */
public class Consumer implements RecoveryListener {
    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    private static final String RPC_EXCHANGE_SUFFIX = "rpc";
    /** Value of the {@code x-message-ttl} queue argument used for RPC request queues. */
    private static final int RPC_MESSAGE_TTL_MS = 2000;

    private final IrisContext context;
    private final ChannelService channelService;
    private final DeliverCallbackProvider deliverCallbackProvider;
    private final QueueNameProvider queueNameProvider;
    private final ExchangeNameProvider exchangeNameProvider;
    private final QueueDeclarator queueDeclarator;
    private final ExchangeDeclarator exchangeDeclarator;
    private DeliverCallback callback;
    private String channelId;

    /**
     * Creates a consumer for the given context and assigns it a random channel id.
     * No broker interaction happens until {@link #initChannel()} is called.
     */
    public Consumer(IrisContext context, ChannelService channelService, DeliverCallbackProvider deliverCallbackProvider, QueueNameProvider queueNameProvider, ExchangeNameProvider exchangeNameProvider, QueueDeclarator queueDeclarator, ExchangeDeclarator exchangeDeclarator) {
        this.context = context;
        this.channelService = channelService;
        this.deliverCallbackProvider = deliverCallbackProvider;
        this.queueNameProvider = queueNameProvider;
        this.exchangeNameProvider = exchangeNameProvider;
        this.queueDeclarator = queueDeclarator;
        this.exchangeDeclarator = exchangeDeclarator;
        this.channelId = UUID.randomUUID().toString();
    }

    /**
     * Obtains the channel for the current channel id, creates the delivery callback,
     * validates the configured binding keys, declares the topology and starts consuming.
     *
     * @throws IOException if any broker operation fails
     * @throws IllegalArgumentException if the binding keys do not fit the exchange type
     */
    public void initChannel() throws IOException {
        Channel channel = this.channelService.getOrCreateChannelById(this.channelId);
        this.callback = this.deliverCallbackProvider.createDeliverCallback(channel);
        ExchangeType exchangeType = this.context.exchangeType();
        Consumer.validateBindingKeys(this.context.getBindingKeys(), exchangeType);
        this.declareTopology(channel, exchangeType);
        if (channel instanceof Recoverable) {
            Recoverable recoverable = (Recoverable) channel;
            // handleRecovery() re-enters this method. If the channel service hands back the
            // same channel instance (not visible from here - assumption), a plain add would
            // register this listener once more per recovery and multiply later callbacks.
            // Removing first keeps at most one registration and is a no-op on a new channel.
            recoverable.removeRecoveryListener(this);
            recoverable.addRecoveryListener(this);
        }
    }

    /**
     * @return the delivery callback created by the most recent {@link #initChannel()} call,
     *         or {@code null} if the channel has not been initialised yet
     */
    public DeliverCallback getCallback() {
        return this.callback;
    }

    /** @return the context this consumer was created with */
    protected IrisContext getContext() {
        return this.context;
    }

    /**
     * Applies the prefetch count, assembles the common queue arguments and delegates to the
     * RPC or regular topology declaration depending on the context.
     */
    private void declareTopology(Channel channel, ExchangeType exchangeType) throws IOException {
        Map<String, Object> queueDeclarationArgs = new HashMap<>();
        String exchange = this.getExchangeName();
        String queueName = this.queueNameProvider.getQueueName(this.context);
        channel.basicQos(this.context.getPrefetch());
        long ttl = this.context.getTtl();
        if (ttl >= 0L) {
            queueDeclarationArgs.put("x-message-ttl", ttl);
        }
        if (this.context.isRpc()) {
            this.declareRpcTopology(channel, queueName, exchange, queueDeclarationArgs);
        } else {
            this.declareEventTopology(channel, queueName, exchange, exchangeType, queueDeclarationArgs);
        }
    }

    /**
     * Declares the request queue plus request/response exchanges for an RPC consumer,
     * binds the queue to the request exchange and starts consuming.
     */
    private void declareRpcTopology(Channel channel, String queueName, String exchange, Map<String, Object> queueDeclarationArgs) throws IOException {
        // RPC queues always use the fixed TTL; this overrides any TTL taken from the context.
        queueDeclarationArgs.put("x-message-ttl", RPC_MESSAGE_TTL_MS);
        String rpcRequestExchange = this.exchangeNameProvider.getRpcRequestExchangeName(this.context.getName());
        String rpcResponseExchange = this.exchangeNameProvider.getRpcResponseExchangeName(this.context.getRpcResponseEventName());
        log.info("Declaring topology for RPC consumer.\nrequestExchange: {}\nresponseExchange: {}\nrequestQueue: {}\n", rpcRequestExchange, rpcResponseExchange, queueName);
        // RPC request queue: non-durable, auto-delete.
        this.declareQueue(channel, queueName, false, true, queueDeclarationArgs);
        this.exchangeDeclarator.declareExchange(rpcRequestExchange, ExchangeType.FANOUT, false);
        channel.queueBind(queueName, rpcRequestExchange, queueName);
        this.exchangeDeclarator.declareExchange(rpcResponseExchange, ExchangeType.DIRECT, false);
        this.startConsuming(channel, queueName);
        log.info("consumer (RPC) started on queue '{}' --> {} binding key(s): {}", queueName, exchange, queueName);
    }

    /**
     * Declares the optional dead-letter setup, the queue and the exchange for a regular
     * (non-RPC) consumer, binds the queue with every binding key and starts consuming.
     */
    private void declareEventTopology(Channel channel, String queueName, String exchange, ExchangeType exchangeType, Map<String, Object> queueDeclarationArgs) throws IOException {
        Optional<String> deadLetterQueue = this.context.getDeadLetterQueueName();
        if (deadLetterQueue.isPresent()) {
            this.declareAndBindDeadLetterQueue(channel, deadLetterQueue.get());
            queueDeclarationArgs.put("x-dead-letter-routing-key", this.context.getDeadLetterRoutingKey(queueName));
            queueDeclarationArgs.put("x-dead-letter-exchange", this.context.getDeadLetterExchangeName().orElseThrow());
        }
        boolean autoDelete = this.context.isConsumerOnEveryInstance() || this.context.isAutoDelete();
        this.declareQueue(channel, queueName, this.getDurable(), autoDelete, queueDeclarationArgs);
        this.exchangeDeclarator.declareExchange(exchange, exchangeType, this.context.isFrontendMessage());
        List<String> bindingKeys = this.getBindingKeys(exchangeType);
        for (String bindingKey : bindingKeys) {
            channel.queueBind(queueName, exchange, bindingKey);
        }
        this.startConsuming(channel, queueName);
        log.info("consumer started on queue '{}' --> {} binding key(s): {}", queueName, exchange, String.join(", ", bindingKeys));
    }

    /**
     * Starts a manually-acknowledged consumer on the queue. A cancel is only logged;
     * a shutdown signal triggers {@link #reInitChannel}.
     */
    private void startConsuming(Channel channel, String queueName) throws IOException {
        channel.basicConsume(
                queueName,
                false,
                this.callback,
                consumerTag -> log.warn("Channel canceled for {}", queueName),
                (consumerTag, sig) -> this.reInitChannel(sig, queueName, consumerTag));
    }

    /** @return the exchange name, which is the name of the context */
    private String getExchangeName() {
        return this.context.getName();
    }

    /**
     * Drops the current channel from the channel service, switches to a new channel id and
     * initialises again. An {@link IOException} during re-initialisation is logged, not rethrown.
     */
    private void reInitChannel(ShutdownSignalException sig, String queueName, String consumerTag) {
        log.warn("Channel shut down for with signal:{}, queue: {}, consumer: {}", sig, queueName, consumerTag);
        try {
            this.channelService.removeChannel(this.channelId);
            this.channelId = UUID.randomUUID().toString();
            this.initChannel();
        } catch (IOException e) {
            log.error("Could not re-initialize channel for queue {}", queueName, e);
        }
    }

    /** Declares a non-exclusive queue through the {@link QueueDeclarator}. */
    private void declareQueue(Channel channel, String queueName, boolean durable, boolean autoDelete, Map<String, Object> args) throws IOException {
        QueueDeclarator.QueueDeclarationDetails details = new QueueDeclarator.QueueDeclarationDetails(queueName, durable, false, autoDelete, args);
        this.queueDeclarator.declareQueueWithRecreateOnConflict(channel, details);
    }

    /**
     * For a custom dead-letter queue, declares a durable topic exchange and a durable queue
     * that share the given name and binds them with the catch-all key {@code #}.
     * Does nothing when the context does not use a custom dead-letter queue.
     */
    private void declareAndBindDeadLetterQueue(Channel channel, String deadLetterQueue) throws IOException {
        if (!this.context.isCustomDeadLetterQueue()) {
            return;
        }
        channel.exchangeDeclare(deadLetterQueue, BuiltinExchangeType.TOPIC, true);
        QueueDeclarator.QueueDeclarationDetails details = new QueueDeclarator.QueueDeclarationDetails(deadLetterQueue, true, false, false, null);
        this.queueDeclarator.declareQueueWithRecreateOnConflict(channel, details);
        channel.queueBind(deadLetterQueue, deadLetterQueue, "#");
    }

    /**
     * @return {@code false} for frontend-message and RPC contexts, otherwise the
     *         durability configured on the context
     */
    private boolean getDurable() {
        if (this.context.isFrontendMessage() || this.context.isRpc()) {
            return false;
        }
        return this.context.isDurable();
    }

    /**
     * Resolves the binding keys to use for the queue: {@code "#." + name} for frontend
     * messages and fanout exchanges, the context's own binding keys for direct and topic
     * exchanges.
     */
    private List<String> getBindingKeys(ExchangeType exchangeType) {
        String name = this.context.getName();
        if (this.context.isFrontendMessage()) {
            return List.of("#." + name);
        }
        return switch (exchangeType) {
            case DIRECT, TOPIC -> this.context.getBindingKeys();
            case FANOUT -> List.of("#." + name);
            // Kept from the original: reached only for an enum constant not handled above.
            default -> throw new IncompatibleClassChangeError();
        };
    }

    /**
     * Checks the binding keys against the exchange type: fanout needs none, every other type
     * needs at least one, and direct allows at most one.
     *
     * @throws IllegalArgumentException if the binding keys do not satisfy these rules
     */
    private static void validateBindingKeys(List<String> bindingKeys, ExchangeType exchangeType) {
        if (exchangeType == ExchangeType.FANOUT) {
            return;
        }
        if (bindingKeys == null || bindingKeys.isEmpty()) {
            throw new IllegalArgumentException("Binding key(s) are required when declaring a " + exchangeType.name() + " type exchange.");
        }
        if (exchangeType == ExchangeType.DIRECT && bindingKeys.size() > 1) {
            throw new IllegalArgumentException("Exactly one binding key is required when declaring a direct type exchange.");
        }
    }

    /**
     * Re-initialises the channel after a completed recovery.
     *
     * @throws RuntimeException wrapping the {@link IOException} if re-initialisation fails
     */
    @Override
    public void handleRecovery(Recoverable recoverable) {
        log.info("handleRecovery called for consumer {}", this.context.getName());
        try {
            this.initChannel();
        } catch (IOException e) {
            log.error("Failed handling recovery for consumer {}", this.context.getName(), e);
            throw new RuntimeException(e);
        }
    }

    /** Logs that recovery has started; no other action is taken. */
    @Override
    public void handleRecoveryStarted(Recoverable recoverable) {
        log.info("handleRecoveryStarted for consumer {}", this.context.getName());
    }
}

