/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.rabbit.listeners;

import com.fasterxml.jackson.databind.JsonNode;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.api.handlers.CloudCommandHandler;
import org.reactivecommons.async.api.handlers.DomainCommandHandler;
import org.reactivecommons.async.api.handlers.registered.RegisteredCommandHandler;
import org.reactivecommons.async.commons.CommandExecutor;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivecommons.async.rabbit.listeners.GenericMessageListener;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;

public class ApplicationCommandListener
extends GenericMessageListener {
    @Generated
    private static final Logger log = Logger.getLogger(ApplicationCommandListener.class.getName());
    private static final String DQL = ".DLQ";
    private static final String NAME = "name";
    private static final String COMMAND_ID = "commandId";
    private static final String TYPE = "type";
    private final MessageConverter messageConverter;
    private final HandlerResolver resolver;
    private final String directExchange;
    private final boolean withDLQRetry;
    private final boolean delayedCommands;
    private final int retryDelay;
    private final Optional<Integer> maxLengthBytes;

    public ApplicationCommandListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, String directExchange, MessageConverter messageConverter, boolean withDLQRetry, boolean createTopology, boolean delayedCommands, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
        super(queueName, listener, withDLQRetry, createTopology, maxRetries, retryDelay, discardNotifier, "command", errorReporter);
        this.retryDelay = retryDelay;
        this.withDLQRetry = withDLQRetry;
        this.delayedCommands = delayedCommands;
        this.resolver = resolver;
        this.directExchange = directExchange;
        this.messageConverter = messageConverter;
        this.maxLengthBytes = maxLengthBytes;
    }

    @Override
    protected Mono<Void> setUpBindings(TopologyCreator creator) {
        Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange((String)this.directExchange).durable(true).type("direct"));
        if (this.withDLQRetry) {
            Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange((String)(this.directExchange + DQL)).durable(true).type("direct"));
            Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(this.queueName, this.directExchange + DQL, this.maxLengthBytes);
            Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(this.queueName, this.directExchange, this.retryDelay, this.maxLengthBytes);
            Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding((String)this.directExchange, (String)this.queueName, (String)this.queueName));
            Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding((String)(this.directExchange + DQL), (String)this.queueName, (String)(this.queueName + DQL)));
            return declareExchange.then(declareExchangeDLQ).then(declareDLQ).then(declareQueue).then(bindingDLQ).then(binding).then(this.declareDelayedTopology(creator)).then();
        }
        Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(this.queueName, this.maxLengthBytes);
        Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding((String)this.directExchange, (String)this.queueName, (String)this.queueName));
        return declareExchange.then(declareQueue).then(binding).then(this.declareDelayedTopology(creator)).then();
    }

    private Mono<Void> declareDelayedTopology(TopologyCreator creator) {
        if (this.delayedCommands) {
            String delayedQueue = this.queueName + "-delayed";
            Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(delayedQueue, this.directExchange, this.maxLengthBytes, Optional.of(this.queueName));
            Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding((String)this.directExchange, (String)delayedQueue, (String)delayedQueue));
            return declareQueue.then(binding).then();
        }
        return Mono.empty();
    }

    @Override
    protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
        RegisteredCommandHandler handler = this.resolver.getCommandHandler(executorPath);
        Function<Message, Object> converter = this.resolveConverter(handler);
        CommandExecutor executor = new CommandExecutor(handler.getHandler(), converter);
        return msj -> executor.execute(msj).cast(Object.class);
    }

    @Override
    protected String getExecutorPath(AcknowledgableDelivery msj) {
        RabbitMessage rabbitMessage = RabbitMessage.fromDelivery((Delivery)msj);
        JsonNode jsonNode = (JsonNode)this.messageConverter.readValue((Message)rabbitMessage, JsonNode.class);
        if (jsonNode.get(COMMAND_ID) != null) {
            return jsonNode.get(NAME).asText();
        }
        return jsonNode.get(TYPE).asText();
    }

    @Override
    protected Object parseMessageForReporter(Message message) {
        return this.messageConverter.readCommandStructure(message);
    }

    private <T, D> Function<Message, Object> resolveConverter(RegisteredCommandHandler<T, D> registeredCommandHandler) {
        if (registeredCommandHandler.getHandler() instanceof DomainCommandHandler) {
            Class commandClass = registeredCommandHandler.getInputClass();
            return msj -> this.messageConverter.readCommand(msj, commandClass);
        }
        if (registeredCommandHandler.getHandler() instanceof CloudCommandHandler) {
            return arg_0 -> ((MessageConverter)this.messageConverter).readCloudEvent(arg_0);
        }
        throw new RuntimeException("Unknown handler type");
    }
}

