/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.CloseOptions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.SubscriptionPattern;
import org.apache.kafka.clients.consumer.internals.AbstractMembershipManager;
import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerInvoker;
import org.apache.kafka.clients.consumer.internals.ConsumerRebalanceListenerMethodName;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.CounterConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.Fetch;
import org.apache.kafka.clients.consumer.internals.FetchBuffer;
import org.apache.kafka.clients.consumer.internals.FetchCollector;
import org.apache.kafka.clients.consumer.internals.FetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.OffsetAndTimestampInternal;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventHandler;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CheckAndUpdatePositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CommitOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.CreateFetchRequestsEvent;
import org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import org.apache.kafka.clients.consumer.internals.events.FetchCommittedOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.LeaveGroupOnCloseEvent;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetOffsetEvent;
import org.apache.kafka.clients.consumer.internals.events.SeekUnvalidatedEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicRe2JPatternSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent;
import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.InvalidGroupIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.MockConsumerInterceptor;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class AsyncKafkaConsumerTest {
    private AsyncKafkaConsumer<String, String> consumer = null;
    private Time time = new MockTime(0L);
    private final Metrics metrics = new Metrics();
    private final FetchCollector<String, String> fetchCollector = (FetchCollector)Mockito.mock(FetchCollector.class);
    private final ApplicationEventHandler applicationEventHandler = (ApplicationEventHandler)Mockito.mock(ApplicationEventHandler.class);
    private final ConsumerMetadata metadata = (ConsumerMetadata)Mockito.mock(ConsumerMetadata.class);
    private final LinkedBlockingQueue<BackgroundEvent> backgroundEventQueue = new LinkedBlockingQueue();
    private final CompletableEventReaper backgroundEventReaper = (CompletableEventReaper)Mockito.mock(CompletableEventReaper.class);

    @AfterEach
    public void resetAll() {
        this.backgroundEventQueue.clear();
        if (this.consumer != null) {
            try {
                this.consumer.close(CloseOptions.timeout((Duration)Duration.ZERO));
            }
            catch (Exception exception) {
                // empty catch block
            }
        }
        this.consumer = null;
        Mockito.framework().clearInlineMocks();
        MockConsumerInterceptor.resetCounters();
    }

    private AsyncKafkaConsumer<String, String> newConsumer() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", "group-id");
        return this.newConsumer(props);
    }

    private AsyncKafkaConsumer<String, String> newConsumerWithoutGroupId() {
        Properties props = TestUtils.requiredConsumerConfig();
        return this.newConsumer(props);
    }

    private AsyncKafkaConsumer<String, String> newConsumer(Properties props) {
        return this.newConsumerWithStreamRebalanceData(props, null);
    }

    private AsyncKafkaConsumer<String, String> newConsumerWithStreamRebalanceData(Properties props, StreamsRebalanceData streamsRebalanceData) {
        if (!props.containsKey("enable.auto.commit")) {
            props.put("enable.auto.commit", (Object)false);
        }
        ConsumerConfig config = new ConsumerConfig(props);
        return new AsyncKafkaConsumer(config, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), this.time, (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> this.applicationEventHandler, logContext -> this.backgroundEventReaper, (logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> this.fetchCollector, (consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> this.metadata, this.backgroundEventQueue, Optional.ofNullable(streamsRebalanceData));
    }

    private AsyncKafkaConsumer<String, String> newConsumer(ConsumerConfig config) {
        return new AsyncKafkaConsumer(config, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), this.time, (logContext, time, applicationEventBlockingQueue, completableEventReaper, applicationEventProcessorSupplier, networkClientDelegateSupplier, requestManagersSupplier, asyncConsumerMetrics) -> this.applicationEventHandler, logContext -> this.backgroundEventReaper, (logContext, consumerMetadata, subscriptionState, fetchConfig, deserializers, fetchMetricsManager, time) -> this.fetchCollector, (consumerConfig, subscriptionState, logContext, clusterResourceListeners) -> this.metadata, this.backgroundEventQueue, Optional.empty());
    }

    private AsyncKafkaConsumer<String, String> newConsumer(FetchBuffer fetchBuffer, ConsumerInterceptors<String, String> interceptors, ConsumerRebalanceListenerInvoker rebalanceListenerInvoker, SubscriptionState subscriptions, String groupId, String clientId, boolean autoCommitEnabled) {
        long retryBackoffMs = 100L;
        int requestTimeoutMs = 30000;
        int defaultApiTimeoutMs = 1000;
        return new AsyncKafkaConsumer(new LogContext(), clientId, new Deserializers((Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), this.metrics), fetchBuffer, this.fetchCollector, interceptors, this.time, this.applicationEventHandler, this.backgroundEventQueue, this.backgroundEventReaper, rebalanceListenerInvoker, this.metrics, subscriptions, this.metadata, retryBackoffMs, requestTimeoutMs, defaultApiTimeoutMs, groupId, autoCommitEnabled);
    }

    @Test
    public void testSuccessfulStartupShutdown() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        Assertions.assertDoesNotThrow(() -> this.consumer.close());
    }

    @Test
    public void testFailOnClosedConsumer() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.close();
        IllegalStateException res = (IllegalStateException)Assertions.assertThrows(IllegalStateException.class, () -> this.consumer.assignment());
        Assertions.assertEquals((Object)"This consumer has already been closed.", (Object)res.getMessage());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testCommitAsyncWithNullCallback() {
        this.consumer = this.newConsumer();
        TopicPartition t0 = new TopicPartition("t0", 2);
        TopicPartition t1 = new TopicPartition("t0", 3);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(t0, new OffsetAndMetadata(10L));
        offsets.put(t1, new OffsetAndMetadata(20L));
        this.markOffsetsReadyForCommitEvent();
        this.consumer.commitAsync(offsets, null);
        ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)commitEventCaptor.capture());
        AsyncCommitEvent commitEvent = (AsyncCommitEvent)commitEventCaptor.getValue();
        Assertions.assertTrue((boolean)commitEvent.offsets().isPresent());
        Assertions.assertEquals(offsets, commitEvent.offsets().get());
        commitEvent.future().complete(offsets);
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync((Map)offsets, null));
        try {
            Exception e = (Exception)Assertions.assertThrows(KafkaException.class, () -> this.consumer.close(CloseOptions.timeout((Duration)Duration.ZERO)));
            Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, (Object)e.getCause());
        }
        finally {
            this.consumer = null;
        }
    }

    @Test
    public void testCommitAsyncUserSuppliedCallbackNoException() {
        this.consumer = this.newConsumer();
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
        this.completeCommitAsyncApplicationEventSuccessfully();
        MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(offsets, (OffsetCommitCallback)callback));
        this.forceCommitCallbackInvocation();
        Assertions.assertEquals((int)1, (int)callback.invoked);
        Assertions.assertNull((Object)callback.exception);
    }

    @ParameterizedTest
    @MethodSource(value={"commitExceptionSupplier"})
    public void testCommitAsyncUserSuppliedCallbackWithException(Exception exception) {
        this.consumer = this.newConsumer();
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(new TopicPartition("my-topic", 1), new OffsetAndMetadata(200L));
        this.completeCommitAsyncApplicationEventExceptionally(exception);
        MockCommitCallback callback = new MockCommitCallback();
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(offsets, (OffsetCommitCallback)callback));
        this.forceCommitCallbackInvocation();
        Assertions.assertSame(exception.getClass(), callback.exception.getClass());
    }

    @Test
    public void testCommitAsyncShouldCopyOffsets() {
        this.consumer = this.newConsumer();
        TopicPartition tp = new TopicPartition("t0", 2);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(tp, new OffsetAndMetadata(10L));
        this.markOffsetsReadyForCommitEvent();
        this.consumer.commitAsync(offsets, null);
        ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(AsyncCommitEvent.class);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)commitEventCaptor.capture());
        AsyncCommitEvent commitEvent = (AsyncCommitEvent)commitEventCaptor.getValue();
        Assertions.assertTrue((boolean)commitEvent.offsets().isPresent());
        Assertions.assertTrue((boolean)((Map)commitEvent.offsets().get()).containsKey(tp));
        offsets.remove(tp);
        Assertions.assertTrue((boolean)((Map)commitEvent.offsets().get()).containsKey(tp));
    }

    private static Stream<Exception> commitExceptionSupplier() {
        return Stream.of(new Exception[]{new KafkaException("Test exception"), new GroupAuthorizationException("Group authorization exception")});
    }

    @Test
    public void testCommitted() {
        this.time = new MockTime(1L);
        this.consumer = this.newConsumer();
        Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = this.mockTopicPartitionOffset();
        this.completeFetchedCommittedOffsetApplicationEventSuccessfully(topicPartitionOffsets);
        Assertions.assertEquals(topicPartitionOffsets, (Object)this.consumer.committed(topicPartitionOffsets.keySet(), Duration.ofMillis(1000L)));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
        Metric metric = (Metric)this.consumer.metrics().get(this.consumer.metricsRegistry().metricName("committed-time-ns-total", "consumer-metrics"));
        Assertions.assertTrue(((Double)metric.metricValue() > 0.0 ? 1 : 0) != 0);
    }

    @Test
    public void testCommittedExceptionThrown() {
        this.consumer = this.newConsumer();
        Map<TopicPartition, OffsetAndMetadata> offsets = this.mockTopicPartitionOffset();
        Mockito.when((Object)((Map)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(FetchCommittedOffsetsEvent.class)))).thenAnswer(invocation -> {
            CompletableApplicationEvent event = (CompletableApplicationEvent)invocation.getArgument(0);
            Assertions.assertInstanceOf(FetchCommittedOffsetsEvent.class, (Object)event);
            throw new KafkaException("Test exception");
        });
        Assertions.assertThrows(KafkaException.class, () -> this.consumer.committed(offsets.keySet(), Duration.ofMillis(1000L)));
    }

    @Test
    public void testWakeupBeforeCallingPoll() {
        this.consumer = this.newConsumer();
        String topicName = "foo";
        int partition = 3;
        TopicPartition tp = new TopicPartition("foo", 3);
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        this.consumer.wakeup();
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        Assertions.assertThrows(WakeupException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
    }

    @Test
    public void testWakeupAfterEmptyFetch() {
        this.consumer = this.newConsumer();
        String topicName = "foo";
        int partition = 3;
        TopicPartition tp = new TopicPartition("foo", 3);
        ((FetchCollector)Mockito.doAnswer(invocation -> {
            this.consumer.wakeup();
            return Fetch.empty();
        }).doAnswer(invocation -> Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        Assertions.assertThrows(WakeupException.class, () -> this.consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
    }

    @Test
    public void testWakeupAfterNonEmptyFetch() {
        this.consumer = this.newConsumer();
        String topicName = "foo";
        int partition = 3;
        TopicPartition tp = new TopicPartition("foo", 3);
        List<ConsumerRecord> records = Arrays.asList(new ConsumerRecord("foo", 3, 2L, (Object)"key1", (Object)"value1"), new ConsumerRecord("foo", 3, 3L, (Object)"key2", (Object)"value2"));
        ((FetchCollector)Mockito.doAnswer(invocation -> {
            this.consumer.wakeup();
            return Fetch.forPartition((TopicPartition)tp, (List)records, (boolean)true, (OffsetAndMetadata)new OffsetAndMetadata(4L, Optional.of(0), ""));
        }).when(this.fetchCollector)).collectFetch((FetchBuffer)Mockito.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ofMinutes(1L)));
        Assertions.assertThrows(WakeupException.class, () -> this.consumer.poll(Duration.ZERO));
    }

    @Test
    public void testCommitInRebalanceCallback() {
        this.consumer = this.newConsumer();
        String topicName = "foo";
        int partition = 3;
        final TopicPartition tp = new TopicPartition("foo", 3);
        ((FetchCollector)Mockito.doAnswer(invocation -> Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)Mockito.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        TreeSet<TopicPartition> sortedPartitions = new TreeSet<TopicPartition>((Comparator<TopicPartition>)AbstractMembershipManager.TOPIC_PARTITION_COMPARATOR);
        sortedPartitions.add(tp);
        ConsumerRebalanceListenerCallbackNeededEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, sortedPartitions);
        this.backgroundEventQueue.add((BackgroundEvent)e);
        this.completeCommitSyncApplicationEventSuccessfully();
        final AtomicBoolean callbackExecuted = new AtomicBoolean(false);
        ConsumerRebalanceListener listener = new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                Assertions.assertDoesNotThrow(() -> AsyncKafkaConsumerTest.this.consumer.commitSync(Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)tp, (Object)new OffsetAndMetadata(0L))})));
                callbackExecuted.set(true);
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            }
        };
        this.completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList("foo"), listener);
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ZERO);
        Assertions.assertTrue((boolean)callbackExecuted.get());
    }

    @Test
    public void testClearWakeupTriggerAfterPoll() {
        this.consumer = this.newConsumer();
        String topicName = "foo";
        int partition = 3;
        TopicPartition tp = new TopicPartition("foo", 3);
        List<ConsumerRecord> records = Arrays.asList(new ConsumerRecord("foo", 3, 2L, (Object)"key1", (Object)"value1"), new ConsumerRecord("foo", 3, 3L, (Object)"key2", (Object)"value2"));
        ((FetchCollector)Mockito.doReturn((Object)Fetch.forPartition((TopicPartition)tp, records, (boolean)true, (OffsetAndMetadata)new OffsetAndMetadata(4L, Optional.of(0), ""))).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ZERO);
        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
    }

    @Test
    public void testEnsureCallbackExecutedByApplicationThread() {
        this.consumer = this.newConsumer();
        String currentThread = Thread.currentThread().getName();
        MockCommitCallback callback = new MockCommitCallback();
        this.completeCommitAsyncApplicationEventSuccessfully();
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(new HashMap(), (OffsetCommitCallback)callback));
        this.forceCommitCallbackInvocation();
        Assertions.assertEquals((int)1, (int)callback.invoked);
        Assertions.assertEquals((Object)currentThread, (Object)callback.completionThread);
    }

    @Test
    public void testEnsureCommitSyncExecutedCommitAsyncCallbacks() {
        this.consumer = this.newConsumer();
        KafkaException callbackException = new KafkaException("Async commit callback failed");
        OffsetCommitCallback callback = (offsets, exception) -> {
            throw callbackException;
        };
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(new HashMap(), callback));
        Assertions.assertThrows(((Object)((Object)callbackException)).getClass(), () -> this.consumer.commitSync());
    }

    @Test
    public void testCommitSyncAwaitsCommitAsyncCompletionWithEmptyOffsets() {
        TopicPartition tp = new TopicPartition("foo", 0);
        CompletableFuture<Void> asyncCommitFuture = this.setUpConsumerWithIncompleteAsyncCommit(tp);
        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100L)));
        asyncCommitFuture.completeExceptionally(new KafkaException("Test exception"));
        Assertions.assertDoesNotThrow(() -> this.consumer.commitSync(Collections.emptyMap(), Duration.ofMillis(100L)));
    }

    @Test
    public void testCommitSyncAwaitsCommitAsyncCompletionWithNonEmptyOffsets() {
        TopicPartition tp = new TopicPartition("foo", 0);
        CompletableFuture<Void> asyncCommitFuture = this.setUpConsumerWithIncompleteAsyncCommit(tp);
        this.completeCommitSyncApplicationEventSuccessfully();
        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20L)), Duration.ofMillis(100L)));
        asyncCommitFuture.complete(null);
        Assertions.assertDoesNotThrow(() -> this.consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20L)), Duration.ofMillis(100L)));
    }

    @Test
    public void testCommitSyncAwaitsCommitAsyncButDoesNotFail() {
        TopicPartition tp = new TopicPartition("foo", 0);
        CompletableFuture<Void> asyncCommitFuture = this.setUpConsumerWithIncompleteAsyncCommit(tp);
        this.completeCommitSyncApplicationEventSuccessfully();
        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20L)), Duration.ofMillis(100L)));
        asyncCommitFuture.completeExceptionally(new KafkaException("Test exception"));
        Assertions.assertDoesNotThrow(() -> this.consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(20L)), Duration.ofMillis(100L)));
    }

    @Test
    public void testCommitSyncShouldCopyOffsets() {
        this.consumer = this.newConsumer();
        TopicPartition tp = new TopicPartition("t0", 2);
        HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        offsets.put(tp, new OffsetAndMetadata(10L));
        this.completeCommitSyncApplicationEventSuccessfully();
        this.consumer.commitSync(offsets);
        ArgumentCaptor commitEventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)commitEventCaptor.capture());
        SyncCommitEvent commitEvent = (SyncCommitEvent)commitEventCaptor.getValue();
        Assertions.assertTrue((boolean)commitEvent.offsets().isPresent());
        Assertions.assertTrue((boolean)((Map)commitEvent.offsets().get()).containsKey(tp));
        offsets.remove(tp);
        Assertions.assertTrue((boolean)((Map)commitEvent.offsets().get()).containsKey(tp));
    }

    private CompletableFuture<Void> setUpConsumerWithIncompleteAsyncCommit(TopicPartition tp) {
        this.time = new MockTime(1L);
        this.consumer = this.newConsumer();
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        this.completeSeekUnvalidatedEventSuccessfully();
        this.consumer.seek(tp, 20L);
        this.markOffsetsReadyForCommitEvent();
        this.consumer.commitAsync();
        CompletableApplicationEvent event = this.getLastEnqueuedEvent();
        return event.future();
    }

    private <T> CompletableApplicationEvent<T> getLastEnqueuedEvent() {
        ArgumentCaptor eventArgumentCaptor = ArgumentCaptor.forClass(CompletableApplicationEvent.class);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.atLeast((int)1))).add((ApplicationEvent)eventArgumentCaptor.capture());
        List allValues = eventArgumentCaptor.getAllValues();
        return (CompletableApplicationEvent)allValues.get(allValues.size() - 1);
    }

    private <T> CompletableApplicationEvent<T> addAndGetLastEnqueuedEvent() {
        ArgumentCaptor eventArgumentCaptor = ArgumentCaptor.forClass(CompletableApplicationEvent.class);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.atLeast((int)1))).addAndGet((CompletableApplicationEvent)eventArgumentCaptor.capture());
        List allValues = eventArgumentCaptor.getAllValues();
        return (CompletableApplicationEvent)allValues.get(allValues.size() - 1);
    }

    @Test
    public void testEnsurePollExecutedCommitAsyncCallbacks() {
        this.consumer = this.newConsumer();
        MockCommitCallback callback = new MockCommitCallback();
        this.completeCommitAsyncApplicationEventSuccessfully();
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(new TopicPartition("foo", 0)));
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(new HashMap(), (OffsetCommitCallback)callback));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        this.assertMockCommitCallbackInvoked(() -> this.consumer.poll(Duration.ZERO), callback);
    }

    @Test
    public void testEnsureShutdownExecutedCommitAsyncCallbacks() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        MockCommitCallback callback = new MockCommitCallback();
        this.completeCommitAsyncApplicationEventSuccessfully();
        Assertions.assertDoesNotThrow(() -> this.consumer.commitAsync(new HashMap(), (OffsetCommitCallback)callback));
        this.assertMockCommitCallbackInvoked(() -> this.consumer.close(), callback);
    }

    @Test
    public void testVerifyApplicationEventOnShutdown() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        ((ApplicationEventHandler)Mockito.doReturn(null).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any());
        this.consumer.close();
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.any(CommitOnCloseEvent.class));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(LeaveGroupOnCloseEvent.class));
    }

    @ParameterizedTest
    @ValueSource(longs={0L, 30000L})
    public void testCloseLeavesGroup(long timeoutMs) {
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        this.consumer = (AsyncKafkaConsumer)Mockito.spy(this.newConsumer((FetchBuffer)Mockito.mock(FetchBuffer.class), (ConsumerInterceptors<String, String>)((ConsumerInterceptors)Mockito.mock(ConsumerInterceptors.class)), (ConsumerRebalanceListenerInvoker)Mockito.mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", "client-id", false));
        this.consumer.close(CloseOptions.timeout((Duration)Duration.ofMillis(timeoutMs)));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(LeaveGroupOnCloseEvent.class));
    }

    @Test
    public void testCloseLeavesGroupDespiteOnPartitionsLostError() {
        KafkaException rootError = new KafkaException("Intentional error");
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("topic1", 0));
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.assignedPartitions()).thenReturn(partitions);
        ConsumerRebalanceListenerInvoker invoker = (ConsumerRebalanceListenerInvoker)Mockito.mock(ConsumerRebalanceListenerInvoker.class);
        ((ConsumerRebalanceListenerInvoker)Mockito.doAnswer(invocation -> rootError).when((Object)invoker)).invokePartitionsLost((SortedSet)ArgumentMatchers.any(SortedSet.class));
        this.consumer = (AsyncKafkaConsumer)Mockito.spy(this.newConsumer((FetchBuffer)Mockito.mock(FetchBuffer.class), (ConsumerInterceptors<String, String>)new ConsumerInterceptors(Collections.emptyList(), this.metrics), invoker, subscriptions, "group-id", "client-id", false));
        this.consumer.setGroupAssignmentSnapshot(partitions);
        Throwable t = Assertions.assertThrows(KafkaException.class, () -> this.consumer.close(CloseOptions.timeout((Duration)Duration.ZERO)));
        Assertions.assertNotNull((Object)t.getCause());
        Assertions.assertEquals((Object)((Object)rootError), (Object)t.getCause());
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(LeaveGroupOnCloseEvent.class));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ParameterizedTest
    @ValueSource(longs={0L, 30000L})
    public void testCloseLeavesGroupDespiteInterrupt(long timeoutMs) {
        Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("topic1", 0));
        SubscriptionState subscriptions = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        Mockito.when((Object)subscriptions.assignedPartitions()).thenReturn(partitions);
        Mockito.when((Object)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CompletableApplicationEvent.class))).thenThrow(InterruptException.class);
        this.consumer = (AsyncKafkaConsumer)Mockito.spy(this.newConsumer((FetchBuffer)Mockito.mock(FetchBuffer.class), (ConsumerInterceptors<String, String>)((ConsumerInterceptors)Mockito.mock(ConsumerInterceptors.class)), (ConsumerRebalanceListenerInvoker)Mockito.mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", "client-id", false));
        Duration timeout = Duration.ofMillis(timeoutMs);
        try {
            Assertions.assertThrows(InterruptException.class, () -> this.consumer.close(CloseOptions.timeout((Duration)timeout)));
        }
        finally {
            Thread.interrupted();
        }
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.any(CommitOnCloseEvent.class));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(LeaveGroupOnCloseEvent.class));
    }

    @Test
    public void testCommitSyncAllConsumed() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = this.newConsumer((FetchBuffer)Mockito.mock(FetchBuffer.class), (ConsumerInterceptors<String, String>)((ConsumerInterceptors)Mockito.mock(ConsumerInterceptors.class)), (ConsumerRebalanceListenerInvoker)Mockito.mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", "client-id", false);
        this.completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singleton("topic"), (ConsumerRebalanceListener)Mockito.mock(ConsumerRebalanceListener.class));
        subscriptions.assignFromSubscribed(Collections.singleton(new TopicPartition("topic", 0)));
        this.completeSeekUnvalidatedEventSuccessfully();
        subscriptions.seek(new TopicPartition("topic", 0), 100L);
        this.markOffsetsReadyForCommitEvent();
        this.consumer.commitSyncAllConsumed(this.time.timer(100L));
        ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(SyncCommitEvent.class);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)eventCaptor.capture());
        SyncCommitEvent capturedEvent = (SyncCommitEvent)eventCaptor.getValue();
        Assertions.assertFalse((boolean)capturedEvent.offsets().isPresent(), (String)"Expected empty optional offsets");
    }

    @Test
    public void testAutoCommitSyncDisabled() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = this.newConsumer((FetchBuffer)Mockito.mock(FetchBuffer.class), (ConsumerInterceptors<String, String>)((ConsumerInterceptors)Mockito.mock(ConsumerInterceptors.class)), (ConsumerRebalanceListenerInvoker)Mockito.mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", "client-id", false);
        this.completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singleton("topic"), (ConsumerRebalanceListener)Mockito.mock(ConsumerRebalanceListener.class));
        subscriptions.assignFromSubscribed(Collections.singleton(new TopicPartition("topic", 0)));
        this.completeSeekUnvalidatedEventSuccessfully();
        subscriptions.seek(new TopicPartition("topic", 0), 100L);
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.close();
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.never())).add((ApplicationEvent)ArgumentMatchers.any(SyncCommitEvent.class));
    }

    private void assertMockCommitCallbackInvoked(Executable task, MockCommitCallback callback) {
        Assertions.assertDoesNotThrow((Executable)task);
        Assertions.assertEquals((int)1, (int)callback.invoked);
        Assertions.assertNull((Object)callback.exception);
    }

    @Test
    public void testAssign() {
        this.consumer = this.newConsumer();
        TopicPartition tp = new TopicPartition("foo", 3);
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        Assertions.assertTrue((boolean)this.consumer.subscription().isEmpty());
        Assertions.assertTrue((boolean)this.consumer.assignment().contains(tp));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(AssignmentChangeEvent.class));
    }

    @Test
    public void testAssignOnNullTopicPartition() {
        this.consumer = this.newConsumer();
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.assign(null));
    }

    @Test
    public void testAssignOnEmptyTopicPartition() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.assign(Collections.emptyList());
        Assertions.assertTrue((boolean)this.consumer.subscription().isEmpty());
        Assertions.assertTrue((boolean)this.consumer.assignment().isEmpty());
    }

    @Test
    public void testAssignOnNullTopicInPartition() {
        this.consumer = this.newConsumer();
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.assign(Collections.singleton(new TopicPartition(null, 0))));
    }

    @Test
    public void testAssignOnEmptyTopicInPartition() {
        this.consumer = this.newConsumer();
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.assign(Collections.singleton(new TopicPartition("  ", 0))));
    }

    @Test
    public void testBeginningOffsetsFailsIfNullPartitions() {
        this.consumer = this.newConsumer();
        Assertions.assertThrows(NullPointerException.class, () -> this.consumer.beginningOffsets(null, Duration.ofMillis(1L)));
    }

    @Test
    public void testBeginningOffsets() {
        this.consumer = this.newConsumer();
        Map<TopicPartition, OffsetAndTimestampInternal> expectedOffsets = this.mockOffsetAndTimestamp();
        Mockito.when((Object)((Map)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(ListOffsetsEvent.class)))).thenAnswer(invocation -> {
            ListOffsetsEvent event = (ListOffsetsEvent)invocation.getArgument(0);
            Timer timer = this.time.timer(event.deadlineMs() - this.time.milliseconds());
            if (timer.remainingMs() == 0L) {
                Assertions.fail((String)"Timer duration should not be zero.");
            }
            return expectedOffsets;
        });
        Map result = (Map)Assertions.assertDoesNotThrow(() -> this.consumer.beginningOffsets(expectedOffsets.keySet(), Duration.ofMillis(1L)));
        expectedOffsets.forEach((key, value) -> {
            Assertions.assertTrue((boolean)result.containsKey(key));
            Assertions.assertEquals((long)value.offset(), (Long)((Long)result.get(key)));
        });
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(ListOffsetsEvent.class));
    }

    @Test
    public void testBeginningOffsetsThrowsKafkaExceptionForUnderlyingExecutionFailure() {
        this.consumer = this.newConsumer();
        Set<TopicPartition> partitions = this.mockTopicPartitionOffset().keySet();
        KafkaException eventProcessingFailure = new KafkaException("Unexpected failure processing List Offsets event");
        ((ApplicationEventHandler)Mockito.doThrow((Throwable[])new Throwable[]{eventProcessingFailure}).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(ListOffsetsEvent.class));
        Throwable consumerError = Assertions.assertThrows(KafkaException.class, () -> this.consumer.beginningOffsets((Collection)partitions, Duration.ofMillis(1L)));
        Assertions.assertEquals((Object)((Object)eventProcessingFailure), (Object)consumerError);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    @Test
    public void testBeginningOffsetsTimeoutOnEventProcessingTimeout() {
        this.consumer = this.newConsumer();
        ((ApplicationEventHandler)Mockito.doThrow((Throwable[])new Throwable[]{new org.apache.kafka.common.errors.TimeoutException()}).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any());
        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.beginningOffsets(Collections.singletonList(new TopicPartition("t1", 0)), Duration.ofMillis(1L)));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    @Test
    public void testOffsetsForTimesOnNullPartitions() {
        this.consumer = this.newConsumer();
        Assertions.assertThrows(NullPointerException.class, () -> this.consumer.offsetsForTimes(null, Duration.ofMillis(1L)));
    }

    @Test
    public void testOffsetsForTimesFailsOnNegativeTargetTimes() {
        this.consumer = this.newConsumer();
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition("topic1", 1), -2L), Duration.ofMillis(1L)));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition("topic1", 1), -1L), Duration.ofMillis(1L)));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.offsetsForTimes(Collections.singletonMap(new TopicPartition("topic1", 1), -3L), Duration.ofMillis(1L)));
    }

    @Test
    public void testOffsetsForTimes() {
        this.consumer = this.newConsumer();
        Map<TopicPartition, OffsetAndTimestampInternal> expectedResult = this.mockOffsetAndTimestamp();
        Map<TopicPartition, Long> timestampToSearch = this.mockTimestampToSearch();
        ((ApplicationEventHandler)Mockito.doReturn(expectedResult).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any());
        Map result = (Map)Assertions.assertDoesNotThrow(() -> this.consumer.offsetsForTimes(timestampToSearch, Duration.ofMillis(1L)));
        expectedResult.forEach((key, value) -> {
            OffsetAndTimestamp expected = value.buildOffsetAndTimestamp();
            Assertions.assertEquals((Object)expected, result.get(key));
        });
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    @Test
    public void testOffsetsForTimesTimeoutException() {
        this.consumer = this.newConsumer();
        long timeout = 100L;
        ((ApplicationEventHandler)Mockito.doThrow((Throwable[])new Throwable[]{new org.apache.kafka.common.errors.TimeoutException("Event did not complete in time and was expired by the reaper")}).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any());
        Throwable t = Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.offsetsForTimes(this.mockTimestampToSearch(), Duration.ofMillis(timeout)));
        Assertions.assertEquals((Object)("Failed to get offsets by times in " + timeout + "ms"), (Object)t.getMessage());
    }

    @Test
    public void testBeginningOffsetsTimeoutException() {
        this.consumer = this.newConsumer();
        long timeout = 100L;
        ((ApplicationEventHandler)Mockito.doThrow((Throwable[])new Throwable[]{new org.apache.kafka.common.errors.TimeoutException("Event did not complete in time and was expired by the reaper")}).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any());
        Throwable t = Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.beginningOffsets(Collections.singleton(new TopicPartition("topic", 5)), Duration.ofMillis(timeout)));
        Assertions.assertEquals((Object)("Failed to get offsets by times in " + timeout + "ms"), (Object)t.getMessage());
    }

    @Test
    public void testEndOffsetsTimeoutException() {
        this.consumer = this.newConsumer();
        long timeout = 100L;
        ((ApplicationEventHandler)Mockito.doThrow((Throwable[])new Throwable[]{new org.apache.kafka.common.errors.TimeoutException("Event did not complete in time and was expired by the reaper")}).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any());
        Throwable t = Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.endOffsets(Collections.singleton(new TopicPartition("topic", 5)), Duration.ofMillis(timeout)));
        Assertions.assertEquals((Object)("Failed to get offsets by times in " + timeout + "ms"), (Object)t.getMessage());
    }

    @Test
    public void testBeginningOffsetsWithZeroTimeout() {
        this.consumer = this.newConsumer();
        TopicPartition tp = new TopicPartition("topic1", 0);
        Map result = (Map)Assertions.assertDoesNotThrow(() -> this.consumer.beginningOffsets(Collections.singletonList(tp), Duration.ZERO));
        Assertions.assertNotNull((Object)result);
        Assertions.assertEquals((int)0, (int)result.size());
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    @Test
    public void testOffsetsForTimesWithZeroTimeout() {
        this.consumer = this.newConsumer();
        TopicPartition tp = new TopicPartition("topic1", 0);
        Map<TopicPartition, Object> expectedResult = Collections.singletonMap(tp, null);
        Map<TopicPartition, Long> timestampToSearch = Collections.singletonMap(tp, 5L);
        Map result = (Map)Assertions.assertDoesNotThrow(() -> this.consumer.offsetsForTimes(timestampToSearch, Duration.ZERO));
        Assertions.assertEquals(expectedResult, (Object)result);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.never())).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ListOffsetsEvent.class));
    }

    @Test
    public void testWakeupCommitted() {
        this.consumer = this.newConsumer();
        Map<TopicPartition, OffsetAndMetadata> offsets = this.mockTopicPartitionOffset();
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            CompletableApplicationEvent event = (CompletableApplicationEvent)invocation.getArgument(0);
            Assertions.assertInstanceOf(FetchCommittedOffsetsEvent.class, (Object)event);
            Assertions.assertTrue((boolean)event.future().isCompletedExceptionally());
            return ConsumerUtils.getResult((Future)event.future());
        }).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        this.consumer.wakeup();
        Assertions.assertThrows(WakeupException.class, () -> this.consumer.committed(offsets.keySet()));
        Assertions.assertNull((Object)this.consumer.wakeupTrigger().getPendingTask());
    }

    @Test
    public void testNoWakeupInCloseCommit() {
        TopicPartition tp = new TopicPartition("topic1", 0);
        Properties props = this.requiredConsumerConfigAndGroupId("consumer-group");
        props.put("enable.auto.commit", (Object)true);
        this.consumer = this.newConsumer(props);
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.completeSeekUnvalidatedEventSuccessfully();
        this.consumer.seek(tp, 10L);
        this.consumer.wakeup();
        AtomicReference capturedEvent = new AtomicReference();
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            ApplicationEvent event = (ApplicationEvent)invocation.getArgument(0);
            if (event instanceof SyncCommitEvent) {
                capturedEvent.set((SyncCommitEvent)event);
                ((SyncCommitEvent)event).markOffsetsReady();
            }
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.any());
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.close(CloseOptions.timeout((Duration)Duration.ZERO));
        Assertions.assertNotNull(capturedEvent.get());
        Assertions.assertFalse((boolean)((SyncCommitEvent)capturedEvent.get()).future().isCompletedExceptionally());
    }

    @Test
    public void testCloseAwaitPendingAsyncCommitIncomplete() {
        this.time = new MockTime(1L);
        this.consumer = this.newConsumer();
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        TopicPartition tp = new TopicPartition("foo", 0);
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        this.completeSeekUnvalidatedEventSuccessfully();
        this.consumer.seek(tp, 20L);
        this.markOffsetsReadyForCommitEvent();
        this.consumer.commitAsync();
        Exception e = (Exception)Assertions.assertThrows(KafkaException.class, () -> this.consumer.close(CloseOptions.timeout((Duration)Duration.ofMillis(10L))));
        Assertions.assertInstanceOf(org.apache.kafka.common.errors.TimeoutException.class, (Object)e.getCause());
    }

    @Test
    public void testCloseAwaitPendingAsyncCommitComplete() {
        this.time = new MockTime(1L);
        this.consumer = this.newConsumer();
        MockCommitCallback cb = new MockCommitCallback();
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        TopicPartition tp = new TopicPartition("foo", 0);
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        this.completeSeekUnvalidatedEventSuccessfully();
        this.consumer.seek(tp, 20L);
        this.completeCommitAsyncApplicationEventSuccessfully();
        this.consumer.commitAsync((OffsetCommitCallback)cb);
        this.completeUnsubscribeApplicationEventSuccessfully();
        Assertions.assertDoesNotThrow(() -> this.consumer.close(CloseOptions.timeout((Duration)Duration.ofMillis(10L))));
        Assertions.assertEquals((int)1, (int)cb.invoked);
    }

    @Test
    public void testInterceptorAutoCommitOnClose() {
        Properties props = this.requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "true");
        this.consumer = this.newConsumer(props);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());
        this.completeCommitSyncApplicationEventSuccessfully();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.close(CloseOptions.timeout((Duration)Duration.ZERO));
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.CLOSE_COUNT.get());
    }

    @Test
    public void testInterceptorCommitSync() {
        Properties props = this.requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "false");
        this.consumer = this.newConsumer(props);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());
        this.completeCommitSyncApplicationEventSuccessfully();
        this.consumer.commitSync(this.mockTopicPartitionOffset());
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    @Test
    public void testNoInterceptorCommitSyncFailed() {
        Properties props = this.requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "false");
        this.consumer = this.newConsumer(props);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());
        KafkaException expected = new KafkaException("Test exception");
        this.completeCommitSyncApplicationEventExceptionally((Exception)((Object)expected));
        KafkaException actual = (KafkaException)Assertions.assertThrows(KafkaException.class, () -> this.consumer.commitSync(this.mockTopicPartitionOffset()));
        Assertions.assertEquals((Object)((Object)expected), (Object)((Object)actual));
        Assertions.assertEquals((int)0, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    @Test
    public void testInterceptorCommitAsync() {
        Properties props = this.requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "false");
        this.consumer = this.newConsumer(props);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());
        this.completeCommitAsyncApplicationEventSuccessfully();
        this.consumer.commitAsync(this.mockTopicPartitionOffset(), (OffsetCommitCallback)new MockCommitCallback());
        Assertions.assertEquals((int)0, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
        this.forceCommitCallbackInvocation();
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    @Test
    public void testNoInterceptorCommitAsyncFailed() {
        Properties props = this.requiredConsumerConfigAndGroupId("test-id");
        props.setProperty("interceptor.classes", MockConsumerInterceptor.class.getName());
        props.setProperty("enable.auto.commit", "false");
        this.consumer = this.newConsumer(props);
        Assertions.assertEquals((int)1, (int)MockConsumerInterceptor.INIT_COUNT.get());
        this.completeCommitAsyncApplicationEventExceptionally((Exception)((Object)new KafkaException("Test exception")));
        this.consumer.commitAsync(this.mockTopicPartitionOffset(), (OffsetCommitCallback)new MockCommitCallback());
        Assertions.assertEquals((int)0, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
        this.forceCommitCallbackInvocation();
        Assertions.assertEquals((int)0, (int)MockConsumerInterceptor.ON_COMMIT_COUNT.get());
    }

    @Test
    public void testRefreshCommittedOffsetsShouldNotResetIfFailedWithTimeout() {
        this.consumer = this.newConsumer();
        this.testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
    }

    @Test
    public void testRefreshCommittedOffsetsNotCalledIfNoGroupId() {
        this.consumer = this.newConsumerWithoutGroupId();
        this.testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout();
    }

    @Test
    public void testSubscribeGeneratesEvent() {
        this.consumer = this.newConsumer();
        String topic = "topic1";
        this.completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList(topic));
        Assertions.assertEquals(Collections.singleton(topic), (Object)this.consumer.subscription());
        Assertions.assertTrue((boolean)this.consumer.assignment().isEmpty());
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
    }

    @Test
    public void testSubscribePatternGeneratesEvent() {
        this.consumer = this.newConsumer();
        Pattern pattern = Pattern.compile("topic.*");
        this.completeTopicPatternSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(pattern);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
    }

    @Test
    public void testUnsubscribeGeneratesUnsubscribeEvent() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.unsubscribe();
        Assertions.assertTrue((boolean)this.consumer.subscription().isEmpty());
        Assertions.assertTrue((boolean)this.consumer.assignment().isEmpty());
        ArgumentCaptor eventCaptor = ArgumentCaptor.forClass(UnsubscribeEvent.class);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)eventCaptor.capture());
        long deadline = this.time.milliseconds() + (long)((Integer)ConsumerConfig.configDef().defaultValues().get("default.api.timeout.ms")).intValue();
        Assertions.assertTrue((((UnsubscribeEvent)eventCaptor.getValue()).deadlineMs() <= deadline ? 1 : 0) != 0);
    }

    @Test
    public void testSubscribeToEmptyListActsAsUnsubscribe() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.subscribe(Collections.emptyList());
        Assertions.assertTrue((boolean)this.consumer.subscription().isEmpty());
        Assertions.assertTrue((boolean)this.consumer.assignment().isEmpty());
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    @Test
    public void testSubscribeToNullTopicCollection() {
        this.consumer = this.newConsumer();
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.subscribe((Collection)null));
    }

    @Test
    public void testSubscriptionOnNullTopic() {
        this.consumer = this.newConsumer();
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.subscribe(Collections.singletonList(null)));
    }

    @Test
    public void testSubscriptionOnEmptyTopic() {
        this.consumer = this.newConsumer();
        String emptyTopic = "  ";
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.subscribe(Collections.singletonList(emptyTopic)));
    }

    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNull() {
        Properties props = TestUtils.requiredConsumerConfig();
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        Assertions.assertFalse((boolean)config.unused().contains("auto.commit.interval.ms"));
        Assertions.assertFalse((boolean)config.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));
        Throwable exception = Assertions.assertThrows(InvalidGroupIdException.class, () -> this.consumer.groupMetadata());
        Assertions.assertEquals((Object)"To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.", (Object)exception.getMessage());
    }

    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNotNull() {
        String groupId = "consumerGroupA";
        this.consumer = this.newConsumer(this.requiredConsumerConfigAndGroupId("consumerGroupA"));
        ConsumerGroupMetadata groupMetadata = this.consumer.groupMetadata();
        Assertions.assertEquals((Object)"consumerGroupA", (Object)groupMetadata.groupId());
        Assertions.assertEquals(Optional.empty(), (Object)groupMetadata.groupInstanceId());
        Assertions.assertEquals((int)-1, (int)groupMetadata.generationId());
        Assertions.assertEquals((Object)"", (Object)groupMetadata.memberId());
    }

    @Test
    public void testGroupMetadataAfterCreationWithGroupIdIsNotNullAndGroupInstanceIdSet() {
        String groupId = "consumerGroupA";
        String groupInstanceId = "groupInstanceId1";
        Properties props = this.requiredConsumerConfigAndGroupId("consumerGroupA");
        props.put("group.instance.id", "groupInstanceId1");
        this.consumer = this.newConsumer(props);
        ConsumerGroupMetadata groupMetadata = this.consumer.groupMetadata();
        Assertions.assertEquals((Object)"consumerGroupA", (Object)groupMetadata.groupId());
        Assertions.assertEquals(Optional.of("groupInstanceId1"), (Object)groupMetadata.groupInstanceId());
        Assertions.assertEquals((int)-1, (int)groupMetadata.generationId());
        Assertions.assertEquals((Object)"", (Object)groupMetadata.memberId());
    }

    private MemberStateListener captureGroupMetadataUpdateListener(MockedStatic<RequestManagers> requestManagers) {
        ArgumentCaptor applicationThreadMemberStateListener = ArgumentCaptor.forClass(MemberStateListener.class);
        requestManagers.verify(() -> RequestManagers.supplier((Time)((Time)ArgumentMatchers.any()), (LogContext)((LogContext)ArgumentMatchers.any()), (BackgroundEventHandler)((BackgroundEventHandler)ArgumentMatchers.any()), (ConsumerMetadata)((ConsumerMetadata)ArgumentMatchers.any()), (SubscriptionState)((SubscriptionState)ArgumentMatchers.any()), (FetchBuffer)((FetchBuffer)ArgumentMatchers.any()), (ConsumerConfig)((ConsumerConfig)ArgumentMatchers.any()), (GroupRebalanceConfig)((GroupRebalanceConfig)ArgumentMatchers.any()), (ApiVersions)((ApiVersions)ArgumentMatchers.any()), (FetchMetricsManager)((FetchMetricsManager)ArgumentMatchers.any()), (Supplier)((Supplier)ArgumentMatchers.any()), (Optional)((Optional)ArgumentMatchers.any()), (Metrics)((Metrics)ArgumentMatchers.any()), (OffsetCommitCallbackInvoker)((OffsetCommitCallbackInvoker)ArgumentMatchers.any()), (MemberStateListener)((MemberStateListener)applicationThreadMemberStateListener.capture()), (Optional)((Optional)ArgumentMatchers.any())));
        return (MemberStateListener)applicationThreadMemberStateListener.getValue();
    }

    @Test
    public void testGroupMetadataUpdate() {
        String groupId = "consumerGroupA";
        try (MockedStatic requestManagers = Mockito.mockStatic(RequestManagers.class);){
            this.consumer = this.newConsumer(this.requiredConsumerConfigAndGroupId("consumerGroupA"));
            ConsumerGroupMetadata oldGroupMetadata = this.consumer.groupMetadata();
            MemberStateListener groupMetadataUpdateListener = this.captureGroupMetadataUpdateListener((MockedStatic<RequestManagers>)requestManagers);
            int expectedMemberEpoch = 42;
            String expectedMemberId = "memberId";
            groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(42), "memberId");
            ConsumerGroupMetadata newGroupMetadata = this.consumer.groupMetadata();
            Assertions.assertEquals((Object)oldGroupMetadata.groupId(), (Object)newGroupMetadata.groupId());
            Assertions.assertEquals((Object)"memberId", (Object)newGroupMetadata.memberId());
            Assertions.assertEquals((int)42, (int)newGroupMetadata.generationId());
            Assertions.assertEquals((Object)oldGroupMetadata.groupInstanceId(), (Object)newGroupMetadata.groupInstanceId());
        }
    }

    @Test
    public void testGroupMetadataIsResetAfterUnsubscribe() {
        String groupId = "consumerGroupA";
        try (MockedStatic requestManagers = Mockito.mockStatic(RequestManagers.class);){
            this.consumer = this.newConsumer(this.requiredConsumerConfigAndGroupId("consumerGroupA"));
            MemberStateListener groupMetadataUpdateListener = this.captureGroupMetadataUpdateListener((MockedStatic<RequestManagers>)requestManagers);
            this.consumer.subscribe(Collections.singletonList("topic"));
            int memberEpoch = 42;
            String memberId = "memberId";
            groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(42), "memberId");
            ConsumerGroupMetadata groupMetadata = this.consumer.groupMetadata();
            Assertions.assertNotEquals((int)-1, (int)groupMetadata.generationId());
            Assertions.assertNotEquals((Object)"", (Object)groupMetadata.memberId());
        }
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.unsubscribe();
        ConsumerGroupMetadata groupMetadataAfterUnsubscribe = new ConsumerGroupMetadata("consumerGroupA", -1, "", Optional.empty());
        Assertions.assertEquals((Object)groupMetadataAfterUnsubscribe, (Object)this.consumer.groupMetadata());
    }

    private Optional<StreamsRebalanceData> captureStreamRebalanceData(MockedStatic<RequestManagers> requestManagers) {
        ArgumentCaptor streamRebalanceData = ArgumentCaptor.forClass(Optional.class);
        requestManagers.verify(() -> RequestManagers.supplier((Time)((Time)ArgumentMatchers.any()), (LogContext)((LogContext)ArgumentMatchers.any()), (BackgroundEventHandler)((BackgroundEventHandler)ArgumentMatchers.any()), (ConsumerMetadata)((ConsumerMetadata)ArgumentMatchers.any()), (SubscriptionState)((SubscriptionState)ArgumentMatchers.any()), (FetchBuffer)((FetchBuffer)ArgumentMatchers.any()), (ConsumerConfig)((ConsumerConfig)ArgumentMatchers.any()), (GroupRebalanceConfig)((GroupRebalanceConfig)ArgumentMatchers.any()), (ApiVersions)((ApiVersions)ArgumentMatchers.any()), (FetchMetricsManager)((FetchMetricsManager)ArgumentMatchers.any()), (Supplier)((Supplier)ArgumentMatchers.any()), (Optional)((Optional)ArgumentMatchers.any()), (Metrics)((Metrics)ArgumentMatchers.any()), (OffsetCommitCallbackInvoker)((OffsetCommitCallbackInvoker)ArgumentMatchers.any()), (MemberStateListener)((MemberStateListener)ArgumentMatchers.any()), (Optional)((Optional)streamRebalanceData.capture())));
        return (Optional)streamRebalanceData.getValue();
    }

    @Test
    public void testEmptyStreamRebalanceData() {
        String groupId = "consumerGroupA";
        try (MockedStatic requestManagers = Mockito.mockStatic(RequestManagers.class);){
            this.consumer = this.newConsumer(this.requiredConsumerConfigAndGroupId("consumerGroupA"));
            Optional<StreamsRebalanceData> groupMetadataUpdateListener = this.captureStreamRebalanceData((MockedStatic<RequestManagers>)requestManagers);
            Assertions.assertTrue((boolean)groupMetadataUpdateListener.isEmpty());
        }
    }

    @Test
    public void testStreamRebalanceData() {
        String groupId = "consumerGroupA";
        try (MockedStatic requestManagers = Mockito.mockStatic(RequestManagers.class);){
            StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
            this.consumer = this.newConsumerWithStreamRebalanceData(this.requiredConsumerConfigAndGroupId("consumerGroupA"), streamsRebalanceData);
            Optional<StreamsRebalanceData> groupMetadataUpdateListener = this.captureStreamRebalanceData((MockedStatic<RequestManagers>)requestManagers);
            Assertions.assertTrue((boolean)groupMetadataUpdateListener.isPresent());
            Assertions.assertEquals((Object)streamsRebalanceData, (Object)groupMetadataUpdateListener.get());
        }
    }

    @ParameterizedTest
    @MethodSource(value={"listenerCallbacksInvokeSource"})
    public void testListenerCallbacksInvoke(List<ConsumerRebalanceListenerMethodName> methodNames, Optional<RuntimeException> revokedError, Optional<RuntimeException> assignedError, Optional<RuntimeException> lostError, int expectedRevokedCount, int expectedAssignedCount, int expectedLostCount, Optional<RuntimeException> expectedException) {
        this.consumer = this.newConsumer();
        CounterConsumerRebalanceListener consumerRebalanceListener = new CounterConsumerRebalanceListener(revokedError, assignedError, lostError);
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        this.completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList("topic"), (ConsumerRebalanceListener)consumerRebalanceListener);
        SortedSet partitions = Collections.emptySortedSet();
        for (ConsumerRebalanceListenerMethodName methodName : methodNames) {
            ConsumerRebalanceListenerCallbackNeededEvent e = new ConsumerRebalanceListenerCallbackNeededEvent(methodName, partitions);
            this.backgroundEventQueue.add((BackgroundEvent)e);
        }
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        if (expectedException.isPresent()) {
            Exception exception = (Exception)Assertions.assertThrows(expectedException.get().getClass(), () -> this.consumer.poll(Duration.ZERO));
            Assertions.assertEquals((Object)expectedException.get().getMessage(), (Object)exception.getMessage());
            Assertions.assertEquals((Object)expectedException.get().getCause(), (Object)exception.getCause());
        } else {
            Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
            Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
        }
        Assertions.assertEquals((int)expectedRevokedCount, (int)consumerRebalanceListener.revokedCount());
        Assertions.assertEquals((int)expectedAssignedCount, (int)consumerRebalanceListener.assignedCount());
        Assertions.assertEquals((int)expectedLostCount, (int)consumerRebalanceListener.lostCount());
    }

    private static Stream<Arguments> listenerCallbacksInvokeSource() {
        Optional empty = Optional.empty();
        Optional<RuntimeException> error = Optional.of(new RuntimeException("Intentional error"));
        Optional<KafkaException> kafkaException = Optional.of(new KafkaException("Intentional error"));
        Optional<KafkaException> wrappedException = Optional.of(new KafkaException("User rebalance callback throws an error", (Throwable)error.get()));
        return Stream.of(Arguments.of((Object[])new Object[]{Collections.emptyList(), empty, empty, empty, 0, 0, 0, empty}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED), empty, empty, empty, 1, 0, 0, empty}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED), empty, empty, empty, 0, 1, 0, empty}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST), empty, empty, empty, 0, 0, 1, empty}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED), error, empty, empty, 1, 0, 0, wrappedException}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED), empty, error, empty, 0, 1, 0, wrappedException}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST), empty, empty, error, 0, 0, 1, wrappedException}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED), kafkaException, empty, empty, 1, 0, 0, kafkaException}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED), empty, kafkaException, empty, 0, 1, 0, kafkaException}), Arguments.of((Object[])new Object[]{Collections.singletonList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST), empty, empty, kafkaException, 0, 0, 1, kafkaException}), Arguments.of((Object[])new Object[]{Arrays.asList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED), error, empty, empty, 1, 1, 0, wrappedException}), Arguments.of((Object[])new Object[]{Arrays.asList(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED), kafkaException, error, empty, 1, 1, 0, kafkaException}));
    }

    @Test
    public void testBackgroundError() {
        String groupId = "consumerGroupA";
        this.consumer = this.newConsumer(this.requiredConsumerConfigAndGroupId("consumerGroupA"));
        KafkaException expectedException = new KafkaException("Nobody expects the Spanish Inquisition");
        ErrorEvent errorEvent = new ErrorEvent((Throwable)expectedException);
        this.backgroundEventQueue.add((BackgroundEvent)errorEvent);
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        KafkaException exception = (KafkaException)Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals((Object)expectedException.getMessage(), (Object)exception.getMessage());
    }

    @Test
    public void testMultipleBackgroundErrors() {
        String groupId = "consumerGroupA";
        this.consumer = this.newConsumer(this.requiredConsumerConfigAndGroupId("consumerGroupA"));
        KafkaException expectedException1 = new KafkaException("Nobody expects the Spanish Inquisition");
        ErrorEvent errorEvent1 = new ErrorEvent((Throwable)expectedException1);
        this.backgroundEventQueue.add((BackgroundEvent)errorEvent1);
        KafkaException expectedException2 = new KafkaException("Spam, Spam, Spam");
        ErrorEvent errorEvent2 = new ErrorEvent((Throwable)expectedException2);
        this.backgroundEventQueue.add((BackgroundEvent)errorEvent2);
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singletonList(new TopicPartition("topic", 0)));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        KafkaException exception = (KafkaException)Assertions.assertThrows(KafkaException.class, () -> this.consumer.poll(Duration.ZERO));
        Assertions.assertEquals((Object)expectedException1.getMessage(), (Object)exception.getMessage());
        Assertions.assertTrue((boolean)this.backgroundEventQueue.isEmpty());
    }

    @Test
    public void testGroupRemoteAssignorUnusedIfGroupIdUndefined() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.remote.assignor", "someAssignor");
        props.put("group.protocol", GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        Assertions.assertTrue((boolean)config.unused().contains("group.remote.assignor"));
    }

    @Test
    public void testGroupRemoteAssignorInClassicProtocol() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", "consumerGroupA");
        props.put("group.protocol", GroupProtocol.CLASSIC.name().toLowerCase(Locale.ROOT));
        props.put("group.remote.assignor", "someAssignor");
        Assertions.assertThrows(ConfigException.class, () -> new ConsumerConfig(props));
    }

    @Test
    public void testGroupRemoteAssignorUsedInConsumerProtocol() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", "consumerGroupA");
        props.put("group.protocol", GroupProtocol.CONSUMER.name().toLowerCase(Locale.ROOT));
        props.put("group.remote.assignor", "someAssignor");
        props.put("enable.auto.commit", (Object)false);
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        Assertions.assertFalse((boolean)config.unused().contains("group.remote.assignor"));
    }

    @Test
    public void testGroupIdNull() {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("auto.commit.interval.ms", (Object)10000);
        props.put("internal.throw.on.fetch.stable.offset.unsupported", (Object)true);
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        Assertions.assertFalse((boolean)config.unused().contains("auto.commit.interval.ms"));
        Assertions.assertFalse((boolean)config.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));
    }

    @Test
    public void testGroupIdNotNullAndValid() {
        Properties props = this.requiredConsumerConfigAndGroupId("consumerGroupA");
        props.put("auto.commit.interval.ms", (Object)10000);
        props.put("internal.throw.on.fetch.stable.offset.unsupported", (Object)true);
        props.put("enable.auto.commit", (Object)false);
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = this.newConsumer(config);
        Assertions.assertTrue((boolean)config.unused().contains("auto.commit.interval.ms"));
        Assertions.assertTrue((boolean)config.unused().contains("internal.throw.on.fetch.stable.offset.unsupported"));
    }

    @Test
    public void testEnsurePollEventSentOnConsumerPoll() {
        SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
        this.consumer = this.newConsumer((FetchBuffer)Mockito.mock(FetchBuffer.class), (ConsumerInterceptors<String, String>)new ConsumerInterceptors(Collections.emptyList(), this.metrics), (ConsumerRebalanceListenerInvoker)Mockito.mock(ConsumerRebalanceListenerInvoker.class), subscriptions, "group-id", "client-id", false);
        TopicPartition tp = new TopicPartition("topic", 0);
        List<ConsumerRecord> records = Collections.singletonList(new ConsumerRecord("topic", 0, 2L, (Object)"key1", (Object)"value1"));
        ((FetchCollector)Mockito.doAnswer(invocation -> Fetch.forPartition((TopicPartition)tp, (List)records, (boolean)true, (OffsetAndMetadata)new OffsetAndMetadata(3L, Optional.of(0), ""))).when(this.fetchCollector)).collectFetch((FetchBuffer)Mockito.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        this.completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList("topic1"));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ofMillis(100L));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.any(PollEvent.class));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.any(CreateFetchRequestsEvent.class));
    }

    private Properties requiredConsumerConfigAndGroupId(String groupId) {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", groupId);
        return props;
    }

    private void testUpdateFetchPositionsWithFetchCommittedOffsetsTimeout() {
        this.completeFetchedCommittedOffsetApplicationEventExceptionally((Exception)((Object)new org.apache.kafka.common.errors.TimeoutException()));
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(new TopicPartition("t1", 1)));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ZERO);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.atLeast((int)1))).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(CheckAndUpdatePositionsEvent.class));
    }

    @Test
    public void testLongPollWaitIsLimited() {
        this.consumer = this.newConsumer();
        String topicName = "topic1";
        this.completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList(topicName));
        Assertions.assertEquals(Collections.singleton(topicName), (Object)this.consumer.subscription());
        Assertions.assertTrue((boolean)this.consumer.assignment().isEmpty());
        int partition = 3;
        TopicPartition tp = new TopicPartition(topicName, 3);
        List<ConsumerRecord> records = Arrays.asList(new ConsumerRecord(topicName, 3, 2L, (Object)"key1", (Object)"value1"), new ConsumerRecord(topicName, 3, 3L, (Object)"key2", (Object)"value2"));
        OffsetAndMetadata nextOffsetAndMetadata = new OffsetAndMetadata(4L, Optional.of(0), "");
        Set<TopicPartition> partitions = Collections.singleton(tp);
        ((FetchCollector)Mockito.doAnswer(invocation -> {
            this.consumer.subscriptions().assignFromSubscribed((Collection)partitions);
            this.consumer.setGroupAssignmentSnapshot(partitions);
            return Fetch.empty();
        }).doAnswer(invocation -> Fetch.forPartition((TopicPartition)tp, (List)records, (boolean)true, (OffsetAndMetadata)nextOffsetAndMetadata)).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        ConsumerRecords returnedRecords = this.consumer.poll(Duration.ofMillis(10000L));
        Assertions.assertEquals((int)2, (int)returnedRecords.count());
        Assertions.assertEquals((long)4L, (long)((OffsetAndMetadata)returnedRecords.nextOffsets().get(tp)).offset());
        Assertions.assertEquals(Optional.of(0), (Object)((OffsetAndMetadata)returnedRecords.nextOffsets().get(tp)).leaderEpoch());
        Assertions.assertEquals(Collections.singleton(topicName), (Object)this.consumer.subscription());
        Assertions.assertEquals(partitions, (Object)this.consumer.assignment());
    }

    @Test
    public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
        this.consumer = this.newConsumer();
        Timer timer = this.time.timer(1000L);
        CompletableFuture future = (CompletableFuture)Mockito.mock(CompletableFuture.class);
        CountDownLatch latch = new CountDownLatch(3);
        ((CompletableFuture)Mockito.doAnswer(invocation -> {
            latch.countDown();
            if (latch.getCount() > 0L) {
                long timeout = (Long)invocation.getArgument(0, Long.class);
                timer.sleep(timeout);
                throw new TimeoutException("Intentional timeout");
            }
            future.complete(null);
            return null;
        }).when((Object)future)).get((Long)ArgumentMatchers.any(Long.class), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)));
        this.consumer.processBackgroundEvents((Future)future, timer, e -> false);
        Assertions.assertEquals((long)800L, (long)timer.remainingMs());
    }

    @Test
    public void testProcessBackgroundEventsWithoutDelay() {
        this.consumer = this.newConsumer();
        Timer timer = this.time.timer(1000L);
        CompletableFuture<Object> future = CompletableFuture.completedFuture(null);
        this.consumer.processBackgroundEvents(future, timer, e -> false);
        Assertions.assertEquals((long)1000L, (long)timer.remainingMs());
    }

    @Test
    public void testProcessBackgroundEventsTimesOut() throws Exception {
        this.consumer = this.newConsumer();
        Timer timer = this.time.timer(1000L);
        CompletableFuture future = (CompletableFuture)Mockito.mock(CompletableFuture.class);
        ((CompletableFuture)Mockito.doAnswer(invocation -> {
            long timeout = (Long)invocation.getArgument(0, Long.class);
            timer.sleep(timeout);
            throw new TimeoutException("Intentional timeout");
        }).when((Object)future)).get((Long)ArgumentMatchers.any(Long.class), (TimeUnit)((Object)ArgumentMatchers.any(TimeUnit.class)));
        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.processBackgroundEvents((Future)future, timer, e -> false));
        Assertions.assertEquals((long)0L, (long)timer.remainingMs());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPollThrowsInterruptExceptionIfInterrupted() {
        this.consumer = this.newConsumer();
        String topicName = "foo";
        int partition = 3;
        TopicPartition tp = new TopicPartition("foo", 3);
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.completeAssignmentChangeEventSuccessfully();
        this.consumer.assign(Collections.singleton(tp));
        try {
            Thread.currentThread().interrupt();
            this.markReconcileAndAutoCommitCompleteForPollEvent();
            Assertions.assertThrows(InterruptException.class, () -> this.consumer.poll(Duration.ZERO));
        }
        finally {
            Thread.interrupted();
        }
        Assertions.assertDoesNotThrow(() -> this.consumer.poll(Duration.ZERO));
    }

    @Test
    void testReaperInvokedInClose() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.close();
        ((CompletableEventReaper)Mockito.verify((Object)this.backgroundEventReaper)).reap(this.backgroundEventQueue);
    }

    @Test
    void testReaperInvokedInUnsubscribe() {
        this.consumer = this.newConsumer();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.unsubscribe();
        ((CompletableEventReaper)Mockito.verify((Object)this.backgroundEventReaper)).reap(this.time.milliseconds());
    }

    @Test
    void testReaperInvokedInPoll() {
        this.consumer = this.newConsumer();
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        this.completeTopicSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(Collections.singletonList("topic"));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ZERO);
        ((CompletableEventReaper)Mockito.verify((Object)this.backgroundEventReaper)).reap(this.time.milliseconds());
    }

    @Test
    public void testUnsubscribeWithoutGroupId() {
        this.consumer = this.newConsumerWithoutGroupId();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.unsubscribe();
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    @Test
    public void testSeekToBeginning() {
        Set<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));
        this.consumer = this.newConsumer();
        this.consumer.seekToBeginning(topics);
        CompletableApplicationEvent event = this.addAndGetLastEnqueuedEvent();
        ResetOffsetEvent resetOffsetEvent = (ResetOffsetEvent)Assertions.assertInstanceOf(ResetOffsetEvent.class, event);
        Assertions.assertEquals(topics, new HashSet(resetOffsetEvent.topicPartitions()));
        Assertions.assertEquals((Object)AutoOffsetResetStrategy.EARLIEST, (Object)resetOffsetEvent.offsetResetStrategy());
    }

    @Test
    public void testSeekToBeginningWithException() {
        Set<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));
        this.consumer = this.newConsumer();
        this.completeResetOffsetEventExceptionally((Exception)((Object)new org.apache.kafka.common.errors.TimeoutException()));
        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.seekToBeginning(topics));
    }

    @Test
    public void testSeekToEndWithException() {
        Set<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));
        this.consumer = this.newConsumer();
        this.completeResetOffsetEventExceptionally((Exception)((Object)new org.apache.kafka.common.errors.TimeoutException()));
        Assertions.assertThrows(org.apache.kafka.common.errors.TimeoutException.class, () -> this.consumer.seekToEnd(topics));
    }

    @Test
    public void testSeekToEnd() {
        Set<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));
        this.consumer = this.newConsumer();
        this.consumer.seekToEnd(topics);
        CompletableApplicationEvent event = this.addAndGetLastEnqueuedEvent();
        ResetOffsetEvent resetOffsetEvent = (ResetOffsetEvent)Assertions.assertInstanceOf(ResetOffsetEvent.class, event);
        Assertions.assertEquals(topics, new HashSet(resetOffsetEvent.topicPartitions()));
        Assertions.assertEquals((Object)AutoOffsetResetStrategy.LATEST, (Object)resetOffsetEvent.offsetResetStrategy());
    }

    @Test
    public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() {
        this.consumer = this.newConsumer();
        ((FetchCollector)Mockito.doReturn((Object)Fetch.empty()).when(this.fetchCollector)).collectFetch((FetchBuffer)ArgumentMatchers.any(FetchBuffer.class));
        Mockito.when((Object)((Boolean)this.applicationEventHandler.addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(CheckAndUpdatePositionsEvent.class)))).thenReturn((Object)true);
        ((ConsumerMetadata)Mockito.doReturn((Object)Metadata.LeaderAndEpoch.noLeaderOrEpoch()).when((Object)this.metadata)).currentLeader((TopicPartition)ArgumentMatchers.any());
        this.completeAssignmentChangeEventSuccessfully();
        this.completeTopicPatternSubscriptionChangeEventSuccessfully();
        this.completeUnsubscribeApplicationEventSuccessfully();
        this.consumer.assign(Collections.singleton(new TopicPartition("topic1", 0)));
        this.markReconcileAndAutoCommitCompleteForPollEvent();
        this.consumer.poll(Duration.ZERO);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler, (VerificationMode)Mockito.never())).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(UpdatePatternSubscriptionEvent.class));
        this.consumer.unsubscribe();
        this.consumer.subscribe(Pattern.compile("t*"));
        this.consumer.poll(Duration.ZERO);
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(UpdatePatternSubscriptionEvent.class));
    }

    @Test
    public void testSubscribeToRe2JPatternValidation() {
        this.consumer = this.newConsumer();
        Throwable t = Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.subscribe((SubscriptionPattern)null));
        Assertions.assertEquals((Object)"Topic pattern to subscribe to cannot be null", (Object)t.getMessage());
        t = Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.subscribe(new SubscriptionPattern("")));
        Assertions.assertEquals((Object)"Topic pattern to subscribe to cannot be empty", (Object)t.getMessage());
        Assertions.assertDoesNotThrow(() -> this.consumer.subscribe(new SubscriptionPattern("t*")));
        Assertions.assertThrows(IllegalArgumentException.class, () -> this.consumer.subscribe(new SubscriptionPattern("t*"), null));
        Assertions.assertDoesNotThrow(() -> this.consumer.subscribe(new SubscriptionPattern("t*"), (ConsumerRebalanceListener)Mockito.mock(ConsumerRebalanceListener.class)));
    }

    @Test
    public void testSubscribeToRe2JPatternThrowsIfNoGroupId() {
        this.consumer = this.newConsumer(TestUtils.requiredConsumerConfig());
        Assertions.assertThrows(InvalidGroupIdException.class, () -> this.consumer.subscribe(new SubscriptionPattern("t*")));
        Assertions.assertThrows(InvalidGroupIdException.class, () -> this.consumer.subscribe(new SubscriptionPattern("t*"), (ConsumerRebalanceListener)Mockito.mock(ConsumerRebalanceListener.class)));
    }

    @Test
    public void testSubscribeToRe2JPatternGeneratesEvent() {
        this.consumer = this.newConsumer();
        this.completeTopicRe2JPatternSubscriptionChangeEventSuccessfully();
        this.consumer.subscribe(new SubscriptionPattern("t*"));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));
        Mockito.clearInvocations((Object[])new ApplicationEventHandler[]{this.applicationEventHandler});
        this.consumer.subscribe(new SubscriptionPattern("t*"), (ConsumerRebalanceListener)Mockito.mock(ConsumerRebalanceListener.class));
        ((ApplicationEventHandler)Mockito.verify((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));
    }

    @Test
    public void testSubscribePatternAgainstBrokerNotSupportingRegex() throws InterruptedException {
        Properties props = TestUtils.requiredConsumerConfig();
        props.put("group.id", "group-id");
        props.put("enable.auto.commit", "false");
        ConsumerConfig config = new ConsumerConfig(props);
        ConsumerMetadata metadata = new ConsumerMetadata(0L, 0L, Long.MAX_VALUE, false, false, (SubscriptionState)Mockito.mock(SubscriptionState.class), new LogContext(), new ClusterResourceListeners());
        MockClient client = new MockClient(this.time, (Metadata)metadata);
        MetadataResponse initialMetadata = RequestTestUtils.metadataUpdateWithIds(1, Map.of("topic1", 2), Map.of("topic1", Uuid.randomUuid()));
        client.updateMetadata(initialMetadata);
        client.setNodeApiVersions(NodeApiVersions.create((short)ApiKeys.CONSUMER_GROUP_HEARTBEAT.id, (short)0, (short)0));
        Node node = (Node)metadata.fetch().nodes().get(0);
        client.prepareResponseFrom((AbstractResponse)FindCoordinatorResponse.prepareResponse((Errors)Errors.NONE, (String)"group-id", (Node)node), node);
        ConsumerGroupHeartbeatResponse result = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData().setMemberId("").setMemberEpoch(0));
        Node coordinator = new Node(Integer.MAX_VALUE - node.id(), node.host(), node.port());
        client.prepareResponseFrom((AbstractResponse)result, coordinator);
        SubscriptionState subscriptionState = (SubscriptionState)Mockito.mock(SubscriptionState.class);
        this.consumer = new AsyncKafkaConsumer(new LogContext(), this.time, config, (Deserializer)new StringDeserializer(), (Deserializer)new StringDeserializer(), (KafkaClient)client, subscriptionState, metadata);
        this.completeTopicRe2JPatternSubscriptionChangeEventSuccessfully();
        SubscriptionPattern pattern = new SubscriptionPattern("t*");
        this.consumer.subscribe(pattern);
        Mockito.when((Object)subscriptionState.subscriptionPattern()).thenReturn((Object)pattern);
        TestUtils.waitForCondition(() -> {
            try {
                this.consumer.poll(Duration.ZERO);
                return false;
            }
            catch (UnsupportedVersionException e) {
                return true;
            }
        }, "Consumer did not throw the expected UnsupportedVersionException on poll");
    }

    @Test
    public void testRecordBackgroundEventQueueSizeAndBackgroundEventQueueTime() {
        this.consumer = this.newConsumer((FetchBuffer)Mockito.mock(FetchBuffer.class), (ConsumerInterceptors<String, String>)((ConsumerInterceptors)Mockito.mock(ConsumerInterceptors.class)), (ConsumerRebalanceListenerInvoker)Mockito.mock(ConsumerRebalanceListenerInvoker.class), (SubscriptionState)Mockito.mock(SubscriptionState.class), "group-id", "client-id", false);
        Metrics metrics = this.consumer.metricsRegistry();
        AsyncConsumerMetrics kafkaConsumerMetrics = this.consumer.kafkaConsumerMetrics();
        ConsumerRebalanceListenerCallbackNeededEvent event = new ConsumerRebalanceListenerCallbackNeededEvent(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, Collections.emptySortedSet());
        event.setEnqueuedMs(this.time.milliseconds());
        this.backgroundEventQueue.add((BackgroundEvent)event);
        kafkaConsumerMetrics.recordBackgroundEventQueueSize(1);
        this.time.sleep(10L);
        this.consumer.processBackgroundEvents();
        Assertions.assertEquals((double)0.0, (double)((Double)metrics.metric(metrics.metricName("background-event-queue-size", "consumer-metrics")).metricValue()));
        Assertions.assertEquals((double)10.0, (double)((Double)metrics.metric(metrics.metricName("background-event-queue-time-avg", "consumer-metrics")).metricValue()));
        Assertions.assertEquals((double)10.0, (double)((Double)metrics.metric(metrics.metricName("background-event-queue-time-max", "consumer-metrics")).metricValue()));
    }

    private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
        TopicPartition t0 = new TopicPartition("t0", 2);
        TopicPartition t1 = new TopicPartition("t0", 3);
        HashMap<TopicPartition, OffsetAndMetadata> topicPartitionOffsets = new HashMap<TopicPartition, OffsetAndMetadata>();
        topicPartitionOffsets.put(t0, new OffsetAndMetadata(10L));
        topicPartitionOffsets.put(t1, new OffsetAndMetadata(20L));
        return topicPartitionOffsets;
    }

    private Map<TopicPartition, OffsetAndTimestampInternal> mockOffsetAndTimestamp() {
        TopicPartition t0 = new TopicPartition("t0", 2);
        TopicPartition t1 = new TopicPartition("t0", 3);
        HashMap<TopicPartition, OffsetAndTimestampInternal> offsetAndTimestamp = new HashMap<TopicPartition, OffsetAndTimestampInternal>();
        offsetAndTimestamp.put(t0, new OffsetAndTimestampInternal(5L, 1L, Optional.empty()));
        offsetAndTimestamp.put(t1, new OffsetAndTimestampInternal(6L, 3L, Optional.empty()));
        return offsetAndTimestamp;
    }

    private Map<TopicPartition, Long> mockTimestampToSearch() {
        TopicPartition t0 = new TopicPartition("t0", 2);
        TopicPartition t1 = new TopicPartition("t0", 3);
        HashMap<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(t0, 1L);
        timestampToSearch.put(t1, 2L);
        return timestampToSearch;
    }

    private void completeCommitAsyncApplicationEventExceptionally(Exception ex) {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            AsyncCommitEvent event = (AsyncCommitEvent)invocation.getArgument(0);
            event.markOffsetsReady();
            event.future().completeExceptionally(ex);
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(AsyncCommitEvent.class));
    }

    private void completeCommitSyncApplicationEventExceptionally(Exception ex) {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            SyncCommitEvent event = (SyncCommitEvent)invocation.getArgument(0);
            event.markOffsetsReady();
            event.future().completeExceptionally(ex);
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(SyncCommitEvent.class));
    }

    private void completeResetOffsetEventExceptionally(Exception ex) {
        ((ApplicationEventHandler)Mockito.doThrow((Throwable[])new Throwable[]{ex}).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(ResetOffsetEvent.class));
    }

    private void completeCommitAsyncApplicationEventSuccessfully() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            AsyncCommitEvent event = (AsyncCommitEvent)invocation.getArgument(0);
            event.markOffsetsReady();
            event.future().complete(null);
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(AsyncCommitEvent.class));
    }

    private void completeCommitSyncApplicationEventSuccessfully() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            SyncCommitEvent event = (SyncCommitEvent)invocation.getArgument(0);
            event.markOffsetsReady();
            event.future().complete(null);
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(SyncCommitEvent.class));
    }

    private void completeFetchedCommittedOffsetApplicationEventSuccessfully(Map<TopicPartition, OffsetAndMetadata> committedOffsets) {
        ((ApplicationEventHandler)Mockito.doReturn(committedOffsets).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            FetchCommittedOffsetsEvent event = (FetchCommittedOffsetsEvent)invocation.getArgument(0);
            event.future().complete(committedOffsets);
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(FetchCommittedOffsetsEvent.class));
    }

    private void completeFetchedCommittedOffsetApplicationEventExceptionally(Exception ex) {
        ((ApplicationEventHandler)Mockito.doThrow((Throwable[])new Throwable[]{ex}).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.any(FetchCommittedOffsetsEvent.class));
    }

    private void completeUnsubscribeApplicationEventSuccessfully() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            UnsubscribeEvent event = (UnsubscribeEvent)invocation.getArgument(0);
            this.consumer.subscriptions().unsubscribe();
            event.future().complete(null);
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(UnsubscribeEvent.class));
    }

    private void completeAssignmentChangeEventSuccessfully() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            AssignmentChangeEvent event = (AssignmentChangeEvent)invocation.getArgument(0);
            HashSet partitions = new HashSet(event.partitions());
            this.consumer.subscriptions().assignFromUser(partitions);
            event.future().complete(null);
            return null;
        }).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(AssignmentChangeEvent.class));
    }

    private void completeTopicSubscriptionChangeEventSuccessfully() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            TopicSubscriptionChangeEvent event = (TopicSubscriptionChangeEvent)invocation.getArgument(0);
            this.consumer.subscriptions().subscribe(event.topics(), event.listener());
            event.future().complete(null);
            return null;
        }).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(TopicSubscriptionChangeEvent.class));
    }

    private void completeTopicPatternSubscriptionChangeEventSuccessfully() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            TopicPatternSubscriptionChangeEvent event = (TopicPatternSubscriptionChangeEvent)invocation.getArgument(0);
            this.consumer.subscriptions().subscribe(event.pattern(), event.listener());
            event.future().complete(null);
            return null;
        }).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(TopicPatternSubscriptionChangeEvent.class));
    }

    private void completeTopicRe2JPatternSubscriptionChangeEventSuccessfully() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            TopicRe2JPatternSubscriptionChangeEvent event = (TopicRe2JPatternSubscriptionChangeEvent)invocation.getArgument(0);
            this.consumer.subscriptions().subscribe(event.pattern(), event.listener());
            event.future().complete(null);
            return null;
        }).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(TopicRe2JPatternSubscriptionChangeEvent.class));
    }

    private void completeSeekUnvalidatedEventSuccessfully() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            SeekUnvalidatedEvent event = (SeekUnvalidatedEvent)invocation.getArgument(0);
            SubscriptionState.FetchPosition newPosition = new SubscriptionState.FetchPosition(event.offset(), event.offsetEpoch(), this.metadata.currentLeader(event.partition()));
            this.consumer.subscriptions().seekUnvalidated(event.partition(), newPosition);
            event.future().complete(null);
            return null;
        }).when((Object)this.applicationEventHandler)).addAndGet((CompletableApplicationEvent)ArgumentMatchers.isA(SeekUnvalidatedEvent.class));
    }

    private void forceCommitCallbackInvocation() {
        this.consumer.commitAsync();
    }

    private void markOffsetsReadyForCommitEvent() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            CommitEvent event = (CommitEvent)invocation.getArgument(0);
            event.markOffsetsReady();
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(CommitEvent.class));
    }

    private void markReconcileAndAutoCommitCompleteForPollEvent() {
        ((ApplicationEventHandler)Mockito.doAnswer(invocation -> {
            PollEvent event = (PollEvent)invocation.getArgument(0);
            event.markReconcileAndAutoCommitComplete();
            return null;
        }).when((Object)this.applicationEventHandler)).add((ApplicationEvent)ArgumentMatchers.isA(PollEvent.class));
    }

    private static class MockCommitCallback
    implements OffsetCommitCallback {
        public int invoked = 0;
        public Exception exception = null;
        public String completionThread;

        private MockCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            ++this.invoked;
            this.completionThread = Thread.currentThread().getName();
            this.exception = exception;
        }
    }
}

