package org.mule.modules.kafka.internal.service;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.mule.modules.kafka.api.error.exception.KafkaException;
import org.mule.modules.kafka.api.error.exception.UnableToShutdownMuleConsumerException;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/modules/kafka/internal/service/MuleConsumer.class */
/**
 * Runs a set of Kafka consumers, each on its own {@link ConsumerGroupTask}
 * submitted to an {@link ExecutorService}, and forwards every polled record
 * value to a Mule {@link SourceCallback}.
 *
 * <p>Typical lifecycle: construct, call {@link #run(SourceCallback, String)}
 * once, and later call {@link #shutdown()} to stop the tasks, wait for the
 * executor to drain, and close the consumers.
 */
public class MuleConsumer {
    /** Timeout, in milliseconds, handed to every {@code Consumer.poll} call. */
    private static final int POLLING_TIMEOUT_MS = 1000;
    private static final Logger logger = LoggerFactory.getLogger(MuleConsumer.class);
    private final ExecutorService executorService;
    private final List<ConsumerGroupTask> consumerGroupTasks = new ArrayList<>();
    private final List<Consumer<?, ?>> consumers;
    private final MuleContext muleContext;
    private final int partitions;

    /**
     * Polling loop bound to a single consumer. The loop runs on whichever
     * thread executes {@link #run()}; {@link #stop()} may be called from
     * another thread to end it.
     */
    public class ConsumerGroupTask implements Runnable {
        // Raw on purpose: the callback's type parameters are not known here.
        private final SourceCallback callback;
        private final long pollingTimeoutMs;
        private final Consumer<?, ?> consumer;
        // Starts as true and only ever flips to false, so a stop() issued
        // before run() has begun is not undone when the task finally starts.
        private volatile boolean running = true;

        /**
         * @param sourceCallback callback that receives each record value
         * @param j              poll timeout in milliseconds
         * @param consumer       consumer polled by this task
         */
        public ConsumerGroupTask(SourceCallback sourceCallback, long j, Consumer<?, ?> consumer) {
            this.callback = sourceCallback;
            this.pollingTimeoutMs = j;
            this.consumer = consumer;
        }

        /** Polls and dispatches records until the task is stopped. */
        @Override // java.lang.Runnable
        public void run() {
            messagesListenerLoop();
        }

        /** Repeats poll-then-dispatch while the task is marked as running. */
        private void messagesListenerLoop() {
            while (this.running) {
                ConsumerRecords<?, ?> polled = pollNextChunkOfMessages();
                if (polled != null) {
                    processGottenRecords(polled);
                }
            }
            logger.debug("Task finished its execution.");
        }

        /**
         * Hands every record's value to the source callback. A failure on one
         * record is logged and reported to the Mule exception listener, and
         * the remaining records are still processed.
         */
        private void processGottenRecords(ConsumerRecords<?, ?> consumerRecords) {
            for (ConsumerRecord<?, ?> record : consumerRecords) {
                try {
                    logger.debug("Passing message with key: {}, offset: {}, partition: {} for processing. Thread: {}",
                            record.key(), record.offset(), record.partition(), Thread.currentThread().getName());
                    this.callback.handle(Result.builder().output(record.value()).build());
                } catch (Exception e) {
                    logger.error("Unable to fed message into source callback...", e);
                    MuleConsumer.this.muleContext.getExceptionListener().handleException(e);
                }
            }
        }

        /**
         * Polls the consumer once.
         *
         * @return the polled records, or {@code null} when the poll failed
         *         with one of the handled exceptions (in which case the task
         *         has been marked as stopped)
         */
        private ConsumerRecords<?, ?> pollNextChunkOfMessages() {
            logger.debug("Polling for messages with a timeout of {}.", this.pollingTimeoutMs);
            try {
                ConsumerRecords<?, ?> records = this.consumer.poll(this.pollingTimeoutMs);
                logger.debug("Got {} messages.", records.count());
                return records;
            } catch (KafkaException e) {
                // NOTE(review): per this file's imports, KafkaException is the connector's own
                // org.mule.modules.kafka.api.error.exception.KafkaException, not
                // org.apache.kafka.common.KafkaException. Other runtime failures thrown by poll()
                // are not caught here and will propagate out of run() - confirm that is intended.
                logger.warn("Task got unknown exception from server. Task won't be able to recover so it will be stopped.", e);
                handlePollException(e);
            } catch (InvalidOffsetException e2) {
                logger.error("Invalid offset is requested. Task won't be able to recover so it will be stopped.", e2);
                handlePollException(e2);
            } catch (AuthorizationException e3) {
                logger.error("Consumer not authorized.", e3);
                handlePollException(e3);
            } catch (WakeupException e4) {
                logger.debug("Task was forcibly woken up.", e4);
                // stop() clears the running flag before it calls wakeup(), so a wakeup seen with the
                // flag already cleared is the expected result of a stop request and is not an error.
                // Only a wakeup nobody asked for through stop() is reported.
                if (this.running) {
                    handlePollException(e4);
                }
            }
            return null;
        }

