/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.commons.logging.LogFactory;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.jspecify.annotations.NonNull;
import org.jspecify.annotations.Nullable;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.event.ContainerStoppedEvent;
import org.springframework.kafka.listener.AfterRollbackProcessor;
import org.springframework.kafka.listener.BatchInterceptor;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.DefaultAfterRollbackProcessor;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.listener.GenericMessageListenerContainer;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.support.TopicPartitionOffset;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>,
BeanNameAware,
ApplicationEventPublisherAware,
ApplicationContextAware {
    public static final int DEFAULT_PHASE = 2147483547;
    private static final int DEFAULT_TOPIC_CHECK_TIMEOUT = 30;
    protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(this.getClass()));
    protected final @NonNull ConsumerFactory<K, V> consumerFactory;
    private final ContainerProperties containerProperties;
    protected final ReentrantLock lifecycleLock = new ReentrantLock();
    protected final AtomicBoolean enforceRebalanceRequested = new AtomicBoolean();
    private final Set<TopicPartition> pauseRequestedPartitions = ConcurrentHashMap.newKeySet();
    private @NonNull String beanName = "noBeanNameSet";
    private @Nullable ApplicationEventPublisher applicationEventPublisher;
    private @Nullable CommonErrorHandler commonErrorHandler;
    private boolean autoStartup = true;
    private int phase = 2147483547;
    private AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor = new DefaultAfterRollbackProcessor<K, V>();
    private int topicCheckTimeout = 30;
    private @Nullable RecordInterceptor<K, V> recordInterceptor;
    private @Nullable BatchInterceptor<K, V> batchInterceptor;
    private boolean interceptBeforeTx = true;
    private byte[] listenerInfo;
    private @Nullable ApplicationContext applicationContext;
    private volatile boolean running = false;
    private volatile boolean fenced = false;
    private volatile boolean paused;
    private volatile boolean stoppedNormally = true;
    private @Nullable String mainListenerId;
    private boolean changeConsumerThreadName;
    private @NonNull Function<MessageListenerContainer, String> threadNameSupplier = container -> container.getListenerId();
    private @Nullable KafkaAdmin kafkaAdmin;

    protected AbstractMessageListenerContainer(@Nullable ConsumerFactory<? super K, ? super V> consumerFactory, ContainerProperties containerProperties) {
        Boolean subBatchPerPartition;
        Assert.notNull((Object)containerProperties, (String)"'containerProperties' cannot be null");
        Assert.notNull(consumerFactory, (String)"'consumerFactory' cannot be null");
        this.consumerFactory = consumerFactory;
        @Nullable String @Nullable [] topics = containerProperties.getTopics();
        if (topics != null) {
            this.containerProperties = new ContainerProperties(topics);
        } else {
            Pattern topicPattern = containerProperties.getTopicPattern();
            if (topicPattern != null) {
                this.containerProperties = new ContainerProperties(topicPattern);
            } else {
                @Nullable TopicPartitionOffset @Nullable [] topicPartitions = containerProperties.getTopicPartitions();
                if (topicPartitions != null) {
                    this.containerProperties = new ContainerProperties(topicPartitions);
                } else {
                    throw new IllegalStateException("topics, topicPattern, or topicPartitions must be provided");
                }
            }
        }
        BeanUtils.copyProperties((Object)containerProperties, (Object)this.containerProperties, (String[])new String[]{"topics", "topicPartitions", "topicPattern", "ackCount", "ackTime", "subBatchPerPartition"});
        if (containerProperties.getAckCount() > 0) {
            this.containerProperties.setAckCount(containerProperties.getAckCount());
        }
        if (containerProperties.getAckTime() > 0L) {
            this.containerProperties.setAckTime(containerProperties.getAckTime());
        }
        if ((subBatchPerPartition = containerProperties.getSubBatchPerPartition()) != null) {
            this.containerProperties.setSubBatchPerPartition(subBatchPerPartition);
        }
        if (this.containerProperties.getConsumerRebalanceListener() == null) {
            this.containerProperties.setConsumerRebalanceListener(this.createSimpleLoggingConsumerRebalanceListener());
        }
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    protected @Nullable ApplicationContext getApplicationContext() {
        return this.applicationContext;
    }

    public void setBeanName(String name) {
        this.beanName = name;
    }

    public String getBeanName() {
        return this.beanName;
    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.applicationEventPublisher = applicationEventPublisher;
    }

    public @Nullable ApplicationEventPublisher getApplicationEventPublisher() {
        return this.applicationEventPublisher;
    }

    public @Nullable CommonErrorHandler getCommonErrorHandler() {
        return this.commonErrorHandler;
    }

    public void setCommonErrorHandler(@Nullable CommonErrorHandler commonErrorHandler) {
        this.commonErrorHandler = commonErrorHandler;
    }

    protected boolean isStoppedNormally() {
        return this.stoppedNormally;
    }

    protected void setStoppedNormally(boolean stoppedNormally) {
        this.stoppedNormally = stoppedNormally;
    }

    public boolean isAutoStartup() {
        return this.autoStartup;
    }

    @Override
    public void setAutoStartup(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    protected void setRunning(boolean running) {
        this.running = running;
    }

    public boolean isRunning() {
        return this.running;
    }

    protected void setFenced(boolean fenced) {
        this.fenced = fenced;
    }

    @Deprecated(since="3.2", forRemoval=true)
    protected boolean isPaused() {
        return this.paused;
    }

    @Override
    public boolean isPartitionPauseRequested(TopicPartition topicPartition) {
        return this.pauseRequestedPartitions.contains(topicPartition);
    }

    @Override
    public void pausePartition(TopicPartition topicPartition) {
        this.pauseRequestedPartitions.add(topicPartition);
    }

    @Override
    public void resumePartition(TopicPartition topicPartition) {
        this.pauseRequestedPartitions.remove(topicPartition);
    }

    @Override
    public boolean isPauseRequested() {
        return this.paused;
    }

    public void setPhase(int phase) {
        this.phase = phase;
    }

    public int getPhase() {
        return this.phase;
    }

    public AfterRollbackProcessor<? super K, ? super V> getAfterRollbackProcessor() {
        return this.afterRollbackProcessor;
    }

    public void setAfterRollbackProcessor(AfterRollbackProcessor<? super K, ? super V> afterRollbackProcessor) {
        Assert.notNull(afterRollbackProcessor, (String)"'afterRollbackProcessor' cannot be null");
        this.afterRollbackProcessor = afterRollbackProcessor;
    }

    @Override
    public ContainerProperties getContainerProperties() {
        return this.containerProperties;
    }

    @Override
    public @Nullable String getGroupId() {
        return this.containerProperties.getGroupId() == null ? (String)this.consumerFactory.getConfigurationProperties().get("group.id") : this.containerProperties.getGroupId();
    }

    @Override
    public String getListenerId() {
        return this.beanName;
    }

    public void setMainListenerId(String id) {
        this.mainListenerId = id;
    }

    @Override
    public @Nullable String getMainListenerId() {
        return this.mainListenerId;
    }

    @Override
    public byte[] getListenerInfo() {
        return this.listenerInfo != null ? Arrays.copyOf(this.listenerInfo, this.listenerInfo.length) : null;
    }

    public void setListenerInfo(@Nullable byte[] listenerInfo) {
        this.listenerInfo = listenerInfo != null ? Arrays.copyOf(listenerInfo, listenerInfo.length) : null;
    }

    public void setTopicCheckTimeout(int topicCheckTimeout) {
        this.topicCheckTimeout = topicCheckTimeout;
    }

    public boolean isChangeConsumerThreadName() {
        return this.changeConsumerThreadName;
    }

    public void setChangeConsumerThreadName(boolean changeConsumerThreadName) {
        this.changeConsumerThreadName = changeConsumerThreadName;
    }

    public Function<MessageListenerContainer, String> getThreadNameSupplier() {
        return this.threadNameSupplier;
    }

    public void setThreadNameSupplier(Function<MessageListenerContainer, String> threadNameSupplier) {
        Assert.notNull(threadNameSupplier, (String)"'threadNameSupplier' cannot be null");
        this.threadNameSupplier = threadNameSupplier;
    }

    public @Nullable KafkaAdmin getKafkaAdmin() {
        return this.kafkaAdmin;
    }

    public void setKafkaAdmin(KafkaAdmin kafkaAdmin) {
        this.kafkaAdmin = kafkaAdmin;
    }

    protected @Nullable RecordInterceptor<K, V> getRecordInterceptor() {
        return this.recordInterceptor;
    }

    public void setRecordInterceptor(@Nullable RecordInterceptor<K, V> recordInterceptor) {
        this.recordInterceptor = recordInterceptor;
    }

    protected @Nullable BatchInterceptor<K, V> getBatchInterceptor() {
        return this.batchInterceptor;
    }

    public void setBatchInterceptor(@Nullable BatchInterceptor<K, V> batchInterceptor) {
        this.batchInterceptor = batchInterceptor;
    }

    protected boolean isInterceptBeforeTx() {
        return this.interceptBeforeTx;
    }

    public void setInterceptBeforeTx(boolean interceptBeforeTx) {
        this.interceptBeforeTx = interceptBeforeTx;
    }

    @Override
    public void setupMessageListener(Object messageListener) {
        this.containerProperties.setMessageListener(messageListener);
    }

    public final void start() {
        this.checkGroupId();
        this.lifecycleLock.lock();
        try {
            if (!this.isRunning()) {
                Assert.state((boolean)(this.containerProperties.getMessageListener() instanceof GenericMessageListener), () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
                Assert.state((!this.fenced ? 1 : 0) != 0, (String)"Container Fenced. It is not allowed to start.");
                this.doStart();
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    protected void checkTopics() {
        if (this.containerProperties.isMissingTopicsFatal() && this.containerProperties.getTopicPattern() == null) {
            Map<String, Object> configs = this.consumerFactory.getConfigurationProperties().entrySet().stream().filter(entry -> AdminClientConfig.configNames().contains(entry.getKey())).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            Properties overrides = this.propertiesFromConsumerPropertyOverrides();
            overrides.forEach((BiConsumer<? super Object, ? super Object>)((BiConsumer<Object, Object>)(key, value) -> {
                if (key instanceof String) {
                    configs.put((String)key, value);
                }
            }));
            List missing = null;
            try (AdminClient client = AdminClient.create(configs);){
                if (client != null) {
                    @Nullable String @Nullable [] topics = this.containerProperties.getTopics();
                    if (topics == null) {
                        topics = (String[])Arrays.stream(this.containerProperties.getTopicPartitions()).map(TopicPartitionOffset::getTopic).toArray(String[]::new);
                    }
                    DescribeTopicsResult result = client.describeTopics(Arrays.asList(topics));
                    missing = result.topicNameValues().entrySet().stream().filter(entry -> {
                        try {
                            ((KafkaFuture)entry.getValue()).get((long)this.topicCheckTimeout, TimeUnit.SECONDS);
                            return false;
                        }
                        catch (InterruptedException ex) {
                            Thread.currentThread().interrupt();
                            return true;
                        }
                        catch (Exception ex) {
                            return true;
                        }
                    }).map(Map.Entry::getKey).collect(Collectors.toList());
                }
            }
            catch (Exception e) {
                this.logger.error((Throwable)e, (CharSequence)"Failed to check topic existence");
            }
            if (missing != null && !missing.isEmpty()) {
                throw new IllegalStateException("Topic(s) " + missing.toString() + " is/are not present and missingTopicsFatal is true");
            }
        }
    }

    public void checkGroupId() {
        if (this.containerProperties.getTopicPartitions() == null) {
            boolean hasGroupIdConsumerConfig = true;
            if (this.consumerFactory != null) {
                Object groupIdConfig = this.consumerFactory.getConfigurationProperties().get("group.id");
                hasGroupIdConsumerConfig = groupIdConfig instanceof String && StringUtils.hasText((String)((String)groupIdConfig));
            }
            Assert.state((hasGroupIdConsumerConfig || StringUtils.hasText((String)this.containerProperties.getGroupId()) ? 1 : 0) != 0, (String)"No group.id found in consumer config, container properties, or @KafkaListener annotation; a group.id is required when group management is used.");
        }
    }

    protected abstract void doStart();

    public final void stop() {
        this.stop(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void stop(boolean wait) {
        if (this.isRunning()) {
            if (wait) {
                CountDownLatch latch = new CountDownLatch(1);
                this.lifecycleLock.lock();
                try {
                    this.doStop(latch::countDown);
                }
                finally {
                    this.lifecycleLock.unlock();
                }
                try {
                    latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS);
                    this.publishContainerStoppedEvent();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            } else {
                this.lifecycleLock.lock();
                try {
                    this.doStop(this::publishContainerStoppedEvent);
                }
                finally {
                    this.lifecycleLock.unlock();
                }
            }
        }
    }

    @Override
    public void pause() {
        this.paused = true;
    }

    @Override
    public void resume() {
        this.paused = false;
    }

    public void stop(Runnable callback) {
        this.lifecycleLock.lock();
        try {
            if (this.isRunning()) {
                this.doStop(callback);
            } else {
                callback.run();
            }
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    @Override
    public void stopAbnormally(Runnable callback) {
        this.lifecycleLock.lock();
        try {
            this.doStop(callback, false);
            this.publishContainerStoppedEvent();
        }
        finally {
            this.lifecycleLock.unlock();
        }
    }

    protected void doStop(Runnable callback) {
        this.doStop(callback, true);
        this.publishContainerStoppedEvent();
    }

    protected abstract void doStop(Runnable var1, boolean var2);

    protected final ConsumerRebalanceListener createSimpleLoggingConsumerRebalanceListener() {
        return new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                AbstractMessageListenerContainer.this.logger.info(() -> AbstractMessageListenerContainer.this.getGroupId() + ": partitions revoked: " + String.valueOf(partitions));
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                AbstractMessageListenerContainer.this.logger.info(() -> AbstractMessageListenerContainer.this.getGroupId() + ": partitions assigned: " + String.valueOf(partitions));
            }

            public void onPartitionsLost(Collection<TopicPartition> partitions) {
                AbstractMessageListenerContainer.this.logger.info(() -> AbstractMessageListenerContainer.this.getGroupId() + ": partitions lost: " + String.valueOf(partitions));
            }
        };
    }

    protected void publishContainerStoppedEvent() {
        ApplicationEventPublisher eventPublisher = this.getApplicationEventPublisher();
        if (eventPublisher != null) {
            eventPublisher.publishEvent((ApplicationEvent)new ContainerStoppedEvent(this, this.parentOrThis()));
        }
    }

    protected AbstractMessageListenerContainer<?, ?> parentOrThis() {
        return this;
    }

    protected Properties propertiesFromConsumerPropertyOverrides() {
        Properties propertyOverrides = this.containerProperties.getKafkaConsumerProperties();
        Properties props = new Properties();
        props.putAll((Map<?, ?>)propertyOverrides);
        Set<String> stringPropertyNames = propertyOverrides.stringPropertyNames();
        stringPropertyNames.forEach(name -> {
            if (!props.contains(name)) {
                props.setProperty((String)name, propertyOverrides.getProperty((String)name));
            }
        });
        return props;
    }
}

