/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.AMQP;
import io.cloudevents.CloudEvent;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Optional;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.api.handlers.registered.RegisteredQueryHandler;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.HandlerResolver;
import org.reactivecommons.async.commons.QueryExecutor;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.converters.MessageConverter;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageSender;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivecommons.async.rabbit.listeners.GenericMessageListener;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.BindingSpecification;
import reactor.rabbitmq.ExchangeSpecification;

public class ApplicationQueryListener
extends GenericMessageListener {
    @Generated
    private static final Logger log = Logger.getLogger(ApplicationQueryListener.class.getName());
    private final MessageConverter converter;
    private final HandlerResolver handlerResolver;
    private final ReactiveMessageSender sender;
    private final String replyExchange;
    private final String directExchange;
    private final boolean withDLQRetry;
    private final int retryDelay;
    private final Optional<Integer> maxLengthBytes;
    private final boolean discardTimeoutQueries;

    public ApplicationQueryListener(ReactiveMessageListener listener, String queueName, HandlerResolver resolver, ReactiveMessageSender sender, String directExchange, MessageConverter converter, String replyExchange, boolean withDLQRetry, boolean createTopology, long maxRetries, int retryDelay, Optional<Integer> maxLengthBytes, boolean discardTimeoutQueries, DiscardNotifier discardNotifier, CustomReporter errorReporter) {
        super(queueName, listener, withDLQRetry, createTopology, maxRetries, retryDelay, discardNotifier, "query", errorReporter);
        this.retryDelay = retryDelay;
        this.withDLQRetry = withDLQRetry;
        this.converter = converter;
        this.handlerResolver = resolver;
        this.sender = sender;
        this.replyExchange = replyExchange;
        this.directExchange = directExchange;
        this.maxLengthBytes = maxLengthBytes;
        this.discardTimeoutQueries = discardTimeoutQueries;
    }

    @Override
    protected Function<Message, Mono<Object>> rawMessageHandler(String executorPath) {
        RegisteredQueryHandler handler = this.handlerResolver.getQueryHandler(executorPath);
        if (handler == null) {
            return message -> Mono.error((Throwable)new RuntimeException("Handler Not registered for Query: " + executorPath));
        }
        Function<Message, Object> messageConverter = this.resolveConverter(handler.getQueryClass());
        QueryExecutor executor = new QueryExecutor(handler.getHandler(), messageConverter);
        return arg_0 -> ((QueryExecutor)executor).execute(arg_0);
    }

    private Function<Message, Object> resolveConverter(Class<?> handlerClass) {
        if (handlerClass == CloudEvent.class) {
            return arg_0 -> ((MessageConverter)this.converter).readCloudEvent(arg_0);
        }
        return msj -> this.converter.readAsyncQuery(msj, handlerClass).getQueryData();
    }

    @Override
    protected Mono<Void> setUpBindings(TopologyCreator creator) {
        Mono<AMQP.Exchange.DeclareOk> declareExchange = creator.declare(ExchangeSpecification.exchange((String)this.directExchange).durable(true).type("direct"));
        if (this.withDLQRetry) {
            Mono<AMQP.Exchange.DeclareOk> declareExchangeDLQ = creator.declare(ExchangeSpecification.exchange((String)(this.directExchange + ".DLQ")).durable(true).type("direct"));
            Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(this.queueName, this.directExchange + ".DLQ", this.maxLengthBytes);
            Mono<AMQP.Queue.DeclareOk> declareDLQ = creator.declareDLQ(this.queueName, this.directExchange, this.retryDelay, this.maxLengthBytes);
            Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding((String)this.directExchange, (String)this.queueName, (String)this.queueName));
            Mono<AMQP.Queue.BindOk> bindingDLQ = creator.bind(BindingSpecification.binding((String)(this.directExchange + ".DLQ"), (String)this.queueName, (String)(this.queueName + ".DLQ")));
            return declareExchange.then(declareExchangeDLQ).then(declareQueue).then(declareDLQ).then(binding).then(bindingDLQ).then();
        }
        Mono<AMQP.Queue.DeclareOk> declareQueue = creator.declareQueue(this.queueName, this.maxLengthBytes);
        Mono<AMQP.Queue.BindOk> binding = creator.bind(BindingSpecification.binding((String)this.directExchange, (String)this.queueName, (String)this.queueName));
        return declareExchange.then(declareQueue).then(binding).then();
    }

    @Override
    protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instant initTime) {
        boolean messageDoesNotContainTimeoutMetadata;
        AMQP.BasicProperties messageProperties = msj.getProperties();
        boolean bl = messageDoesNotContainTimeoutMetadata = messageProperties.getTimestamp() == null || !messageProperties.getHeaders().containsKey("x-reply-timeout-millis");
        if (messageDoesNotContainTimeoutMetadata || !this.discardTimeoutQueries) {
            return super.handle(msj, initTime);
        }
        return this.handleWithTimeout(msj, initTime, messageProperties);
    }

    private Mono<AcknowledgableDelivery> handleWithTimeout(AcknowledgableDelivery msj, Instant initTime, AMQP.BasicProperties messageProperties) {
        long messageTimestamp = msj.getProperties().getTimestamp().getTime();
        long replyTimeoutMillis = ((Integer)messageProperties.getHeaders().get("x-reply-timeout-millis")).intValue();
        long millisUntilTimeout = messageTimestamp + replyTimeoutMillis - this.currentTimestamp().toEpochMilli();
        String executorPath = this.getExecutorPath(msj);
        if (millisUntilTimeout > 0L) {
            return super.handle(msj, initTime).timeout(Duration.ofMillis(millisUntilTimeout), this.buildTimeOutFallback(executorPath));
        }
        return this.buildTimeOutFallback(executorPath);
    }

    private Instant currentTimestamp() {
        return Instant.now();
    }

    private Mono<AcknowledgableDelivery> buildTimeOutFallback(String executorPath) {
        return Mono.fromRunnable(() -> log.log(Level.WARNING, String.format("query with path %s discarded by timeout", executorPath)));
    }

    @Override
    protected String getExecutorPath(AcknowledgableDelivery msj) {
        return msj.getProperties().getHeaders().get("x-serveQuery-id").toString();
    }

    @Override
    protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
        return m -> m.materialize().flatMap(signal -> {
            if (signal.isOnError()) {
                return Mono.error((Throwable)Optional.ofNullable(signal.getThrowable()).orElseGet(RuntimeException::new));
            }
            if (signal.isOnComplete()) {
                return Mono.empty();
            }
            String replyID = msg.getProperties().getHeaders().get("x-reply_id").toString();
            String correlationID = msg.getProperties().getHeaders().get("x-correlation-id").toString();
            HashMap<String, Object> headers = new HashMap<String, Object>();
            headers.put("x-correlation-id", correlationID);
            Object response = signal.get();
            return this.sender.sendNoConfirm(response, this.replyExchange, replyID, headers, false);
        });
    }

    @Override
    protected Object parseMessageForReporter(Message msj) {
        return this.converter.readAsyncQueryStructure(msj);
    }
}

