/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.retrytopic;

import java.time.Clock;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.regex.Pattern;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpoint;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
import org.springframework.util.backoff.BackOff;

public class ListenerContainerFactoryConfigurer {
    private BackOff providedBlockingBackOff = null;
    private Class<? extends Exception>[] blockingExceptionTypes = null;
    private Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer = container -> {};
    private Consumer<DefaultErrorHandler> errorHandlerCustomizer = errorHandler -> {};
    private final DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory;
    private final KafkaConsumerBackoffManager kafkaConsumerBackoffManager;
    private final Clock clock;

    public ListenerContainerFactoryConfigurer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager, DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory, Clock clock) {
        this.kafkaConsumerBackoffManager = kafkaConsumerBackoffManager;
        this.deadLetterPublishingRecovererFactory = deadLetterPublishingRecovererFactory;
        this.clock = clock;
    }

    public KafkaListenerContainerFactory<?> decorateFactory(ConcurrentKafkaListenerContainerFactory<?, ?> factory, Configuration configuration) {
        return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, true);
    }

    public KafkaListenerContainerFactory<?> decorateFactoryWithoutSettingContainerProperties(ConcurrentKafkaListenerContainerFactory<?, ?> factory, Configuration configuration) {
        return new RetryTopicListenerContainerFactoryDecorator(factory, configuration, false);
    }

    public void setBlockingRetriesBackOff(BackOff blockingBackOff) {
        Assert.notNull((Object)blockingBackOff, (String)"The provided BackOff cannot be null");
        Assert.state((this.providedBlockingBackOff == null ? 1 : 0) != 0, () -> "Blocking retries back off has already been set. Current: " + this.providedBlockingBackOff + " You provided: " + blockingBackOff);
        this.providedBlockingBackOff = blockingBackOff;
    }

    @SafeVarargs
    public final void setBlockingRetryableExceptions(Class<? extends Exception> ... exceptionTypes) {
        Assert.notNull(exceptionTypes, (String)"The exception types cannot be null");
        Assert.noNullElements((Object[])exceptionTypes, (String)"The exception types cannot have null elements");
        Assert.state((this.blockingExceptionTypes == null ? 1 : 0) != 0, () -> "Blocking retryable exceptions have already been set.Current ones: " + Arrays.toString(this.blockingExceptionTypes) + " You provided: " + Arrays.toString(exceptionTypes));
        this.blockingExceptionTypes = Arrays.copyOf(exceptionTypes, exceptionTypes.length);
    }

    public void setContainerCustomizer(Consumer<ConcurrentMessageListenerContainer<?, ?>> containerCustomizer) {
        Assert.notNull(containerCustomizer, (String)"'containerCustomizer' cannot be null");
        this.containerCustomizer = containerCustomizer;
    }

    public void setErrorHandlerCustomizer(Consumer<DefaultErrorHandler> errorHandlerCustomizer) {
        this.errorHandlerCustomizer = errorHandlerCustomizer;
    }

    protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer, Configuration configuration) {
        DefaultErrorHandler errorHandler = this.createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
        errorHandler.defaultFalse();
        errorHandler.setCommitRecovered(true);
        errorHandler.setLogLevel(KafkaException.Level.DEBUG);
        if (this.blockingExceptionTypes != null) {
            errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
        }
        this.errorHandlerCustomizer.accept(errorHandler);
        return errorHandler;
    }

    protected DefaultErrorHandler createDefaultErrorHandlerInstance(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return this.providedBlockingBackOff != null ? new DefaultErrorHandler(deadLetterPublishingRecoverer, this.providedBlockingBackOff) : new DefaultErrorHandler(deadLetterPublishingRecoverer);
    }

    protected void setupBackoffAwareMessageListenerAdapter(ConcurrentMessageListenerContainer<?, ?> container, Configuration configuration, boolean isSetContainerProperties) {
        MessageListener listener = this.checkAndCast(container.getContainerProperties().getMessageListener(), MessageListener.class);
        container.setupMessageListener(new KafkaBackoffAwareMessageListenerAdapter(listener, this.kafkaConsumerBackoffManager, container.getListenerId(), this.clock));
        this.containerCustomizer.accept(container);
    }

    private <T> T checkAndCast(Object obj, Class<T> clazz) {
        Assert.isAssignable(clazz, obj.getClass(), () -> String.format("The provided class %s is not assignable from %s", obj.getClass().getSimpleName(), clazz.getSimpleName()));
        return (T)obj;
    }

    static class Configuration {
        private final List<Long> backOffValues;

        Configuration(List<Long> backOffValues) {
            this.backOffValues = backOffValues;
        }
    }

    private class RetryTopicListenerContainerFactoryDecorator
    implements KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<?, ?>> {
        private final ConcurrentKafkaListenerContainerFactory<?, ?> delegate;
        private final Configuration configuration;
        private final boolean isSetContainerProperties;

        RetryTopicListenerContainerFactoryDecorator(ConcurrentKafkaListenerContainerFactory<?, ?> delegate, Configuration configuration, boolean isSetContainerProperties) {
            this.delegate = delegate;
            this.configuration = configuration;
            this.isSetContainerProperties = isSetContainerProperties;
        }

        @Override
        public ConcurrentMessageListenerContainer<?, ?> createListenerContainer(KafkaListenerEndpoint endpoint) {
            return this.decorate((ConcurrentMessageListenerContainer)this.delegate.createListenerContainer(endpoint));
        }

        private ConcurrentMessageListenerContainer<?, ?> decorate(ConcurrentMessageListenerContainer<?, ?> listenerContainer) {
            ContainerProperties.AckMode ackMode;
            CommonErrorHandler errorHandler = ListenerContainerFactoryConfigurer.this.createErrorHandler(ListenerContainerFactoryConfigurer.this.deadLetterPublishingRecovererFactory.create(), this.configuration);
            if (listenerContainer.getContainerProperties().isAsyncAcks() && (ContainerProperties.AckMode.MANUAL.equals((Object)(ackMode = listenerContainer.getContainerProperties().getAckMode())) || ContainerProperties.AckMode.MANUAL_IMMEDIATE.equals((Object)ackMode)) && errorHandler instanceof DefaultErrorHandler) {
                ((DefaultErrorHandler)errorHandler).setSeekAfterError(false);
            }
            listenerContainer.setCommonErrorHandler(errorHandler);
            ListenerContainerFactoryConfigurer.this.setupBackoffAwareMessageListenerAdapter(listenerContainer, this.configuration, this.isSetContainerProperties);
            return listenerContainer;
        }

        @Override
        public ConcurrentMessageListenerContainer<?, ?> createContainer(TopicPartitionOffset ... topicPartitions) {
            return this.decorate((ConcurrentMessageListenerContainer)this.delegate.createContainer(topicPartitions));
        }

        @Override
        public ConcurrentMessageListenerContainer<?, ?> createContainer(String ... topics) {
            return this.decorate((ConcurrentMessageListenerContainer)this.delegate.createContainer(topics));
        }

        @Override
        public ConcurrentMessageListenerContainer<?, ?> createContainer(Pattern topicPattern) {
            return this.decorate((ConcurrentMessageListenerContainer)this.delegate.createContainer(topicPattern));
        }
    }
}

