/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.kafka.listener;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.jspecify.annotations.Nullable;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.core.log.LogAccessor;
import org.springframework.core.task.AsyncTaskExecutor;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.kafka.core.ShareConsumerFactory;
import org.springframework.kafka.event.ConsumerStartedEvent;
import org.springframework.kafka.event.ConsumerStartingEvent;
import org.springframework.kafka.listener.AbstractShareKafkaMessageListenerContainer;
import org.springframework.kafka.listener.AcknowledgingConsumerAwareMessageListener;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.GenericMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.util.Assert;

/**
 * Message listener container that drives a single Kafka {@link ShareConsumer} on a
 * task submitted to the listener task executor. The task polls for records, hands each
 * record to the configured listener, acknowledges it with {@link AcknowledgeType#ACCEPT}
 * and then commits synchronously.
 *
 * <p>{@link #doStart()} creates and subscribes the consumer and submits the poll loop;
 * {@link #doStop()} only clears the running flag, which the loop checks before each poll.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class ShareKafkaMessageListenerContainer<K, V>
        extends AbstractShareKafkaMessageListenerContainer<K, V> {

    /** Maximum time, in milliseconds, that one {@code poll} call is allowed to block. */
    private static final int POLL_TIMEOUT = 1000;

    private @Nullable String clientId;

    /** Consumer task created by the most recent {@link #doStart()}; unset before that. */
    private volatile ShareListenerConsumer listenerConsumer;

    /** Future for the submitted consumer task; unset before {@link #doStart()}. */
    private volatile CompletableFuture<Void> listenerConsumerFuture;

    /** Counted down when the consumer task publishes its "starting" event. */
    private volatile CountDownLatch startLatch = new CountDownLatch(1);

    /**
     * Create a container backed by the given share consumer factory.
     *
     * @param shareConsumerFactory the factory used to create the share consumer; must not be null
     * @param containerProperties the container properties (topics, listener, executor)
     */
    public ShareKafkaMessageListenerContainer(ShareConsumerFactory<? super K, ? super V> shareConsumerFactory,
            ContainerProperties containerProperties) {
        super(shareConsumerFactory, containerProperties);
        Assert.notNull(shareConsumerFactory, "A ShareConsumerFactory must be provided");
    }

    /**
     * Return the client id passed to the consumer factory when the consumer is created.
     *
     * @return the client id, or {@code null} if none was set
     */
    public @Nullable String getClientId() {
        return this.clientId;
    }

    /**
     * Set the client id passed to the consumer factory. Only consumers created by a
     * later {@link #doStart()} see the new value.
     *
     * @param clientId the client id
     */
    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Report whether the container is in its expected state, which this implementation
     * equates with {@link #isRunning()}.
     *
     * @return {@code true} if the container is running
     */
    @Override
    public boolean isInExpectedState() {
        return isRunning();
    }

    /**
     * Return the metrics of the current consumer keyed by its client id.
     *
     * @return a single-entry map, or an empty map if no consumer has been created yet
     */
    @Override
    public Map<String, Map<MetricName, ? extends Metric>> metrics() {
        // Read the volatile field once so the null check and the use see the same instance.
        ShareListenerConsumer current = this.listenerConsumer;
        if (current == null) {
            return Collections.emptyMap();
        }
        Map<MetricName, ? extends Metric> metrics = current.consumer.metrics();
        return Collections.singletonMap(current.getClientId(), metrics);
    }

    /**
     * Create the share consumer, mark the container as running and submit the poll loop
     * to the listener task executor. Does nothing if the container is already running.
     * If no executor is configured, a {@link SimpleAsyncTaskExecutor} named after the
     * bean is created and stored back into the container properties.
     *
     * @throws IllegalStateException if no message listener is configured
     */
    @Override
    protected void doStart() {
        if (isRunning()) {
            return;
        }
        ContainerProperties containerProperties = getContainerProperties();
        Object messageListener = containerProperties.getMessageListener();
        AsyncTaskExecutor consumerExecutor = containerProperties.getListenerTaskExecutor();
        if (consumerExecutor == null) {
            String beanName = getBeanName();
            consumerExecutor = new SimpleAsyncTaskExecutor((beanName == null ? "" : beanName) + "-C-");
            containerProperties.setListenerTaskExecutor(consumerExecutor);
        }
        GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
        Assert.state(listener != null, "'messageListener' cannot be null");
        this.listenerConsumer = new ShareListenerConsumer(listener);
        // The flag must be set before the task is submitted: the poll loop exits as soon
        // as it observes isRunning() == false.
        setRunning(true);
        this.listenerConsumerFuture = CompletableFuture.runAsync(this.listenerConsumer, consumerExecutor);
    }

    /**
     * Clear the running flag. The poll loop checks the flag before each poll and closes
     * the consumer once it exits; this method does not wait for that to happen.
     */
    @Override
    protected void doStop() {
        setRunning(false);
    }

    /** Count down the start latch and publish a {@link ConsumerStartingEvent}, if a publisher is set. */
    private void publishConsumerStartingEvent() {
        this.startLatch.countDown();
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent(new ConsumerStartingEvent(this, this));
        }
    }

    /** Publish a {@link ConsumerStartedEvent}, if a publisher is set. */
    private void publishConsumerStartedEvent() {
        ApplicationEventPublisher publisher = getApplicationEventPublisher();
        if (publisher != null) {
            publisher.publishEvent(new ConsumerStartedEvent(this, this));
        }
    }

    /**
     * The task that owns the {@link ShareConsumer}: it is created and subscribed in the
     * constructor, polled in {@link #run()}, and closed when {@link #run()} exits.
     */
    private class ShareListenerConsumer
            implements Runnable {

        private final LogAccessor logger;

        private final ShareConsumer<K, V> consumer;

        private final GenericMessageListener<?> genericListener;

        private final @Nullable String consumerGroupId;

        private final @Nullable String clientId;

        /**
         * Create the share consumer from the container's factory and subscribe it to the
         * topics from the container properties. If subscribing fails, the newly created
         * consumer is closed before the failure is rethrown.
         *
         * @param listener the listener that receives each polled record
         */
        ShareListenerConsumer(GenericMessageListener<?> listener) {
            this.logger = ShareKafkaMessageListenerContainer.this.logger;
            this.consumerGroupId = ShareKafkaMessageListenerContainer.this.getGroupId();
            this.consumer = ShareKafkaMessageListenerContainer.this.shareConsumerFactory.createShareConsumer(
                    ShareKafkaMessageListenerContainer.this.getGroupId(),
                    ShareKafkaMessageListenerContainer.this.getClientId());
            this.genericListener = listener;
            this.clientId = ShareKafkaMessageListenerContainer.this.getClientId();
            try {
                ContainerProperties containerProperties =
                        ShareKafkaMessageListenerContainer.this.getContainerProperties();
                this.consumer.subscribe(Arrays.asList(containerProperties.getTopics()));
            }
            catch (RuntimeException | Error ex) {
                // No task will ever own this consumer, so release it here instead of leaking it.
                try {
                    this.consumer.close();
                }
                catch (Exception closeEx) {
                    ex.addSuppressed(closeEx);
                }
                throw ex;
            }
        }

        @Nullable String getClientId() {
            return this.clientId;
        }

        /**
         * Publish the start events, run the poll loop until the container stops or the
         * loop fails, and always close the consumer on the way out, including when
         * initialization itself throws.
         *
         * <p>NOTE(review): when the loop ends because of an exception, the container's
         * running flag is not cleared here, so {@code isRunning()} keeps returning
         * {@code true} until {@code doStop()} runs. Confirm whether that is intended.
         */
        @Override
        public void run() {
            try {
                initialize();
                Exception exitThrowable = pollUntilStopped();
                if (exitThrowable != null) {
                    this.logger.error(exitThrowable, "ShareListenerConsumer exiting due to error");
                }
            }
            finally {
                wrapUp();
            }
        }

        /**
         * Poll, dispatch, acknowledge and commit until the container is no longer running.
         *
         * @return the exception that ended the loop, or {@code null} on a normal stop
         */
        private @Nullable Exception pollUntilStopped() {
            while (ShareKafkaMessageListenerContainer.this.isRunning()) {
                try {
                    ConsumerRecords<K, V> records = this.consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
                    if (records == null || records.count() <= 0) {
                        continue;
                    }
                    for (ConsumerRecord<K, V> rec : records) {
                        dispatch(rec);
                        // Each record is accepted and committed right after its listener returns.
                        this.consumer.acknowledge(rec, AcknowledgeType.ACCEPT);
                        this.consumer.commitSync();
                    }
                }
                catch (Error e) {
                    this.logger.error(e, "Stopping share consumer due to an Error");
                    throw e;
                }
                catch (Exception e) {
                    if (e.getCause() instanceof InterruptedException) {
                        // Restore the interrupt flag for whoever owns this thread.
                        Thread.currentThread().interrupt();
                    }
                    this.logger.error(e, "Error in share consumer poll loop");
                    return e;
                }
            }
            return null;
        }

        /**
         * Hand one record to the listener. An
         * {@link AcknowledgingConsumerAwareMessageListener} is invoked through its
         * three-argument method with a {@code null} acknowledgment and a {@code null}
         * consumer; any other listener receives just the record.
         */
        @SuppressWarnings("unchecked") // the listener's record type is not checked at runtime
        private void dispatch(ConsumerRecord<K, V> rec) {
            if (this.genericListener instanceof AcknowledgingConsumerAwareMessageListener<?, ?>) {
                AcknowledgingConsumerAwareMessageListener<K, V> ackListener =
                        (AcknowledgingConsumerAwareMessageListener<K, V>) this.genericListener;
                ackListener.onMessage(rec, (Acknowledgment) null, (Consumer<?, ?>) null);
            }
            else {
                GenericMessageListener<ConsumerRecord<K, V>> listener =
                        (GenericMessageListener<ConsumerRecord<K, V>>) this.genericListener;
                listener.onMessage(rec);
            }
        }

        /** Publish the "starting" and then the "started" container events. */
        protected void initialize() {
            ShareKafkaMessageListenerContainer.this.publishConsumerStartingEvent();
            ShareKafkaMessageListenerContainer.this.publishConsumerStartedEvent();
        }

        /** Close the consumer and log that it has stopped. */
        private void wrapUp() {
            this.consumer.close();
            this.logger.info(() -> this.consumerGroupId + ": Consumer stopped");
        }

        @Override
        public String toString() {
            return "ShareKafkaMessageListenerContainer.ShareListenerConsumer [consumerGroupId=" + this.consumerGroupId + ", clientId=" + this.clientId + "]";
        }
    }
}