        /**
         * Ends the polling loop and reports the failure to the Mule exception
         * listener. This runs on the polling thread itself, so there is no
         * blocked poll to interrupt and {@code wakeup()} is not called.
         */
        private void handlePollException(Exception exc) {
            logger.debug("Stopping task...");
            this.running = false;
            MuleConsumer.this.muleContext.getExceptionListener().handleException(exc);
        }

        /**
         * Requests termination: clears the running flag and wakes the consumer
         * up so that a poll in progress returns promptly.
         */
        public void stop() {
            logger.debug("Stopping task...");
            this.running = false;
            this.consumer.wakeup();
        }
    }

    /**
     * @param list            consumers to drive; one task is created per consumer index
     *                        in {@code [0, i)}
     * @param executorService executor that runs the polling tasks; may be {@code null},
     *                        in which case {@link #shutdown()} skips the executor step
     * @param i               number of polling tasks to launch; each task index is used to
     *                        look up a consumer in {@code list}, so it must not exceed
     *                        {@code list.size()}
     * @param muleContext     context whose exception listener receives failures
     */
    public MuleConsumer(List<Consumer<?, ?>> list, ExecutorService executorService, int i, MuleContext muleContext) {
        this.consumers = list;
        this.executorService = executorService;
        this.muleContext = muleContext;
        this.partitions = i;
    }

    /**
     * Subscribes every consumer to the given topic and starts the polling tasks.
     *
     * @param sourceCallback callback that receives each record value
     * @param str            name of the topic to subscribe to
     */
    public void run(SourceCallback sourceCallback, String str) {
        subscribe(str);
        launchListeningTasks(sourceCallback, this.partitions, POLLING_TIMEOUT_MS);
    }

    /** Subscribes each consumer to the single given topic. */
    private void subscribe(String topic) {
        for (Consumer<?, ?> consumer : this.consumers) {
            consumer.subscribe(Collections.singletonList(topic));
        }
    }

    /**
     * Creates {@code taskCount} tasks, pairing task {@code n} with consumer
     * {@code n}, submits each to the executor, and remembers it so that it
     * can be stopped later.
     */
    private void launchListeningTasks(SourceCallback sourceCallback, int taskCount, long pollingTimeoutMs) {
        logger.debug("Rising {} threads for processing messages.", taskCount);
        for (int index = 0; index < taskCount; index++) {
            ConsumerGroupTask task = new ConsumerGroupTask(sourceCallback, pollingTimeoutMs, this.consumers.get(index));
            this.executorService.execute(task);
            this.consumerGroupTasks.add(task);
        }
    }

    /**
     * Stops every task, waits for the executor to terminate, and closes every
     * consumer. The consumers are closed even when the executor step fails;
     * in that case the executor failure is the exception that propagates and
     * any close failure is attached to it as a suppressed exception.
     */
    public void shutdown() {
        stopRunningTasks();
        try {
            shutdownExecutorService();
        } catch (RuntimeException shutdownFailure) {
            try {
                closeConsumer();
            } catch (RuntimeException closeFailure) {
                shutdownFailure.addSuppressed(closeFailure);
            }
            throw shutdownFailure;
        }
        closeConsumer();
    }

    /**
     * Shuts the executor down and waits for its tasks to finish.
     *
     * @throws UnableToShutdownMuleConsumerException if waiting fails, including
     *         when the waiting thread is interrupted (the interrupt flag is restored)
     */
    private void shutdownExecutorService() {
        if (this.executorService == null) {
            return;
        }
        logger.debug("Shutting down executor service.");
        this.executorService.shutdown();
        logger.debug("Executor service shutted down.");
        try {
            logger.debug("Awaiting termination of executor service.");
            if (this.executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS)) {
                logger.debug("All threads terminated.");
            } else {
                logger.warn("Threadpool did not terminate cleanly.");
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interruption for code further up the call stack.
                Thread.currentThread().interrupt();
            }
            logger.error("Threadpool did not terminate cleanly.", e);
            this.muleContext.getExceptionListener().handleException(e);
            throw new UnableToShutdownMuleConsumerException("Unexpected exception while trying to shutdown.", e);
        }
    }

    /**
     * Closes every consumer. A failure on one consumer does not prevent the
     * others from being closed; the first failure is rethrown once all
     * consumers have been attempted.
     */
    private void closeConsumer() {
        logger.debug("Closing Kafka consumers.");
        RuntimeException firstFailure = null;
        for (Consumer<?, ?> consumer : this.consumers) {
            try {
                consumer.close();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
        logger.debug("Kafka consumers closed.");
    }

    /** Asks every launched task to stop. */
    private void stopRunningTasks() {
        for (ConsumerGroupTask task : this.consumerGroupTasks) {
            task.stop();
        }
    }
}
