/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.time.Clock;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.common.TopicPartition;
import org.springframework.context.ApplicationListener;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.event.ListenerContainerPartitionIdleEvent;
import org.springframework.kafka.listener.KafkaBackoffException;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
import org.springframework.kafka.listener.ListenerContainerRegistry;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.lang.Nullable;

public class PartitionPausingBackoffManager
implements KafkaConsumerBackoffManager,
ApplicationListener<ListenerContainerPartitionIdleEvent> {
    private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));
    private final ListenerContainerRegistry listenerContainerRegistry;
    private final Map<TopicPartition, KafkaConsumerBackoffManager.Context> backOffContexts;
    private final Clock clock;
    private final KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster;

    public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry, KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster) {
        this.listenerContainerRegistry = listenerContainerRegistry;
        this.kafkaConsumerTimingAdjuster = kafkaConsumerTimingAdjuster;
        this.clock = Clock.systemUTC();
        this.backOffContexts = new HashMap<TopicPartition, KafkaConsumerBackoffManager.Context>();
    }

    public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry) {
        this.listenerContainerRegistry = listenerContainerRegistry;
        this.kafkaConsumerTimingAdjuster = null;
        this.clock = Clock.systemUTC();
        this.backOffContexts = new HashMap<TopicPartition, KafkaConsumerBackoffManager.Context>();
    }

    public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry, KafkaConsumerTimingAdjuster kafkaConsumerTimingAdjuster, Clock clock) {
        this.listenerContainerRegistry = listenerContainerRegistry;
        this.clock = clock;
        this.kafkaConsumerTimingAdjuster = kafkaConsumerTimingAdjuster;
        this.backOffContexts = new HashMap<TopicPartition, KafkaConsumerBackoffManager.Context>();
    }

    public PartitionPausingBackoffManager(ListenerContainerRegistry listenerContainerRegistry, Clock clock) {
        this.listenerContainerRegistry = listenerContainerRegistry;
        this.clock = clock;
        this.kafkaConsumerTimingAdjuster = null;
        this.backOffContexts = new HashMap<TopicPartition, KafkaConsumerBackoffManager.Context>();
    }

    @Override
    public void backOffIfNecessary(KafkaConsumerBackoffManager.Context context) {
        long backoffTime = context.getDueTimestamp() - this.getCurrentMillisFromClock();
        LOGGER.debug(() -> "Back off time: " + backoffTime + " Context: " + context);
        if (backoffTime > 0L) {
            this.pauseConsumptionAndThrow(context, backoffTime);
        }
    }

    private void pauseConsumptionAndThrow(KafkaConsumerBackoffManager.Context context, Long backOffTime) throws KafkaBackoffException {
        TopicPartition topicPartition = context.getTopicPartition();
        this.getListenerContainerFromContext(context).pausePartition(topicPartition);
        this.addBackoff(context, topicPartition);
        throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, backing off for approx. %s millis.", context.getTopicPartition().partition(), context.getTopicPartition().topic(), backOffTime), topicPartition, context.getListenerId(), context.getDueTimestamp());
    }

    public void onApplicationEvent(ListenerContainerPartitionIdleEvent partitionIdleEvent) {
        LOGGER.debug(() -> String.format("partitionIdleEvent received at %s. Partition: %s", this.getCurrentMillisFromClock(), partitionIdleEvent.getTopicPartition()));
        KafkaConsumerBackoffManager.Context backOffContext = this.getBackOffContext(partitionIdleEvent.getTopicPartition());
        this.maybeResumeConsumption(backOffContext);
    }

    private long getCurrentMillisFromClock() {
        return Instant.now(this.clock).toEpochMilli();
    }

    private void maybeResumeConsumption(@Nullable KafkaConsumerBackoffManager.Context context) {
        long pollTimeout;
        if (context == null) {
            return;
        }
        long now = this.getCurrentMillisFromClock();
        long timeUntilDue = context.getDueTimestamp() - now;
        boolean isDue = timeUntilDue <= (pollTimeout = this.getListenerContainerFromContext(context).getContainerProperties().getPollTimeout());
        long adjustedAmount = this.applyTimingAdjustment(context, timeUntilDue, pollTimeout);
        if (adjustedAmount != 0L || isDue) {
            this.resumePartition(context);
        } else {
            LOGGER.debug(() -> String.format("TopicPartition %s not due. DueTimestamp: %s Now: %s ", context.getTopicPartition(), context.getDueTimestamp(), now));
        }
    }

    private long applyTimingAdjustment(KafkaConsumerBackoffManager.Context context, long timeUntilDue, long pollTimeout) {
        if (this.kafkaConsumerTimingAdjuster == null || context.getConsumerForTimingAdjustment() == null) {
            LOGGER.debug(() -> String.format("Skipping timing adjustment for TopicPartition %s.", context.getTopicPartition()));
            return 0L;
        }
        return this.kafkaConsumerTimingAdjuster.adjustTiming(context.getConsumerForTimingAdjustment(), context.getTopicPartition(), pollTimeout, timeUntilDue);
    }

    private void resumePartition(KafkaConsumerBackoffManager.Context context) {
        MessageListenerContainer container = this.getListenerContainerFromContext(context);
        LOGGER.debug(() -> "Resuming partition at " + this.getCurrentMillisFromClock());
        container.resumePartition(context.getTopicPartition());
        this.removeBackoff(context.getTopicPartition());
    }

    private MessageListenerContainer getListenerContainerFromContext(KafkaConsumerBackoffManager.Context context) {
        return this.listenerContainerRegistry.getListenerContainer(context.getListenerId());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void addBackoff(KafkaConsumerBackoffManager.Context context, TopicPartition topicPartition) {
        Map<TopicPartition, KafkaConsumerBackoffManager.Context> map = this.backOffContexts;
        synchronized (map) {
            this.backOffContexts.put(topicPartition, context);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Nullable
    protected KafkaConsumerBackoffManager.Context getBackOffContext(TopicPartition topicPartition) {
        Map<TopicPartition, KafkaConsumerBackoffManager.Context> map = this.backOffContexts;
        synchronized (map) {
            return this.backOffContexts.get(topicPartition);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void removeBackoff(TopicPartition topicPartition) {
        Map<TopicPartition, KafkaConsumerBackoffManager.Context> map = this.backOffContexts;
        synchronized (map) {
            this.backOffContexts.remove(topicPartition);
        }
    }
}

