/*
 * Decompiled with CFR 0.152.
 */
package org.reactivecommons.async.rabbit.listeners;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Delivery;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;
import lombok.Generated;
import org.reactivecommons.async.commons.DiscardNotifier;
import org.reactivecommons.async.commons.FallbackStrategy;
import org.reactivecommons.async.commons.communications.Message;
import org.reactivecommons.async.commons.ext.CustomReporter;
import org.reactivecommons.async.commons.utils.LoggerSubscriber;
import org.reactivecommons.async.rabbit.RabbitMessage;
import org.reactivecommons.async.rabbit.communications.ReactiveMessageListener;
import org.reactivecommons.async.rabbit.communications.TopologyCreator;
import org.reactivestreams.Publisher;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.ConsumeOptions;
import reactor.rabbitmq.Receiver;
import reactor.util.retry.Retry;

public abstract class GenericMessageListener {
    @Generated
    private static final Logger log = Logger.getLogger(GenericMessageListener.class.getName());
    public static final int DEFAULT_RETRIES_DLQ = 10;
    private final ConcurrentHashMap<String, Function<Message, Mono<Object>>> handlers = new ConcurrentHashMap();
    private final Receiver receiver;
    private final ReactiveMessageListener messageListener;
    protected final String queueName;
    private final Scheduler scheduler = Schedulers.newParallel((String)this.getClass().getSimpleName(), (int)12);
    private final Scheduler errorReporterScheduler = Schedulers.newBoundedElastic((int)4, (int)256, (String)"errorReporterScheduler");
    private final boolean useDLQRetries;
    private final boolean createTopology;
    private final long maxRetries;
    private final Duration retryDelay;
    private final DiscardNotifier discardNotifier;
    private final String objectType;
    private final CustomReporter customReporter;
    private volatile Flux<AcknowledgableDelivery> messageFlux;

    protected GenericMessageListener(String queueName, ReactiveMessageListener listener, boolean useDLQRetries, boolean createTopology, long maxRetries, long retryDelay, DiscardNotifier discardNotifier, String objectType, CustomReporter customReporter) {
        this.receiver = listener.getReceiver();
        this.queueName = queueName;
        this.messageListener = listener;
        this.createTopology = createTopology;
        this.maxRetries = GenericMessageListener.resolveRetries(useDLQRetries, maxRetries);
        this.retryDelay = Duration.ofMillis(retryDelay);
        this.useDLQRetries = useDLQRetries;
        this.discardNotifier = discardNotifier;
        this.objectType = objectType;
        this.customReporter = customReporter;
    }

    private static long resolveRetries(boolean useDLQRetries, long maxRetries) {
        return useDLQRetries && maxRetries == -1L ? 10L : maxRetries;
    }

    private boolean hasLocalRetries() {
        return !this.useDLQRetries && this.maxRetries != -1L;
    }

    protected Mono<Void> setUpBindings(TopologyCreator creator) {
        return Mono.empty();
    }

    public void startListener() {
        log.log(Level.INFO, "Using max concurrency {0}, in queue: {1}", new Object[]{this.messageListener.getMaxConcurrency(), this.queueName});
        if (this.useDLQRetries) {
            log.log(Level.INFO, "ATTENTION! Using DLQ Strategy for retries with {0} + 1 Max Retries configured!", new Object[]{this.maxRetries});
        } else {
            log.log(Level.INFO, "ATTENTION! Using infinite fast retries as Retry Strategy");
        }
        ConsumeOptions consumeOptions = new ConsumeOptions();
        consumeOptions.qos(this.messageListener.getPrefetchCount().intValue());
        this.messageFlux = this.createTopology ? this.setUpBindings(this.messageListener.getTopologyCreator()).thenMany((Publisher)this.receiver.consumeManualAck(this.queueName, consumeOptions).transform(this::consumeFaultTolerant)) : this.receiver.consumeManualAck(this.queueName, consumeOptions).doOnError(err -> log.log(Level.SEVERE, "Error listening queue", (Throwable)err)).transform(this::consumeFaultTolerant);
        this.onTerminate();
    }

    private Flux<AcknowledgableDelivery> consumeFaultTolerant(Flux<AcknowledgableDelivery> messageFlux) {
        return messageFlux.flatMap(msj -> {
            Instant init = Instant.now();
            return this.handle((AcknowledgableDelivery)msj, init).doOnSuccess(AcknowledgableDelivery::ack).onErrorResume(err -> this.requeueOrAck((AcknowledgableDelivery)msj, (Throwable)err, init));
        }, this.messageListener.getMaxConcurrency().intValue());
    }

    protected Mono<AcknowledgableDelivery> handle(AcknowledgableDelivery msj, Instant initTime) {
        try {
            String executorPath = this.getExecutorPath(msj);
            Function<Message, Mono<Object>> handler = this.getExecutor(executorPath);
            RabbitMessage message = RabbitMessage.fromDelivery((Delivery)msj, executorPath);
            Mono flow = Mono.defer(() -> (Mono)handler.apply(message)).transform(this.enrichPostProcess(message));
            if (this.hasLocalRetries()) {
                flow = flow.retryWhen((Retry)Retry.fixedDelay((long)this.maxRetries, (Duration)this.retryDelay)).onErrorMap(err -> {
                    if (err.getMessage() != null && err.getMessage().contains("Retries exhausted")) {
                        log.warning(err.getMessage());
                        return err.getCause();
                    }
                    return err;
                });
            }
            return flow.doOnSuccess(o -> this.logExecution(executorPath, initTime, true)).subscribeOn(this.scheduler).thenReturn((Object)msj);
        }
        catch (Exception e) {
            log.log(Level.SEVERE, String.format("ATTENTION !! Outer error protection reached for %s, in Async Consumer!! Severe Warning! ", msj.getProperties().getMessageId()));
            return Mono.error((Throwable)e);
        }
    }

