/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.MockPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ConsumerCoordinatorTest {
    private String topic1 = "test1";
    private String topic2 = "test2";
    private String groupId = "test-group";
    private TopicPartition t1p = new TopicPartition(this.topic1, 0);
    private TopicPartition t2p = new TopicPartition(this.topic2, 0);
    private int rebalanceTimeoutMs = 60000;
    private int sessionTimeoutMs = 10000;
    private int heartbeatIntervalMs = 5000;
    private long retryBackoffMs = 100L;
    private boolean autoCommitEnabled = false;
    private int autoCommitIntervalMs = 2000;
    private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
    private List<PartitionAssignor> assignors = Collections.singletonList(this.partitionAssignor);
    private MockTime time;
    private MockClient client;
    private Cluster cluster = TestUtils.clusterWith(1, (Map<String, Integer>)new HashMap<String, Integer>(){
        {
            this.put(ConsumerCoordinatorTest.this.topic1, 1);
            this.put(ConsumerCoordinatorTest.this.topic2, 1);
        }
    });
    private Node node = (Node)this.cluster.nodes().get(0);
    private SubscriptionState subscriptions;
    private Metadata metadata;
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private MockRebalanceListener rebalanceListener;
    private MockCommitCallback mockOffsetCommitCallback;
    private ConsumerCoordinator coordinator;

    @Before
    public void setup() {
        this.time = new MockTime();
        this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
        this.metadata = new Metadata(0L, Long.MAX_VALUE, true);
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client = new MockClient(this.time, this.metadata);
        this.consumerClient = new ConsumerNetworkClient((KafkaClient)this.client, this.metadata, (Time)this.time, 100L, 1000L);
        this.metrics = new Metrics((Time)this.time);
        this.rebalanceListener = new MockRebalanceListener();
        this.mockOffsetCommitCallback = new MockCommitCallback();
        this.partitionAssignor.clear();
        this.client.setNode(this.node);
        this.coordinator = this.buildCoordinator(this.metrics, this.assignors, true, this.autoCommitEnabled, true);
    }

    @After
    public void teardown() {
        this.metrics.close();
    }

    @Test
    public void testNormalHeartbeat() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE));
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.succeeded());
    }

    @Test(expected=GroupAuthorizationException.class)
    public void testGroupDescribeUnauthorized() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.GROUP_AUTHORIZATION_FAILED));
        this.coordinator.ensureCoordinatorReady();
    }

    @Test(expected=GroupAuthorizationException.class)
    public void testGroupReadUnauthorized() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(0, "memberId", Collections.emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
    }

    @Test
    public void testCoordinatorNotAvailable() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.COORDINATOR_NOT_AVAILABLE));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertEquals((Object)Errors.COORDINATOR_NOT_AVAILABLE.exception(), (Object)future.exception());
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testNotCoordinator() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NOT_COORDINATOR));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertEquals((Object)Errors.NOT_COORDINATOR.exception(), (Object)future.exception());
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testIllegalGeneration() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.ILLEGAL_GENERATION));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertEquals((Object)Errors.ILLEGAL_GENERATION.exception(), (Object)future.exception());
        Assert.assertTrue((boolean)this.coordinator.needRejoin());
    }

    @Test
    public void testUnknownConsumerId() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.UNKNOWN_MEMBER_ID));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertEquals((Object)Errors.UNKNOWN_MEMBER_ID.exception(), (Object)future.exception());
        Assert.assertTrue((boolean)this.coordinator.needRejoin());
    }

    @Test
    public void testCoordinatorDisconnect() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture future = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals((long)1L, (long)this.consumerClient.pendingRequestCount());
        Assert.assertFalse((boolean)future.isDone());
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(Errors.NONE), true);
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)future.isDone());
        Assert.assertTrue((boolean)future.failed());
        Assert.assertTrue((boolean)(future.exception() instanceof DisconnectException));
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
    }

    @Test(expected=ApiException.class)
    public void testJoinGroupInvalidGroupId() {
        String consumerId = "leader";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(0, "leader", Collections.emptyMap(), Errors.INVALID_GROUP_ID));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
    }

    @Test
    public void testNormalJoinGroupLeader() {
        String consumerId = "leader";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                SyncGroupRequest sync = (SyncGroupRequest)body;
                return sync.memberId().equals("leader") && sync.generationId() == 1 && sync.groupAssignment().containsKey("leader");
            }
        }, (AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals(Collections.singleton(this.topic1), (Object)this.subscriptions.groupSubscription());
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testPatternJoinGroupLeader() {
        String consumerId = "leader";
        this.subscriptions.subscribe(Pattern.compile("test.*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(TestUtils.singletonCluster(this.topic1, 1), Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Arrays.asList(this.t1p, this.t2p)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                SyncGroupRequest sync = (SyncGroupRequest)body;
                return sync.memberId().equals("leader") && sync.generationId() == 1 && sync.groupAssignment().containsKey("leader");
            }
        }, (AbstractResponse)this.syncGroupResponse(Arrays.asList(this.t1p, this.t2p), Errors.NONE));
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet());
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals((long)2L, (long)this.subscriptions.assignedPartitions().size());
        Assert.assertEquals((long)2L, (long)this.subscriptions.groupSubscription().size());
        Assert.assertEquals((long)2L, (long)this.subscriptions.subscription().size());
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.assigned.size());
    }

    @Test
    public void testMetadataRefreshDuringRebalance() {
        String consumerId = "leader";
        this.subscriptions.subscribe(Pattern.compile(".*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.needMetadataForAllTopics(true);
        this.metadata.update(TestUtils.singletonCluster(this.topic1, 1), Collections.emptySet(), this.time.milliseconds());
        Assert.assertEquals(Collections.singleton(this.topic1), (Object)this.subscriptions.subscription());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> initialSubscription = Collections.singletonMap("leader", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        final List<String> updatedSubscription = Arrays.asList(this.topic1, this.topic2);
        final HashSet<String> updatedSubscriptionSet = new HashSet<String>(updatedSubscription);
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", initialSubscription, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                HashMap<String, Integer> updatedPartitions = new HashMap<String, Integer>();
                for (String topic : updatedSubscription) {
                    updatedPartitions.put(topic, 1);
                }
                ConsumerCoordinatorTest.this.metadata.update(TestUtils.clusterWith(1, updatedPartitions), Collections.emptySet(), ConsumerCoordinatorTest.this.time.milliseconds());
                return true;
            }
        }, (AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        List<TopicPartition> newAssignment = Arrays.asList(this.t1p, this.t2p);
        HashSet<TopicPartition> newAssignmentSet = new HashSet<TopicPartition>(newAssignment);
        Map<String, List<String>> updatedSubscriptions = Collections.singletonMap("leader", Arrays.asList(this.topic1, this.topic2));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", newAssignment));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                JoinGroupRequest join = (JoinGroupRequest)body;
                JoinGroupRequest.ProtocolMetadata protocolMetadata = (JoinGroupRequest.ProtocolMetadata)join.groupProtocols().iterator().next();
                PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription((ByteBuffer)protocolMetadata.metadata());
                protocolMetadata.metadata().rewind();
                return subscription.topics().containsAll(updatedSubscriptionSet);
            }
        }, (AbstractResponse)this.joinGroupLeaderResponse(2, "leader", updatedSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(newAssignment, Errors.NONE));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(updatedSubscriptionSet, (Object)this.subscriptions.subscription());
        Assert.assertEquals(newAssignmentSet, (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.revoked);
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(newAssignmentSet, this.rebalanceListener.assigned);
    }

    @Test
    public void testWakeupDuringJoin() {
        String consumerId = "leader";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        this.consumerClient.wakeup();
        try {
            this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        }
        catch (WakeupException wakeupException) {
            // empty catch block
        }
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testNormalJoinGroupFollower() {
        String consumerId = "consumer";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                SyncGroupRequest sync = (SyncGroupRequest)body;
                return sync.memberId().equals("consumer") && sync.generationId() == 1 && sync.groupAssignment().isEmpty();
            }
        }, (AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals(Collections.singleton(this.topic1), (Object)this.subscriptions.groupSubscription());
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testPatternJoinGroupFollower() {
        String consumerId = "consumer";
        this.subscriptions.subscribe(Pattern.compile("test.*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(TestUtils.singletonCluster(this.topic1, 1), Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                SyncGroupRequest sync = (SyncGroupRequest)body;
                return sync.memberId().equals("consumer") && sync.generationId() == 1 && sync.groupAssignment().isEmpty();
            }
        }, (AbstractResponse)this.syncGroupResponse(Arrays.asList(this.t1p, this.t2p), Errors.NONE));
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet());
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals((long)2L, (long)this.subscriptions.assignedPartitions().size());
        Assert.assertEquals((long)2L, (long)this.subscriptions.subscription().size());
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.assigned.size());
    }

    @Test
    public void testLeaveGroupOnClose() {
        String consumerId = "consumer";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        final AtomicBoolean received = new AtomicBoolean(false);
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                received.set(true);
                LeaveGroupRequest leaveRequest = (LeaveGroupRequest)body;
                return leaveRequest.memberId().equals("consumer") && leaveRequest.groupId().equals(ConsumerCoordinatorTest.this.groupId);
            }
        }, (AbstractResponse)new LeaveGroupResponse(Errors.NONE));
        this.coordinator.close(0L);
        Assert.assertTrue((boolean)received.get());
    }

    @Test
    public void testMaybeLeaveGroup() {
        String consumerId = "consumer";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        final AtomicBoolean received = new AtomicBoolean(false);
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                received.set(true);
                LeaveGroupRequest leaveRequest = (LeaveGroupRequest)body;
                return leaveRequest.memberId().equals("consumer") && leaveRequest.groupId().equals(ConsumerCoordinatorTest.this.groupId);
            }
        }, (AbstractResponse)new LeaveGroupResponse(Errors.NONE));
        this.coordinator.maybeLeaveGroup();
        Assert.assertTrue((boolean)received.get());
        AbstractCoordinator.Generation generation = this.coordinator.generation();
        Assert.assertNull((Object)generation);
    }

    @Test(expected=KafkaException.class)
    public void testUnexpectedErrorOnSyncGroup() {
        String consumerId = "consumer";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN));
        this.coordinator.joinGroupIfNeeded();
    }

    @Test
    public void testUnknownMemberIdOnSyncGroup() {
        String consumerId = "consumer";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                JoinGroupRequest joinRequest = (JoinGroupRequest)body;
                return joinRequest.memberId().equals("");
            }
        }, (AbstractResponse)this.joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testRebalanceInProgressOnSyncGroup() {
        String consumerId = "consumer";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.REBALANCE_IN_PROGRESS));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testIllegalGenerationOnSyncGroup() {
        String consumerId = "consumer";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.ILLEGAL_GENERATION));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                JoinGroupRequest joinRequest = (JoinGroupRequest)body;
                return joinRequest.memberId().equals("");
            }
        }, (AbstractResponse)this.joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testMetadataChangeTriggersRebalance() {
        String consumerId = "consumer";
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("consumer", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(this.t1p)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "consumer", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        this.metadata.update(TestUtils.singletonCluster(this.topic1, 2), Collections.emptySet(), this.time.milliseconds());
        Assert.assertTrue((boolean)this.coordinator.needRejoin());
    }

    @Test
    public void testUpdateMetadataDuringRebalance() {
        String topic1 = "topic1";
        String topic2 = "topic2";
        TopicPartition tp1 = new TopicPartition("topic1", 0);
        TopicPartition tp2 = new TopicPartition("topic2", 0);
        String consumerId = "leader";
        List<String> topics = Arrays.asList("topic1", "topic2");
        this.subscriptions.subscribe(new HashSet<String>(topics), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.setTopics(topics);
        this.metadata.update(TestUtils.singletonCluster("topic1", 1), Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("leader", topics);
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(tp1)));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "leader", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                SyncGroupRequest sync = (SyncGroupRequest)body;
                if (sync.memberId().equals("leader") && sync.generationId() == 1 && sync.groupAssignment().containsKey("leader")) {
                    HashMap<String, Integer> topicPartitionCounts = new HashMap<String, Integer>();
                    topicPartitionCounts.put("topic1", 1);
                    topicPartitionCounts.put("topic2", 1);
                    ConsumerCoordinatorTest.this.metadata.update(TestUtils.singletonCluster(topicPartitionCounts), Collections.emptySet(), ConsumerCoordinatorTest.this.time.milliseconds());
                    return true;
                }
                return false;
            }
        }, (AbstractResponse)this.syncGroupResponse(Collections.singletonList(tp1), Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(2, "leader", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Arrays.asList(tp1, tp2), Errors.NONE));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(new HashSet<TopicPartition>(Arrays.asList(tp1, tp2)), (Object)this.subscriptions.assignedPartitions());
    }

    @Test
    public void testRebalanceAfterTopicUnavailableWithSubscribe() {
        this.unavailableTopicTest(false, false, Collections.emptySet());
    }

    @Test
    public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
        this.unavailableTopicTest(true, false, Collections.emptySet());
    }

    @Test
    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() {
        this.unavailableTopicTest(true, false, Collections.singleton("notmatching"));
    }

    @Test
    public void testAssignWithTopicUnavailable() {
        this.unavailableTopicTest(true, false, Collections.emptySet());
    }

    private void unavailableTopicTest(boolean patternSubscribe, boolean assign, Set<String> unavailableTopicsInLastMetadata) {
        String consumerId = "consumer";
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.client.prepareMetadataUpdate(Cluster.empty(), Collections.singleton("test1"));
        if (assign) {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        } else if (patternSubscribe) {
            this.subscriptions.subscribe(Pattern.compile("test.*"), (ConsumerRebalanceListener)this.rebalanceListener);
        } else {
            this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        }
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> memberSubscriptions = Collections.singletonMap("consumer", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.emptyMap());
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(1, "consumer", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.emptyList(), Errors.NONE));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        if (!assign) {
            Assert.assertFalse((boolean)this.coordinator.needRejoin());
            Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.assigned);
        }
        Assert.assertTrue((String)"Metadata refresh not requested for unavailable partitions", (boolean)this.metadata.updateRequested());
        this.client.prepareMetadataUpdate(this.cluster, unavailableTopicsInLastMetadata);
        this.client.poll(0L, this.time.milliseconds());
        this.client.prepareResponse((AbstractResponse)this.joinGroupLeaderResponse(2, "consumer", memberSubscriptions, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertFalse((String)"Metadata refresh requested unnecessarily", (boolean)this.metadata.updateRequested());
        if (!assign) {
            Assert.assertFalse((boolean)this.coordinator.needRejoin());
            Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
        }
    }

    @Test
    public void testExcludeInternalTopicsConfigOption() {
        this.subscriptions.subscribe(Pattern.compile(".*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.update(TestUtils.singletonCluster("__consumer_offsets", 2), Collections.emptySet(), this.time.milliseconds());
        Assert.assertFalse((boolean)this.subscriptions.subscription().contains("__consumer_offsets"));
    }

    @Test
    public void testIncludeInternalTopicsConfigOption() {
        this.coordinator = this.buildCoordinator(new Metrics(), this.assignors, false, false, true);
        this.subscriptions.subscribe(Pattern.compile(".*"), (ConsumerRebalanceListener)this.rebalanceListener);
        this.metadata.update(TestUtils.singletonCluster("__consumer_offsets", 2), Collections.emptySet(), this.time.milliseconds());
        Assert.assertTrue((boolean)this.subscriptions.subscription().contains("__consumer_offsets"));
    }

    @Test
    public void testRejoinGroup() {
        String otherTopic = "otherTopic";
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.revokedCount);
        Assert.assertTrue((boolean)this.rebalanceListener.revoked.isEmpty());
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
        this.subscriptions.subscribe(new HashSet<String>(Arrays.asList(this.topic1, otherTopic)), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.revoked);
        Assert.assertEquals((long)2L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testDisconnectInJoin() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE), true);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse((boolean)this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), (Object)this.subscriptions.assignedPartitions());
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.revokedCount);
        Assert.assertEquals((long)1L, (long)this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test(expected=ApiException.class)
    public void testInvalidSessionTimeout() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT));
        this.coordinator.joinGroupIfNeeded();
    }

    @Test
    public void testCommitOffsetOnly() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        AtomicBoolean success = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.callback(success));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)success.get());
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testAutoCommitDynamicAssignment() {
        String consumerId = "consumer";
        ConsumerCoordinator coordinator = this.buildCoordinator(new Metrics(), this.assignors, true, true, true);
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        coordinator.joinGroupIfNeeded();
        this.subscriptions.seek(this.t1p, 100L);
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        this.time.sleep(this.autoCommitIntervalMs);
        coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testAutoCommitDynamicAssignmentRebalance() {
        String consumerId = "consumer";
        ConsumerCoordinator coordinator = this.buildCoordinator(new Metrics(), this.assignors, true, true, true);
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        coordinator.ensureCoordinatorReady();
        this.time.sleep(this.autoCommitIntervalMs);
        this.consumerClient.poll(0L);
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        coordinator.joinGroupIfNeeded();
        this.subscriptions.seek(this.t1p, 100L);
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        this.time.sleep(this.autoCommitIntervalMs);
        coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testAutoCommitManualAssignment() {
        ConsumerCoordinator coordinator = this.buildCoordinator(new Metrics(), this.assignors, true, true, true);
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        this.time.sleep(this.autoCommitIntervalMs);
        coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testAutoCommitManualAssignmentCoordinatorUnknown() {
        ConsumerCoordinator coordinator = this.buildCoordinator(new Metrics(), this.assignors, true, true, true);
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        this.consumerClient.poll(0L);
        this.time.sleep(this.autoCommitIntervalMs);
        this.consumerClient.poll(0L);
        Assert.assertNull((Object)this.subscriptions.committed(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        coordinator.ensureCoordinatorReady();
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testCommitOffsetMetadata() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        AtomicBoolean success = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "hello")), this.callback(success));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)success.get());
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
        Assert.assertEquals((Object)"hello", (Object)this.subscriptions.committed(this.t1p).metadata());
    }

    @Test
    public void testCommitOffsetAsyncWithDefaultCallback() {
        int invokedBeforeTest = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals((long)(invokedBeforeTest + 1), (long)this.mockOffsetCommitCallback.invoked);
        Assert.assertNull((Object)this.mockOffsetCommitCallback.exception);
    }

    @Test
    public void testCommitAfterLeaveGroup() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet());
        this.coordinator.joinGroupIfNeeded();
        this.client.prepareResponse((AbstractResponse)new LeaveGroupResponse(Errors.NONE));
        this.subscriptions.unsubscribe();
        this.coordinator.maybeLeaveGroup();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                OffsetCommitRequest commitRequest = (OffsetCommitRequest)body;
                return commitRequest.memberId().equals("") && commitRequest.generationId() == -1;
            }
        }, (AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        AtomicBoolean success = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.callback(success));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)success.get());
    }

    @Test
    public void testCommitOffsetAsyncFailedWithDefaultCallback() {
        int invokedBeforeTest = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals((long)(invokedBeforeTest + 1), (long)this.mockOffsetCommitCallback.invoked);
        Assert.assertTrue((boolean)(this.mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testCommitOffsetAsyncCoordinatorNotAvailable() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        MockCommitCallback cb = new MockCommitCallback();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)cb);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        Assert.assertEquals((long)1L, (long)cb.invoked);
        Assert.assertTrue((boolean)(cb.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testCommitOffsetAsyncNotCoordinator() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        MockCommitCallback cb = new MockCommitCallback();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NOT_COORDINATOR)));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)cb);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        Assert.assertEquals((long)1L, (long)cb.invoked);
        Assert.assertTrue((boolean)(cb.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testCommitOffsetAsyncDisconnected() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        MockCommitCallback cb = new MockCommitCallback();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)), true);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), (OffsetCommitCallback)cb);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue((boolean)this.coordinator.coordinatorUnknown());
        Assert.assertEquals((long)1L, (long)cb.invoked);
        Assert.assertTrue((boolean)(cb.exception instanceof RetriableCommitFailedException));
    }

    @Test
    public void testCommitOffsetSyncNotCoordinator() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NOT_COORDINATOR)));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
    }

    @Test
    public void testCommitOffsetSyncCoordinatorNotAvailable() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
    }

    @Test
    public void testCommitOffsetSyncCoordinatorDisconnected() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)), true);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.NONE)));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
    }

    @Test(expected=KafkaException.class)
    public void testCommitUnknownTopicOrPartition() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected=OffsetMetadataTooLarge.class)
    public void testCommitOffsetMetadataTooLarge() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.OFFSET_METADATA_TOO_LARGE)));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected=CommitFailedException.class)
    public void testCommitOffsetIllegalGeneration() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.ILLEGAL_GENERATION)));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected=CommitFailedException.class)
    public void testCommitOffsetUnknownMemberId() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.UNKNOWN_MEMBER_ID)));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected=CommitFailedException.class)
    public void testCommitOffsetRebalanceInProgress() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.REBALANCE_IN_PROGRESS)));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected=KafkaException.class)
    public void testCommitOffsetSyncCallbackWithNonRetriableException() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse)this.offsetCommitResponse(Collections.singletonMap(this.t1p, Errors.UNKNOWN)), false);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
    }

    @Test(expected=IllegalArgumentException.class)
    public void testCommitSyncNegativeOffset() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(-1L)), Long.MAX_VALUE);
    }

    @Test
    public void testCommitAsyncNegativeOffset() {
        int invokedBeforeTest = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(-1L)), (OffsetCommitCallback)this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals((long)(invokedBeforeTest + 1), (long)this.mockOffsetCommitCallback.invoked);
        Assert.assertTrue((boolean)(this.mockOffsetCommitCallback.exception instanceof IllegalArgumentException));
    }

    @Test
    public void testRefreshOffset() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        Assert.assertFalse((boolean)this.subscriptions.refreshCommitsNeeded());
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testRefreshOffsetLoadInProgress() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(Errors.COORDINATOR_LOAD_IN_PROGRESS));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        Assert.assertFalse((boolean)this.subscriptions.refreshCommitsNeeded());
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testRefreshOffsetsGroupNotAuthorized() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
        try {
            this.coordinator.refreshCommittedOffsetsIfNeeded();
            Assert.fail((String)"Expected group authorization error");
        }
        catch (GroupAuthorizationException e) {
            Assert.assertEquals((Object)this.groupId, (Object)e.groupId());
        }
    }

    @Test(expected=KafkaException.class)
    public void testRefreshOffsetUnknownTopicOrPartition() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
    }

    @Test
    public void testRefreshOffsetNotCoordinatorForConsumer() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(Errors.NOT_COORDINATOR));
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        Assert.assertFalse((boolean)this.subscriptions.refreshCommitsNeeded());
        Assert.assertEquals((long)100L, (long)this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testRefreshOffsetWithNoFetchableOffsets() {
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse((AbstractResponse)this.offsetFetchResponse(this.t1p, Errors.NONE, "", -1L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        Assert.assertFalse((boolean)this.subscriptions.refreshCommitsNeeded());
        Assert.assertEquals(null, (Object)this.subscriptions.committed(this.t1p));
    }

    @Test
    public void testProtocolMetadataOrder() {
        List metadata;
        ConsumerCoordinator coordinator;
        RoundRobinAssignor roundRobin = new RoundRobinAssignor();
        RangeAssignor range = new RangeAssignor();
        try (Metrics metrics = new Metrics((Time)this.time);){
            coordinator = this.buildCoordinator(metrics, Arrays.asList(roundRobin, range), true, false, true);
            metadata = coordinator.metadata();
            Assert.assertEquals((long)2L, (long)metadata.size());
            Assert.assertEquals((Object)roundRobin.name(), (Object)((JoinGroupRequest.ProtocolMetadata)metadata.get(0)).name());
            Assert.assertEquals((Object)range.name(), (Object)((JoinGroupRequest.ProtocolMetadata)metadata.get(1)).name());
        }
        metrics = new Metrics((Time)this.time);
        var4_4 = null;
        try {
            coordinator = this.buildCoordinator(metrics, Arrays.asList(range, roundRobin), true, false, true);
            metadata = coordinator.metadata();
            Assert.assertEquals((long)2L, (long)metadata.size());
            Assert.assertEquals((Object)range.name(), (Object)((JoinGroupRequest.ProtocolMetadata)metadata.get(0)).name());
            Assert.assertEquals((Object)roundRobin.name(), (Object)((JoinGroupRequest.ProtocolMetadata)metadata.get(1)).name());
        }
        catch (Throwable throwable) {
            var4_4 = throwable;
            throw throwable;
        }
        finally {
            if (metrics != null) {
                if (var4_4 != null) {
                    try {
                        metrics.close();
                    }
                    catch (Throwable x2) {
                        var4_4.addSuppressed(x2);
                    }
                } else {
                    metrics.close();
                }
            }
        }
    }

    @Test
    public void testCloseDynamicAssignment() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, true);
        this.gracefulCloseTest(coordinator, true);
    }

    @Test
    public void testCloseManualAssignment() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(false, true, true);
        this.gracefulCloseTest(coordinator, false);
    }

    @Test
    public void shouldNotLeaveGroupWhenLeaveGroupFlagIsFalse() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, false);
        this.gracefulCloseTest(coordinator, false);
    }

    @Test
    public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(false, true, true);
        this.makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
        this.time.sleep(this.autoCommitIntervalMs);
        this.closeVerifyTimeout(coordinator, 1000L, 60000L, 1000L, 1000L);
    }

    @Test
    public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, false, true);
        this.makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
        this.closeVerifyTimeout(coordinator, 1000L, 60000L, 0L, 0L);
    }

    @Test
    public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, true);
        this.makeCoordinatorUnknown(coordinator, Errors.NOT_COORDINATOR);
        this.time.sleep(this.autoCommitIntervalMs);
        this.closeVerifyTimeout(coordinator, 1000L, 60000L, 1000L, 1000L);
    }

    @Test
    public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, false, true);
        this.makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
        this.closeVerifyTimeout(coordinator, 1000L, 60000L, 0L, 0L);
    }

    @Test
    public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, true);
        this.makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
        this.time.sleep(this.autoCommitIntervalMs);
        this.closeVerifyTimeout(coordinator, 1000L, 60000L, 1000L, 1000L);
    }

    @Test
    public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, true);
        this.makeCoordinatorUnknown(coordinator, Errors.COORDINATOR_NOT_AVAILABLE);
        this.time.sleep(this.autoCommitIntervalMs);
        this.closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000L, 60000L, 60000L);
    }

    @Test
    public void testCloseNoResponseForCommit() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, true);
        this.time.sleep(this.autoCommitIntervalMs);
        this.closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000L, 60000L, 60000L);
    }

    @Test
    public void testCloseNoResponseForLeaveGroup() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, false, true);
        this.closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000L, 60000L, 60000L);
    }

    @Test
    public void testCloseNoWait() throws Exception {
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, true);
        this.time.sleep(this.autoCommitIntervalMs);
        this.closeVerifyTimeout(coordinator, 0L, 60000L, 0L, 0L);
    }

    @Test
    public void testHeartbeatThreadClose() throws Exception {
        this.groupId = "testCloseTimeoutWithHeartbeatThread";
        ConsumerCoordinator coordinator = this.prepareCoordinatorForCloseTest(true, true, true);
        coordinator.ensureActiveGroup();
        this.time.sleep(this.heartbeatIntervalMs + 100);
        Thread.yield();
        this.closeVerifyTimeout(coordinator, Long.MAX_VALUE, 60000L, 60000L, 60000L);
        Thread[] threads = new Thread[Thread.activeCount()];
        int threadCount = Thread.enumerate(threads);
        for (int i = 0; i < threadCount; ++i) {
            Assert.assertFalse((String)"Heartbeat thread active after close", (boolean)threads[i].getName().contains(this.groupId));
        }
    }

    private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean useGroupManagement, boolean autoCommit, boolean leaveGroup) {
        String consumerId = "consumer";
        ConsumerCoordinator coordinator = this.buildCoordinator(new Metrics(), this.assignors, true, autoCommit, leaveGroup);
        this.client.prepareResponse((AbstractResponse)this.groupCoordinatorResponse(this.node, Errors.NONE));
        coordinator.ensureCoordinatorReady();
        if (useGroupManagement) {
            this.subscriptions.subscribe(Collections.singleton(this.topic1), (ConsumerRebalanceListener)this.rebalanceListener);
            this.client.prepareResponse((AbstractResponse)this.joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
            this.client.prepareResponse((AbstractResponse)this.syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE));
            coordinator.joinGroupIfNeeded();
        } else {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        }
        this.subscriptions.seek(this.t1p, 100L);
        coordinator.poll(this.time.milliseconds(), Long.MAX_VALUE);
        return coordinator;
    }

    private void makeCoordinatorUnknown(ConsumerCoordinator coordinator, Errors error) {
        this.time.sleep(this.sessionTimeoutMs);
        coordinator.sendHeartbeatRequest();
        this.client.prepareResponse((AbstractResponse)this.heartbeatResponse(error));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue((boolean)coordinator.coordinatorUnknown());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void closeVerifyTimeout(final ConsumerCoordinator coordinator, final long closeTimeoutMs, final long requestTimeoutMs, long expectedMinTimeMs, long expectedMaxTimeMs) throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            boolean coordinatorUnknown = coordinator.coordinatorUnknown();
            Future<?> future = executor.submit(new Runnable(){

                @Override
                public void run() {
                    coordinator.close(Math.min(closeTimeoutMs, requestTimeoutMs));
                }
            });
            if (!coordinatorUnknown) {
                this.client.waitForRequests(1, 1000L);
            } else {
                Thread.sleep(200L);
            }
            if (expectedMinTimeMs > 0L) {
                this.time.sleep(expectedMinTimeMs - 1L);
                try {
                    future.get(500L, TimeUnit.MILLISECONDS);
                    Assert.fail((String)"Close completed ungracefully without waiting for timeout");
                }
                catch (TimeoutException timeoutException) {
                    // empty catch block
                }
            }
            if (expectedMaxTimeMs >= 0L) {
                this.time.sleep(expectedMaxTimeMs - expectedMinTimeMs + 2L);
            }
            future.get(2000L, TimeUnit.MILLISECONDS);
        }
        finally {
            executor.shutdownNow();
        }
    }

    private void gracefulCloseTest(ConsumerCoordinator coordinator, boolean shouldLeaveGroup) throws Exception {
        final AtomicBoolean commitRequested = new AtomicBoolean();
        final AtomicBoolean leaveGroupRequested = new AtomicBoolean();
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                commitRequested.set(true);
                OffsetCommitRequest commitRequest = (OffsetCommitRequest)body;
                return commitRequest.groupId().equals(ConsumerCoordinatorTest.this.groupId);
            }
        }, (AbstractResponse)new OffsetCommitResponse(new HashMap()));
        this.client.prepareResponse(new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                leaveGroupRequested.set(true);
                LeaveGroupRequest leaveRequest = (LeaveGroupRequest)body;
                return leaveRequest.groupId().equals(ConsumerCoordinatorTest.this.groupId);
            }
        }, (AbstractResponse)new LeaveGroupResponse(Errors.NONE));
        coordinator.close();
        Assert.assertTrue((String)"Commit not requested", (boolean)commitRequested.get());
        Assert.assertEquals((String)("leaveGroupRequested should be " + shouldLeaveGroup), (Object)shouldLeaveGroup, (Object)leaveGroupRequested.get());
    }

    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors, boolean excludeInternalTopics, boolean autoCommitEnabled, boolean leaveGroup) {
        return new ConsumerCoordinator(this.consumerClient, this.groupId, this.rebalanceTimeoutMs, this.sessionTimeoutMs, this.heartbeatIntervalMs, assignors, this.metadata, this.subscriptions, metrics, "consumer" + this.groupId, (Time)this.time, this.retryBackoffMs, autoCommitEnabled, this.autoCommitIntervalMs, null, excludeInternalTopics, leaveGroup);
    }

    private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
        return new FindCoordinatorResponse(error, node);
    }

    private HeartbeatResponse heartbeatResponse(Errors error) {
        return new HeartbeatResponse(error);
    }

    private JoinGroupResponse joinGroupLeaderResponse(int generationId, String memberId, Map<String, List<String>> subscriptions, Errors error) {
        HashMap<String, ByteBuffer> metadata = new HashMap<String, ByteBuffer>();
        for (Map.Entry<String, List<String>> subscriptionEntry : subscriptions.entrySet()) {
            PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(subscriptionEntry.getValue());
            ByteBuffer buf = ConsumerProtocol.serializeSubscription((PartitionAssignor.Subscription)subscription);
            metadata.put(subscriptionEntry.getKey(), buf);
        }
        return new JoinGroupResponse(error, generationId, this.partitionAssignor.name(), memberId, memberId, metadata);
    }

    private JoinGroupResponse joinGroupFollowerResponse(int generationId, String memberId, String leaderId, Errors error) {
        return new JoinGroupResponse(error, generationId, this.partitionAssignor.name(), memberId, leaderId, Collections.emptyMap());
    }

    private SyncGroupResponse syncGroupResponse(List<TopicPartition> partitions, Errors error) {
        ByteBuffer buf = ConsumerProtocol.serializeAssignment((PartitionAssignor.Assignment)new PartitionAssignor.Assignment(partitions));
        return new SyncGroupResponse(error, buf);
    }

    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Errors> responseData) {
        return new OffsetCommitResponse(responseData);
    }

    private OffsetFetchResponse offsetFetchResponse(Errors topLevelError) {
        return new OffsetFetchResponse(topLevelError, Collections.emptyMap());
    }

    private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) {
        OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, partitionLevelError);
        return new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(tp, data));
    }

    private OffsetCommitCallback callback(final AtomicBoolean success) {
        return new OffsetCommitCallback(){

            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception == null) {
                    success.set(true);
                }
            }
        };
    }

    private static class MockRebalanceListener
    implements ConsumerRebalanceListener {
        public Collection<TopicPartition> revoked;
        public Collection<TopicPartition> assigned;
        public int revokedCount = 0;
        public int assignedCount = 0;

        private MockRebalanceListener() {
        }

        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            this.assigned = partitions;
            ++this.assignedCount;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            this.revoked = partitions;
            ++this.revokedCount;
        }
    }

    private static class MockCommitCallback
    implements OffsetCommitCallback {
        public int invoked = 0;
        public Exception exception = null;

        private MockCommitCallback() {
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            ++this.invoked;
            this.exception = exception;
        }
    }
}