    private void onTerminate() {
        this.messageFlux.doOnTerminate(this::onTerminate).subscribe((CoreSubscriber)new LoggerSubscriber(this.getClass().getName()));
    }

    private void logExecution(String executorPath, Instant initTime, boolean success) {
        try {
            Instant afterExecutionTime = Instant.now();
            long timeElapsed = Duration.between(initTime, afterExecutionTime).toMillis();
            this.doLogExecution(executorPath, timeElapsed);
            this.customReporter.reportMetric(this.objectType, executorPath, Long.valueOf(timeElapsed), success);
        }
        catch (Exception e) {
            log.log(Level.WARNING, "Unable to send execution metrics!", e);
        }
    }

    private void reportErrorMetric(AcknowledgableDelivery msj, Instant initTime) {
        String executorPath;
        try {
            executorPath = this.getExecutorPath(msj);
        }
        catch (Exception e) {
            executorPath = "unknown";
        }
        this.logExecution(executorPath, initTime, false);
    }

    private void doLogExecution(String executorPath, long timeElapsed) {
        log.log(Level.FINE, String.format("%s with path %s handled, took %d ms", this.objectType, executorPath, timeElapsed));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void logError(Throwable err, AcknowledgableDelivery msj, FallbackStrategy strategy) {
        String messageID = msj.getProperties().getMessageId();
        try {
            log.log(Level.SEVERE, String.format("Error encounter while processing message %s: %s", messageID, err.toString()), err);
            log.warning(String.format("Message %s Headers: %s", messageID, msj.getProperties().getHeaders().toString()));
            log.warning(String.format("Message %s Body: %s", messageID, new String(msj.getBody())));
        }
        catch (Exception e) {
            try {
                log.log(Level.SEVERE, "Error Login message Content!!", e);
            }
            catch (Throwable throwable) {
                log.warning(String.format(strategy.message, messageID));
                throw throwable;
            }
            log.warning(String.format(strategy.message, messageID));
        }
        log.warning(String.format(strategy.message, messageID));
    }

    private Function<Message, Mono<Object>> getExecutor(String path) {
        return this.handlers.computeIfAbsent(path, this::rawMessageHandler);
    }

    protected abstract Function<Message, Mono<Object>> rawMessageHandler(String var1);

    protected abstract String getExecutorPath(AcknowledgableDelivery var1);

    protected Function<Mono<Object>, Mono<Object>> enrichPostProcess(Message msg) {
        return Function.identity();
    }

    private Mono<AcknowledgableDelivery> requeueOrAck(AcknowledgableDelivery msj, Throwable err, Instant init) {
        long retryNumber = this.getRetryNumber(msj);
        RabbitMessage rabbitMessage = RabbitMessage.fromDelivery((Delivery)msj);
        boolean redeliver = msj.getEnvelope().isRedeliver();
        this.reportErrorMetric(msj, init);
        this.sendErrorToCustomReporter(err, rabbitMessage, redeliver || retryNumber > 0L);
        if (this.hasLocalRetries() || retryNumber >= this.maxRetries) {
            this.logError(err, msj, FallbackStrategy.DEFINITIVE_DISCARD);
            return this.discardNotifier.notifyDiscard((Message)rabbitMessage).doOnSuccess(_a -> msj.ack()).thenReturn((Object)msj);
        }
        if (this.useDLQRetries) {
            this.logError(err, msj, FallbackStrategy.RETRY_DLQ);
            msj.nack(false);
            return Mono.just((Object)msj);
        }
        this.logError(err, msj, FallbackStrategy.FAST_RETRY);
        return Mono.just((Object)msj).delayElement(this.retryDelay).doOnNext(m -> m.nack(true));
    }

    private void sendErrorToCustomReporter(Throwable err, Message message, boolean redelivered) {
        try {
            this.customReporter.reportError(err, message, this.parseMessageForReporter(message), redelivered).subscribeOn(this.errorReporterScheduler).doOnError(t -> log.log(Level.WARNING, "Error sending error to external reporter", (Throwable)t)).subscribe();
        }
        catch (Throwable t2) {
            log.log(Level.WARNING, "Error in scheduler when sending error to external reporter", t2);
        }
    }

    private Long getRetryNumber(AcknowledgableDelivery delivery) {
        return Optional.ofNullable(delivery.getProperties()).map(AMQP.BasicProperties::getHeaders).map(x -> (List)x.get("x-death")).filter(list -> !list.isEmpty()).map(list -> (HashMap)list.get(0)).map(hashMap -> (Long)hashMap.get("count")).orElse(0L);
    }

    protected abstract Object parseMessageForReporter(Message var1);
}

