/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.consumer.AcknowledgeType;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.internals.Acknowledgements;
import org.apache.kafka.clients.consumer.internals.AutoOffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.Deserializers;
import org.apache.kafka.clients.consumer.internals.FetchConfig;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.NodeAcknowledgements;
import org.apache.kafka.clients.consumer.internals.ShareCompletedFetchTest;
import org.apache.kafka.clients.consumer.internals.ShareConsumeRequestManager;
import org.apache.kafka.clients.consumer.internals.ShareFetch;
import org.apache.kafka.clients.consumer.internals.ShareFetchBuffer;
import org.apache.kafka.clients.consumer.internals.ShareFetchCollector;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsManager;
import org.apache.kafka.clients.consumer.internals.ShareFetchMetricsRegistry;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.ShareAcknowledgementCommitCallbackEvent;
import org.apache.kafka.clients.consumer.internals.metrics.AsyncConsumerMetrics;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.InvalidRecordStateException;
import org.apache.kafka.common.errors.ShareSessionNotFoundException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.RequestHeaderData;
import org.apache.kafka.common.message.ShareAcknowledgeResponseData;
import org.apache.kafka.common.message.ShareFetchRequestData;
import org.apache.kafka.common.message.ShareFetchResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.BaseRecords;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.ShareAcknowledgeResponse;
import org.apache.kafka.common.requests.ShareFetchRequest;
import org.apache.kafka.common.requests.ShareFetchResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ShareConsumeRequestManagerTest {
    // Names of the two topics used by the fixtures in this class.
    private final String topicName = "test";
    private final String topicName2 = "test-2";
    // Share group id constant; its consumers are outside this excerpt.
    private final String groupId = "test-group";
    // Random topic ids, generated afresh for every instance of this test class.
    private final Uuid topicId = Uuid.randomUuid();
    private final Uuid topicId2 = Uuid.randomUuid();
    // Topic name -> topic id. Built as an anonymous HashMap subclass whose instance
    // initializer inserts both topics (double-brace initialization).
    private final Map<String, Uuid> topicIds = new HashMap<String, Uuid>(){
        {
            this.put("test", ShareConsumeRequestManagerTest.this.topicId);
            this.put("test-2", ShareConsumeRequestManagerTest.this.topicId2);
        }
    };
    // Topic name -> partition count: "test" has 2 partitions, "test-2" has 1.
    private final Map<String, Integer> topicPartitionCounts = new HashMap<String, Integer>(){
        {
            this.put("test", 2);
            this.put("test-2", 1);
        }
    };
    // Partition handles, each paired with its topic-id-qualified counterpart.
    private final TopicPartition tp0 = new TopicPartition("test", 0);
    private final TopicIdPartition tip0 = new TopicIdPartition(this.topicId, this.tp0);
    private final TopicPartition tp1 = new TopicPartition("test", 1);
    private final TopicIdPartition tip1 = new TopicIdPartition(this.topicId, this.tp1);
    private final TopicPartition t2p0 = new TopicPartition("test-2", 0);
    private final TopicIdPartition t2ip0 = new TopicIdPartition(this.topicId2, this.t2p0);
    // Leader epoch constant; not referenced in the visible part of the file.
    private final int validLeaderEpoch = 0;
    // Metadata response covering only topic "test" with 2 partitions; assignFromSubscribed
    // feeds it to the mock client. NOTE(review): the leading 1 is presumably the node count — confirm.
    private final MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWithIds(1, Map.of("test", 2), this.topicIds);
    // Timing constants in milliseconds (per their names); consumers are outside this excerpt.
    private final long retryBackoffMs = 100L;
    private final long requestTimeoutMs = 30000L;
    private final long defaultApiTimeoutMs = 60000L;
    // Mock clock shared by the tests. NOTE(review): the 1L argument is presumably an
    // auto-tick of 1 ms per time read — confirm against MockTime.
    private MockTime time = new MockTime(1L);
    // Collaborators below are not initialised here; tests call buildRequestManager() first,
    // which presumably populates them (its body is outside this excerpt).
    private SubscriptionState subscriptions;
    private ConsumerMetadata metadata;
    private ShareFetchMetricsManager metricsManager;
    private MockClient client;
    private Metrics metrics;
    private TestableShareConsumeRequestManager<?, ?> shareConsumeRequestManager;
    private TestableNetworkClientDelegate networkClientDelegate;
    // Fixtures rebuilt by setup() before every test.
    private MemoryRecords records;
    private List<ShareFetchResponseData.AcquiredRecords> acquiredRecords;
    private List<ShareFetchResponseData.AcquiredRecords> emptyAcquiredRecords;
    private ShareFetchMetricsRegistry shareFetchMetricsRegistry;
    // Acknowledgement maps the tests read back to check what was reported as completed;
    // reset to an empty list by setup().
    private List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements;

    /**
     * Rebuilds the shared fixtures before every test: the default record batch, the matching
     * acquired-record list, and fresh empty lists for acquired records and completed
     * acknowledgements.
     */
    @BeforeEach
    public void setup() {
        // NOTE(review): arguments are presumably (baseOffset, count, firstMessageId) — confirm
        // against buildRecords, which is defined outside this excerpt.
        records = buildRecords(1L, 3, 1L);
        acquiredRecords = ShareCompletedFetchTest.acquiredRecords(1L, 3);
        emptyAcquiredRecords = new ArrayList<>();
        completedAcknowledgements = new LinkedList<>();
    }

    /**
     * Subscribes the share group to the topics of the given partitions, assigns exactly those
     * partitions, and then refreshes both the mock client's and the consumer's metadata.
     *
     * @param partitions the partitions to assign; their topics form the subscription
     */
    private void assignFromSubscribed(Set<TopicPartition> partitions) {
        Set<String> subscribedTopics = partitions.stream()
                .map(TopicPartition::topic)
                .collect(Collectors.toSet());
        subscriptions.subscribeToShareGroup(subscribedTopics);
        subscriptions.assignFromSubscribed(partitions);

        client.updateMetadata(initialUpdateResponse);
        // Every partition reports leader epoch 0 in this metadata update.
        metadata.updateWithCurrentRequestVersion(
                RequestTestUtils.metadataUpdateWithIds(
                        "kafka-cluster",
                        1,
                        Collections.emptyMap(),
                        topicPartitionCounts,
                        tp -> 0,
                        topicIds),
                false,
                0L);
    }

    /**
     * Releases per-test resources: closes the metrics registry first, then the request
     * manager. Either may be {@code null} when a test never built it.
     *
     * @throws Exception if closing either resource fails
     */
    @AfterEach
    public void teardown() throws Exception {
        if (null != metrics) {
            metrics.close();
        }
        if (null != shareConsumeRequestManager) {
            shareConsumeRequestManager.close();
        }
    }

    /**
     * Delegates to the request manager under test.
     *
     * @return the value reported by {@code shareConsumeRequestManager.sendFetches()}
     */
    private int sendFetches() {
        final int result = shareConsumeRequestManager.sendFetches();
        return result;
    }

    /**
     * A successful share fetch for {@code tp0} with the default fixtures must hand three
     * records back for that partition.
     */
    @Test
    public void testFetchNormal() {
        buildRequestManager();
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        // Wildcard-typed locals replace the raw Map/List: a raw Map#get returns Object, which
        // cannot be assigned to a List without a cast. The local is also renamed so it no
        // longer shadows the `records` field.
        Map<TopicPartition, ? extends List<?>> partitionRecords = fetchRecords();
        Assertions.assertTrue(partitionRecords.containsKey(tp0));
        List<?> fetchedRecords = partitionRecords.get(tp0);
        Assertions.assertEquals(3, fetchedRecords.size());
    }

    /**
     * When the response's acquired-record list is built with {@code acquiredRecords(1L, 1)},
     * only one record must be returned for {@code tp0} even though the batch fixture is larger.
     */
    @Test
    public void testFetchWithAcquiredRecords() {
        buildRequestManager();
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE);

        // Wildcard-typed locals replace the raw Map/List: a raw Map#get returns Object, which
        // cannot be assigned to a List without a cast.
        Map<TopicPartition, ? extends List<?>> partitionRecords = fetchRecords();
        Assertions.assertTrue(partitionRecords.containsKey(tp0));
        List<?> fetchedRecords = partitionRecords.get(tp0);
        Assertions.assertEquals(1, fetchedRecords.size());
    }

    /**
     * Drives three consecutive fetches on {@code tp0}, piggybacking acknowledgements on the
     * second and third, and checks the acknowledgement metrics and the completed
     * acknowledgements after each round. The third round carries an
     * {@code INVALID_RECORD_STATE} acknowledgement error and returns no records.
     */
    @Test
    public void testMultipleFetches() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));

        // Round 1: plain fetch, one record acquired.
        sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.NONE);
        // Wildcard-typed locals replace the raw Map/List: a raw Map#get returns Object, which
        // cannot be assigned to a List without a cast.
        Map<TopicPartition, ? extends List<?>> partitionRecords = fetchRecords();
        Assertions.assertTrue(partitionRecords.containsKey(tp0));
        List<?> fetchedRecords = partitionRecords.get(tp0);
        Assertions.assertEquals(1, fetchedRecords.size());

        // Round 2: ACCEPT offset 1 alongside the next fetch.
        Acknowledgements acknowledgements = Acknowledgements.empty();
        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
        shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
        sendFetchAndVerifyResponse(records, ShareCompletedFetchTest.acquiredRecords(2L, 1), Errors.NONE);
        Assertions.assertEquals(
                (Object) 1.0,
                ((KafkaMetric) metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal, new String[0]))).metricValue());
        partitionRecords = fetchRecords();
        Assertions.assertTrue(partitionRecords.containsKey(tp0));
        Assertions.assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
        completedAcknowledgements.clear();

        // Round 3: REJECT offset 2; the response reports INVALID_RECORD_STATE for the acknowledgement.
        Acknowledgements acknowledgements2 = Acknowledgements.empty();
        acknowledgements2.add(2L, AcknowledgeType.REJECT);
        shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap());
        sendFetchAndVerifyResponse(records, Collections.emptyList(), Errors.NONE, Errors.INVALID_RECORD_STATE);
        Assertions.assertEquals(
                (Object) 2.0,
                ((KafkaMetric) metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal, new String[0]))).metricValue());
        Assertions.assertEquals(
                (Object) 1.0,
                ((KafkaMetric) metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementErrorTotal, new String[0]))).metricValue());
        partitionRecords = fetchRecords();
        Assertions.assertTrue(partitionRecords.isEmpty());
        Assertions.assertEquals(Map.of(tip0, acknowledgements2), completedAcknowledgements.get(0));
    }

    /**
     * A synchronous commit of three acknowledgements on {@code tp0} must produce exactly one
     * acknowledge request and, once the successful response arrives, report those
     * acknowledgements as completed.
     */
    @Test
    public void testCommitSync() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        Acknowledgements acknowledgements =
                getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        Map<TopicIdPartition, NodeAcknowledgements> toCommit =
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements));
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time.timer(2000L));
        shareConsumeRequestManager.commitSync(toCommit, deadlineMs);
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

        AbstractResponse ackResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(ackResponse);
        networkClientDelegate.poll(time.timer(0L));

        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
    }

    /**
     * An asynchronous commit of three acknowledgements on {@code tp0} must produce exactly one
     * acknowledge request and, once the successful response arrives, report those
     * acknowledgements as completed.
     */
    @Test
    public void testCommitAsync() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        Acknowledgements acknowledgements =
                getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        Map<TopicIdPartition, NodeAcknowledgements> toCommit =
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements));
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time.timer(60000L));
        shareConsumeRequestManager.commitAsync(toCommit, deadlineMs);
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

        AbstractResponse ackResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(ackResponse);
        networkClientDelegate.poll(time.timer(0L));

        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
    }

    /**
     * Simulates a disconnect while an acknowledge request is in flight. The in-flight
     * acknowledgements must complete with {@link UnknownServerException}; the acknowledgements
     * queued behind them must complete with {@link ShareSessionNotFoundException} without
     * being sent; and a subsequent fetch must work again.
     *
     * @throws InterruptedException if the retry helper is interrupted while waiting
     */
    @Test
    public void testServerDisconnectedOnShareAcknowledge() throws InterruptedException {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
        fetchRecords();

        // First commit: sent immediately, so it is in flight when the disconnect happens.
        Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
        shareConsumeRequestManager.commitAsync(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
                CompletableEvent.calculateDeadlineMs(time.timer(60000L)));
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

        // Second commit: queued but not yet sent.
        Acknowledgements acknowledgements2 = Acknowledgements.empty();
        acknowledgements2.add(3L, AcknowledgeType.REJECT);
        shareConsumeRequestManager.commitAsync(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
                CompletableEvent.calculateDeadlineMs(time.timer(60000L)));

        // A null response flagged as disconnected.
        client.prepareResponse(null, true);
        networkClientDelegate.poll(time.timer(0L));

        Assertions.assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
        Assertions.assertInstanceOf(
                UnknownServerException.class,
                completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
        completedAcknowledgements.clear();
        Assertions.assertEquals(
                1,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getAcknowledgementsToSendCount(tip0));

        TestUtils.retryOnExceptionWithTimeout(() -> {
            Assertions.assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
            Assertions.assertNull(shareConsumeRequestManager.requestStates(0));
            Assertions.assertEquals(Map.of(tip0, acknowledgements2), completedAcknowledgements.get(0));
            Assertions.assertInstanceOf(
                    ShareSessionNotFoundException.class,
                    completedAcknowledgements.get(0).get(tip0).getAcknowledgeException());
        });

        // Fetching resumes normally afterwards.
        Assertions.assertEquals(1, sendFetches());
        Assertions.assertFalse(shareConsumeRequestManager.hasCompletedFetches());
        AbstractResponse fetchResponse = fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE);
        client.prepareResponse(fetchResponse);
        networkClientDelegate.poll(time.timer(0L));
        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());
    }

    /**
     * Acknowledgements queued through {@code fetch} and those passed to
     * {@code acknowledgeOnClose} must be sent in a single request and be reported together as
     * one completed set.
     */
    @Test
    public void testAcknowledgeOnClose() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        // Piggybacked on the next fetch, which is never sent in this test.
        Acknowledgements acknowledgements = Acknowledgements.empty();
        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
        shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());

        Acknowledgements acknowledgements2 = getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        shareConsumeRequestManager.acknowledgeOnClose(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
                CompletableEvent.calculateDeadlineMs(time.timer(100L)));
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

        AbstractResponse ackResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(ackResponse);
        networkClientDelegate.poll(time.timer(0L));
        Assertions.assertEquals(1, completedAcknowledgements.size());

        Acknowledgements mergedAcks = acknowledgements.merge(acknowledgements2);
        mergedAcks.complete(null);
        Assertions.assertEquals(
                (Object) mergedAcks.getAcknowledgementsTypeMap(),
                (Object) completedAcknowledgements.get(0).get(tip0).getAcknowledgementsTypeMap());
        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());
    }

    /**
     * Closing with an empty acknowledgement map while an asynchronous commit is still pending:
     * the pending commit must be sent and its acknowledgements reported as completed.
     */
    @Test
    public void testAcknowledgeOnCloseWithPendingCommitAsync() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        Acknowledgements acknowledgements =
                getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        shareConsumeRequestManager.commitAsync(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
                CompletableEvent.calculateDeadlineMs(time.timer(60000L)));
        shareConsumeRequestManager.acknowledgeOnClose(
                Collections.emptyMap(),
                CompletableEvent.calculateDeadlineMs(time.timer(100L)));
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

        // Response to the pending commit, followed by an empty acknowledge response.
        AbstractResponse commitResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(commitResponse);
        networkClientDelegate.poll(time.timer(0L));
        AbstractResponse emptyResponse = emptyAcknowledgeResponse();
        client.prepareResponse(emptyResponse);
        networkClientDelegate.poll(time.timer(0L));

        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
    }

    /**
     * Closing with an empty acknowledgement map while a synchronous commit is still pending:
     * the pending commit must be sent and its acknowledgements reported as completed.
     */
    @Test
    public void testAcknowledgeOnCloseWithPendingCommitSync() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        Acknowledgements acknowledgements =
                getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        shareConsumeRequestManager.commitSync(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
                CompletableEvent.calculateDeadlineMs(time.timer(100L)));
        shareConsumeRequestManager.acknowledgeOnClose(
                Collections.emptyMap(),
                CompletableEvent.calculateDeadlineMs(time.timer(100L)));
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

        // Response to the pending commit, followed by an empty acknowledge response.
        AbstractResponse commitResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(commitResponse);
        networkClientDelegate.poll(time.timer(0L));
        AbstractResponse emptyResponse = emptyAcknowledgeResponse();
        client.prepareResponse(emptyResponse);
        networkClientDelegate.poll(time.timer(0L));

        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertEquals(Map.of(tip0, acknowledgements), completedAcknowledgements.get(0));
    }

    /**
     * Exercises a result handler built without a counter or future. Only a
     * {@code COMMIT_ASYNC} completion with non-null acknowledgements may add an entry to the
     * completed acknowledgements.
     */
    @Test
    public void testResultHandlerOnCommitAsync() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        Acknowledgements acknowledgements =
                getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        ShareConsumeRequestManager.ResultHandler handler =
                shareConsumeRequestManager.buildResultHandler(null, Optional.empty());

        // Null acknowledgements: nothing is recorded.
        handler.complete(tip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
        Assertions.assertEquals(0, completedAcknowledgements.size());

        // A sync completion on this handler records nothing either.
        handler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
        Assertions.assertEquals(0, completedAcknowledgements.size());

        // An async completion with acknowledgements is recorded.
        handler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_ASYNC);
        Assertions.assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
    }

    /**
     * Exercises a result handler built with a counter of three and a future. The future must
     * stay incomplete and nothing may be recorded until the third completion, after which the
     * two non-null acknowledgement sets are reported together in a single entry.
     */
    @Test
    public void testResultHandlerOnCommitSync() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        Acknowledgements acknowledgements =
                getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        CompletableFuture future = new CompletableFuture();
        AtomicInteger remaining = new AtomicInteger(3);
        ShareConsumeRequestManager.ResultHandler handler =
                shareConsumeRequestManager.buildResultHandler(remaining, Optional.of(future));

        // First of three results.
        handler.complete(tip0, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
        Assertions.assertEquals(0, completedAcknowledgements.size());
        Assertions.assertFalse(future.isDone());

        // Second result carries no acknowledgements.
        handler.complete(t2ip0, null, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
        Assertions.assertEquals(0, completedAcknowledgements.size());
        Assertions.assertFalse(future.isDone());

        // Third result completes the handler.
        handler.complete(tip1, acknowledgements, ShareConsumeRequestManager.AcknowledgeRequestType.COMMIT_SYNC);
        Assertions.assertEquals(1, completedAcknowledgements.size());
        Assertions.assertEquals(2, completedAcknowledgements.get(0).size());
        Assertions.assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
        Assertions.assertEquals(3, completedAcknowledgements.get(0).get(tip1).size());
        Assertions.assertTrue(future.isDone());
    }

    /**
     * {@code completeIfEmpty} must leave the future incomplete while the counter is still 1
     * and complete it once the counter has been decremented to 0.
     */
    @Test
    public void testResultHandlerCompleteIfEmpty() {
        buildRequestManager();
        CompletableFuture future = new CompletableFuture();
        AtomicInteger remaining = new AtomicInteger(1);
        ShareConsumeRequestManager.ResultHandler handler =
                shareConsumeRequestManager.buildResultHandler(remaining, Optional.of(future));

        handler.completeIfEmpty();
        Assertions.assertFalse(future.isDone());

        remaining.decrementAndGet();
        handler.completeIfEmpty();
        Assertions.assertTrue(future.isDone());
    }

    /**
     * Two back-to-back asynchronous commits on {@code tp0} must be batched into one request
     * state: six acknowledgements waiting, then six in flight after the send, then none of
     * either once the successful response has been processed.
     */
    @Test
    public void testBatchingAcknowledgeRequestStates() {
        buildRequestManager();
        assignFromSubscribed(Collections.singleton(tp0));
        MemoryRecords sixRecords = buildRecords(1L, 6, 1L);
        List<ShareFetchResponseData.AcquiredRecords> sixAcquired = ShareCompletedFetchTest.acquiredRecords(1L, 6);
        sendFetchAndVerifyResponse(sixRecords, sixAcquired, Errors.NONE);

        Acknowledgements acknowledgements =
                getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        shareConsumeRequestManager.commitAsync(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
                CompletableEvent.calculateDeadlineMs(time.timer(60000L)));
        Acknowledgements acknowledgements2 =
                getAcknowledgements(4, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        shareConsumeRequestManager.commitAsync(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
                CompletableEvent.calculateDeadlineMs(time.timer(60000L)));

        // Both commits accumulate in the single async request state.
        Assertions.assertEquals(
                6,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getAcknowledgementsToSendCount(tip0));

        // Sending moves all six from "to send" to "in flight".
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        Assertions.assertEquals(
                0,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getAcknowledgementsToSendCount(tip0));
        Assertions.assertEquals(
                6,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getInFlightAcknowledgementsCount(tip0));

        AbstractResponse ackResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(ackResponse);
        networkClientDelegate.poll(time.timer(0L));

        // The response clears both counters.
        Assertions.assertEquals(
                0,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getAcknowledgementsToSendCount(tip0));
        Assertions.assertEquals(
                0,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getInFlightAcknowledgementsCount(tip0));
    }

    /**
     * An asynchronous commit queued before a synchronous one must be sent first. The
     * synchronous request stays in its queue untouched until the asynchronous response has
     * been processed, and is sent only on the following send.
     */
    @Test
    public void testPendingCommitAsyncBeforeCommitSync() {
        buildRequestManager();
        assignFromSubscribed(Collections.singleton(tp0));
        MemoryRecords sixRecords = buildRecords(1L, 6, 1L);
        List<ShareFetchResponseData.AcquiredRecords> sixAcquired = ShareCompletedFetchTest.acquiredRecords(1L, 6);
        sendFetchAndVerifyResponse(sixRecords, sixAcquired, Errors.NONE);

        Acknowledgements acknowledgements =
                getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        shareConsumeRequestManager.commitAsync(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)),
                CompletableEvent.calculateDeadlineMs(time.timer(60000L)));
        Acknowledgements acknowledgements2 =
                getAcknowledgements(4, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        shareConsumeRequestManager.commitSync(
                Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)),
                CompletableEvent.calculateDeadlineMs(time.timer(60000L)));

        // Three acknowledgements wait in the async state and three in the single queued sync state.
        Assertions.assertEquals(
                3,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getAcknowledgementsToSendCount(tip0));
        Assertions.assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
        Assertions.assertEquals(
                3,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek())
                        .getAcknowledgementsToSendCount(tip0));

        // First send: only the async request goes out.
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        Assertions.assertEquals(
                3,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getInFlightAcknowledgementsCount(tip0));
        AbstractResponse asyncResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(asyncResponse);
        networkClientDelegate.poll(time.timer(0L));

        // Async is done; the sync request is still queued and unsent.
        Assertions.assertEquals(
                0,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest())
                        .getInFlightAcknowledgementsCount(tip0));
        Assertions.assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
        Assertions.assertEquals(
                3,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek())
                        .getAcknowledgementsToSendCount(tip0));

        // Second send: the sync request goes out and remains queued while in flight.
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        Assertions.assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
        Assertions.assertEquals(
                3,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek())
                        .getInFlightAcknowledgementsCount(tip0));
        AbstractResponse syncResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(syncResponse);
        networkClientDelegate.poll(time.timer(0L));

        Assertions.assertEquals(
                0,
                ((ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek())
                        .getInFlightAcknowledgementsCount(tip0));
    }

    /**
     * Exercises a synchronous commit of six acknowledgements whose first
     * ShareAcknowledge response carries {@code REQUEST_TIMED_OUT}: the acknowledgements
     * are expected to be reported as incomplete, sent again, and to be neither in flight
     * nor incomplete once a response with {@code Errors.NONE} has been processed.
     */
    @Test
    public void testRetryAcknowledgements() throws InterruptedException {
        buildRequestManager();
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(buildRecords(1L, 6, 1L), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);

        Acknowledgements acknowledgements = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT,
                AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
        shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), 60000L);

        // The commit is queued as a sync request only; no async request exists for node 0.
        Assertions.assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest());
        Assertions.assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
        ShareConsumeRequestManager.AcknowledgeRequestState queued =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(6, queued.getAcknowledgementsToSendCount(tip0));

        // First attempt: all six acknowledgements go in flight.
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        ShareConsumeRequestManager.AcknowledgeRequestState firstAttempt =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(6, firstAttempt.getInFlightAcknowledgementsCount(tip0));

        // The attempt times out: the acknowledgements become incomplete, nothing stays in flight.
        AbstractResponse timedOutResponse = fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT);
        client.prepareResponse(timedOutResponse);
        networkClientDelegate.poll(time.timer(0L));
        ShareConsumeRequestManager.AcknowledgeRequestState afterTimeout =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(6, afterTimeout.getIncompleteAcknowledgementsCount(tip0));
        Assertions.assertEquals(0, afterTimeout.getInFlightAcknowledgementsCount(tip0));

        // Second attempt: poll until the request is sent again.
        TestUtils.retryOnExceptionWithTimeout(() -> Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements()));
        ShareConsumeRequestManager.AcknowledgeRequestState secondAttempt =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(6, secondAttempt.getInFlightAcknowledgementsCount(tip0));

        // A successful response leaves nothing in flight and nothing incomplete.
        AbstractResponse successResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(successResponse);
        networkClientDelegate.poll(time.timer(0L));
        ShareConsumeRequestManager.AcknowledgeRequestState afterSuccess =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(0, afterSuccess.getInFlightAcknowledgementsCount(tip0));
        Assertions.assertEquals(0, afterSuccess.getIncompleteAcknowledgementsCount(tip0));
    }

    /**
     * For each of {@code FENCED_LEADER_EPOCH}, {@code NOT_LEADER_OR_FOLLOWER} and
     * {@code UNKNOWN_TOPIC_OR_PARTITION}: after an asynchronous commit of three
     * acknowledgements receives that error, the async request is expected to hold no
     * in-flight or incomplete acknowledgements, and exactly one completion containing
     * the three acknowledgements for the partition is expected to have been recorded.
     *
     * @param error the error code placed in the ShareAcknowledge response
     */
    @ParameterizedTest
    @EnumSource(value=Errors.class, names={"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER", "UNKNOWN_TOPIC_OR_PARTITION"})
    public void testFatalErrorsAcknowledgementResponse(Errors error) {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        Acknowledgements acknowledgements = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time.timer(60000L));
        shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), deadlineMs);
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

        // Deliver the parameterized error for the partition.
        AbstractResponse errorResponse = fullAcknowledgeResponse(tip0, error);
        client.prepareResponse(errorResponse);
        networkClientDelegate.poll(time.timer(0L));

        // Nothing is kept for a further attempt ...
        ShareConsumeRequestManager.AcknowledgeRequestState asyncRequest =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest();
        Assertions.assertEquals(0, asyncRequest.getInFlightAcknowledgementsCount(tip0));
        Assertions.assertEquals(0, asyncRequest.getIncompleteAcknowledgementsCount(tip0));

        // ... and the three acknowledgements were reported as completed in one batch.
        Assertions.assertEquals(1, completedAcknowledgements.size());
        Assertions.assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
    }

    /**
     * Issues three asynchronous commits (2 + 2 + 2 acknowledgements), each with a
     * 1000 ms deadline, where the first one receives {@code REQUEST_TIMED_OUT}.
     * After the clock is advanced by 2000 ms the first commit is expected to complete
     * with the timeout exception without anything being sent; the remaining four
     * acknowledgements are then expected to be sent together and to complete
     * without an exception.
     */
    @Test
    public void testRetryAcknowledgementsMultipleCommitAsync() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(buildRecords(1L, 6, 1L), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);

        // First commit: two acknowledgements, sent and then timed out.
        Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
        long firstDeadlineMs = CompletableEvent.calculateDeadlineMs(time, 1000L);
        shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), firstDeadlineMs);
        ShareConsumeRequestManager.AcknowledgeRequestState beforeSend =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest();
        Assertions.assertEquals(2, beforeSend.getAcknowledgementsToSendCount(tip0));
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        ShareConsumeRequestManager.AcknowledgeRequestState afterSend =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest();
        Assertions.assertEquals(2, afterSend.getInFlightAcknowledgementsCount(tip0));
        AbstractResponse timedOutResponse = fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT);
        client.prepareResponse(timedOutResponse);
        networkClientDelegate.poll(time.timer(0L));

        // Second commit arrives while the first is still incomplete.
        Acknowledgements acknowledgements1 = getAcknowledgements(3, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        long secondDeadlineMs = CompletableEvent.calculateDeadlineMs(time, 1000L);
        shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)), secondDeadlineMs);
        ShareConsumeRequestManager.AcknowledgeRequestState afterSecondCommit =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest();
        Assertions.assertEquals(2, afterSecondCommit.getIncompleteAcknowledgementsCount(tip0));

        // Third commit.
        Acknowledgements acknowledgements2 = getAcknowledgements(5, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
        long thirdDeadlineMs = CompletableEvent.calculateDeadlineMs(time, 1000L);
        shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), thirdDeadlineMs);

        // Move the clock 2000 ms forward: nothing is sent on this pass and the first
        // commit's two acknowledgements complete with the timeout exception.
        time.sleep(2000L);
        Assertions.assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
        Assertions.assertEquals(1, completedAcknowledgements.size());
        Acknowledgements failed = completedAcknowledgements.get(0).get(tip0);
        Assertions.assertEquals(2, failed.size());
        Assertions.assertEquals(Errors.REQUEST_TIMED_OUT.exception(), failed.getAcknowledgeException());
        completedAcknowledgements.clear();

        // The next pass sends the other four acknowledgements in one request.
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        ShareConsumeRequestManager.AcknowledgeRequestState batched =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest();
        Assertions.assertEquals(4, batched.getInFlightAcknowledgementsCount(tip0));
        AbstractResponse successResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(successResponse);
        networkClientDelegate.poll(time.timer(0L));

        ShareConsumeRequestManager.AcknowledgeRequestState finished =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getAsyncRequest();
        Assertions.assertEquals(0, finished.getInFlightAcknowledgementsCount(tip0));
        Assertions.assertEquals(1, completedAcknowledgements.size());
        Acknowledgements succeeded = completedAcknowledgements.get(0).get(tip0);
        Assertions.assertEquals(4, succeeded.size());
        Assertions.assertNull(succeeded.getAcknowledgeException());
    }

    /**
     * Issues a synchronous commit of two acknowledgements (1000 ms deadline) that
     * receives {@code REQUEST_TIMED_OUT}, advances the clock by 2000 ms, and then
     * issues a second synchronous commit of four acknowledgements. The first commit
     * is expected to complete with the timeout exception, the second one without
     * an exception after a {@code NONE} response.
     */
    @Test
    public void testRetryAcknowledgementsMultipleCommitSync() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(buildRecords(1L, 6, 1L), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);

        // First sync commit: sent once, response is a timeout.
        Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
        long firstDeadlineMs = CompletableEvent.calculateDeadlineMs(time, 1000L);
        shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), firstDeadlineMs);
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        AbstractResponse timedOutResponse = fullAcknowledgeResponse(tip0, Errors.REQUEST_TIMED_OUT);
        client.prepareResponse(timedOutResponse);
        networkClientDelegate.poll(time.timer(0L));
        ShareConsumeRequestManager.AcknowledgeRequestState headOfQueue =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(2, headOfQueue.getIncompleteAcknowledgementsCount(tip0));

        // Advance the clock by 2000 ms before the second commit is issued.
        time.sleep(2000L);

        // Second sync commit with four acknowledgements.
        Acknowledgements acknowledgements1 = getAcknowledgements(3,
                AcknowledgeType.ACCEPT, AcknowledgeType.REJECT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
        long secondDeadlineMs = CompletableEvent.calculateDeadlineMs(time, 1000L);
        shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements1)), secondDeadlineMs);
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());

        // The first commit's acknowledgements were completed with the timeout exception.
        Acknowledgements failed = completedAcknowledgements.get(0).get(tip0);
        Assertions.assertEquals(2, failed.size());
        Assertions.assertEquals(Errors.REQUEST_TIMED_OUT.exception(), failed.getAcknowledgeException());
        completedAcknowledgements.clear();

        // The second commit completes normally.
        AbstractResponse successResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(successResponse);
        networkClientDelegate.poll(time.timer(0L));
        Assertions.assertEquals(1, completedAcknowledgements.size());
        Acknowledgements succeeded = completedAcknowledgements.get(0).get(tip0);
        Assertions.assertEquals(4, succeeded.size());
        Assertions.assertNull(succeeded.getAcknowledgeException());
    }

    /**
     * Passes acknowledgements to {@code fetch(...)} while a ShareFetch request is
     * outstanding and checks, through the {@code acknowledgementSendTotal} metric,
     * that two acknowledgements are counted with the first fetch and that the
     * acknowledgement handed over in the meantime raises the total to three with
     * the following fetch.
     */
    @Test
    public void testPiggybackAcknowledgementsInFlight() {
        buildRequestManager();
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        // Two acknowledgements travel with the next fetch.
        Acknowledgements acknowledgements = getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
        fetchRecords();
        shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
        Assertions.assertEquals(1, sendFetches());
        Assertions.assertFalse(shareConsumeRequestManager.hasCompletedFetches());
        KafkaMetric sendTotalAfterFirstFetch =
                (KafkaMetric) metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal, new String[0]));
        Assertions.assertEquals(2.0, sendTotalAfterFirstFetch.metricValue());

        // A third acknowledgement is handed over before the fetch response has been processed.
        Acknowledgements acknowledgements2 = Acknowledgements.empty();
        acknowledgements2.add(3L, AcknowledgeType.ACCEPT);
        shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements2)), Collections.emptyMap());

        AbstractResponse fetchResponse = fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE);
        client.prepareResponse(fetchResponse);
        networkClientDelegate.poll(time.timer(0L));
        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());

        // The following fetch brings the metric total to three.
        fetchRecords();
        Assertions.assertEquals(1, sendFetches());
        Assertions.assertFalse(shareConsumeRequestManager.hasCompletedFetches());
        KafkaMetric sendTotalAfterSecondFetch =
                (KafkaMetric) metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal, new String[0]));
        Assertions.assertEquals(3.0, sendTotalAfterSecondFetch.metricValue());
    }

    /**
     * Changes the subscription from the original topic to {@code "test-2"} before an
     * asynchronous commit of three acknowledgements for the previously assigned
     * partition. The commit is still expected to be sent and to complete without an
     * exception, and a subsequent fetch for the new partition is expected to produce
     * a completed fetch.
     */
    @Test
    public void testCommitAsyncWithSubscriptionChange() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
        Acknowledgements acknowledgements = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);

        // Move the subscription and assignment over to the second topic.
        subscriptions.subscribeToShareGroup(Collections.singleton("test-2"));
        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Map.of("test-2", 1), tp -> 0, topicIds, false));

        // Commit acknowledgements for the partition of the old subscription.
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time.timer(60000L));
        shareConsumeRequestManager.commitAsync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), deadlineMs);
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        AbstractResponse acknowledgeResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(acknowledgeResponse);
        networkClientDelegate.poll(time.timer(0L));
        Acknowledgements completed = completedAcknowledgements.get(0).get(tip0);
        Assertions.assertEquals(3, completed.size());
        Assertions.assertNull(completed.getAcknowledgeException());

        // Fetching continues for the newly assigned partition.
        Assertions.assertEquals(1, sendFetches());
        AbstractResponse fetchResponse = fullFetchResponse(t2ip0, records, acquiredRecords, Errors.NONE);
        client.prepareResponse(fetchResponse);
        networkClientDelegate.poll(time.timer(0L));
        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());
    }

    /**
     * Changes the subscription from the original topic to {@code "test-2"} before a
     * synchronous commit (100 ms timer) of three acknowledgements for the previously
     * assigned partition. The commit is still expected to be sent and to complete
     * without an exception, and a subsequent fetch for the new partition is expected
     * to produce a completed fetch.
     */
    @Test
    public void testCommitSyncWithSubscriptionChange() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
        Acknowledgements acknowledgements = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);

        // Move the subscription and assignment over to the second topic.
        subscriptions.subscribeToShareGroup(Collections.singleton("test-2"));
        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Map.of("test-2", 1), tp -> 0, topicIds, false));

        // Commit acknowledgements for the partition of the old subscription.
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time.timer(100L));
        shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), deadlineMs);
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        AbstractResponse acknowledgeResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(acknowledgeResponse);
        networkClientDelegate.poll(time.timer(0L));
        Acknowledgements completed = completedAcknowledgements.get(0).get(tip0);
        Assertions.assertEquals(3, completed.size());
        Assertions.assertNull(completed.getAcknowledgeException());

        // Fetching continues for the newly assigned partition.
        Assertions.assertEquals(1, sendFetches());
        AbstractResponse fetchResponse = fullFetchResponse(t2ip0, records, acquiredRecords, Errors.NONE);
        client.prepareResponse(fetchResponse);
        networkClientDelegate.poll(time.timer(0L));
        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());
    }

    /**
     * Changes the subscription from the original topic to {@code "test-2"} and then
     * calls {@code acknowledgeOnClose} with three acknowledgements for the previously
     * assigned partition. The acknowledgements are expected to be sent and completed
     * without an exception, and no fetch is expected to be sent afterwards.
     */
    @Test
    public void testCloseWithSubscriptionChange() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);
        Acknowledgements acknowledgements = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);

        // Move the subscription and assignment over to the second topic.
        subscriptions.subscribeToShareGroup(Collections.singleton("test-2"));
        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Map.of("test-2", 1), tp -> 0, topicIds, false));

        // Acknowledge on close for the partition of the old subscription.
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time.timer(100L));
        shareConsumeRequestManager.acknowledgeOnClose(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), deadlineMs);
        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        AbstractResponse acknowledgeResponse = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(acknowledgeResponse);
        networkClientDelegate.poll(time.timer(0L));
        Acknowledgements completed = completedAcknowledgements.get(0).get(tip0);
        Assertions.assertEquals(3, completed.size());
        Assertions.assertNull(completed.getAcknowledgeException());

        // No fetch request is produced after acknowledgeOnClose.
        Assertions.assertEquals(0, sendFetches());
    }

    /**
     * Hands three acknowledgements to {@code fetch(...)}, then changes the subscription
     * to {@code "test-2"} before the next fetch is sent. One fetch request is expected,
     * and the {@code acknowledgementSendTotal} metric is expected to read 3.0.
     */
    @Test
    public void testShareFetchWithSubscriptionChange() {
        buildRequestManager();
        assignFromSubscribed(Collections.singleton(tp0));
        sendFetchAndVerifyResponse(records, acquiredRecords, Errors.NONE);

        Acknowledgements acknowledgements = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
        shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
        fetchRecords();

        // Move the subscription and assignment over to the second topic.
        subscriptions.subscribeToShareGroup(Collections.singleton("test-2"));
        subscriptions.assignFromSubscribed(Collections.singleton(t2p0));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(1, Map.of("test-2", 1), tp -> 0, topicIds, false));

        Assertions.assertEquals(1, sendFetches());
        Assertions.assertFalse(shareConsumeRequestManager.hasCompletedFetches());
        KafkaMetric acknowledgementSendTotal =
                (KafkaMetric) metrics.metrics().get(metrics.metricInstance(shareFetchMetricsRegistry.acknowledgementSendTotal, new String[0]));
        Assertions.assertEquals(3.0, acknowledgementSendTotal.metricValue());
    }

    /**
     * With two nodes (node 0 leading {@code tp0}, node 1 leading {@code tp1}), hands
     * acknowledgements for {@code tip0} to {@code fetch(...)} and then switches the
     * assignment from {@code tp0} to {@code tp1}. Two ShareFetch requests are expected:
     * <ul>
     *   <li>one to node 0 carrying the acknowledgement batch (offsets 0-2) for
     *       partition 0 and listing that partition under the forgotten topics;</li>
     *   <li>one to node 1 fetching partition 1.</li>
     * </ul>
     * The order of the two unsent requests is not assumed; they are told apart by
     * their target node.
     */
    @Test
    public void testShareFetchWithSubscriptionChangeMultipleNodes() {
        buildRequestManager();
        subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, topicIds, false));
        Node nodeId0 = metadata.fetch().nodeById(0);
        Node nodeId1 = metadata.fetch().nodeById(1);
        Node tp0Leader = metadata.fetch().leaderFor(tp0);
        Node tp1Leader = metadata.fetch().leaderFor(tp1);
        Assertions.assertEquals(nodeId0, tp0Leader);
        Assertions.assertEquals(nodeId1, tp1Leader);

        sendFetchAndVerifyResponse(records, emptyAcquiredRecords, Errors.NONE);
        Acknowledgements acknowledgements = getAcknowledgements(0,
                AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
        shareConsumeRequestManager.fetch(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
        fetchRecords();

        // Switch the assignment to the partition led by node 1.
        subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
        NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
        Assertions.assertEquals(2, pollResult.unsentRequests.size());

        NetworkClientDelegate.UnsentRequest firstRequest = (NetworkClientDelegate.UnsentRequest) pollResult.unsentRequests.get(0);
        NetworkClientDelegate.UnsentRequest secondRequest = (NetworkClientDelegate.UnsentRequest) pollResult.unsentRequests.get(1);

        // builder1 is the request for node 0, builder2 the request for node 1.
        // Nodes are compared with equals() rather than ==, so the branch choice does
        // not depend on the request holding the very same Node instance as the metadata.
        ShareFetchRequest.Builder builder1;
        ShareFetchRequest.Builder builder2;
        if (firstRequest.node().get().equals(nodeId0)) {
            builder1 = (ShareFetchRequest.Builder) firstRequest.requestBuilder();
            builder2 = (ShareFetchRequest.Builder) secondRequest.requestBuilder();
            Assertions.assertEquals((Object) nodeId1, secondRequest.node().get());
        } else {
            builder1 = (ShareFetchRequest.Builder) secondRequest.requestBuilder();
            builder2 = (ShareFetchRequest.Builder) firstRequest.requestBuilder();
            Assertions.assertEquals((Object) nodeId0, secondRequest.node().get());
            Assertions.assertEquals((Object) nodeId1, firstRequest.node().get());
        }

        // Request to node 0: acknowledgements for partition 0 ...
        Assertions.assertEquals(1, builder1.data().topics().size());
        ShareFetchRequestData.FetchTopic fetchTopic = (ShareFetchRequestData.FetchTopic) builder1.data().topics().stream().findFirst().get();
        Assertions.assertEquals(tip0.topicId(), fetchTopic.topicId());
        Assertions.assertEquals(1, fetchTopic.partitions().size());
        ShareFetchRequestData.FetchPartition fetchPartition = (ShareFetchRequestData.FetchPartition) fetchTopic.partitions().stream().findFirst().get();
        Assertions.assertEquals(0, fetchPartition.partitionIndex());
        Assertions.assertEquals(1, fetchPartition.acknowledgementBatches().size());
        ShareFetchRequestData.AcknowledgementBatch batch = (ShareFetchRequestData.AcknowledgementBatch) fetchPartition.acknowledgementBatches().get(0);
        Assertions.assertEquals(0L, batch.firstOffset());
        Assertions.assertEquals(2L, batch.lastOffset());

        // ... and partition 0 listed as forgotten.
        Assertions.assertEquals(1, builder1.data().forgottenTopicsData().size());
        ShareFetchRequestData.ForgottenTopic forgottenTopic = (ShareFetchRequestData.ForgottenTopic) builder1.data().forgottenTopicsData().get(0);
        Assertions.assertEquals(tip0.topicId(), forgottenTopic.topicId());
        Assertions.assertEquals(1, forgottenTopic.partitions().size());
        Assertions.assertEquals(0, (Integer) forgottenTopic.partitions().get(0));

        // Request to node 1: fetch of partition 1.
        Assertions.assertEquals(1, builder2.data().topics().size());
        fetchTopic = (ShareFetchRequestData.FetchTopic) builder2.data().topics().stream().findFirst().get();
        Assertions.assertEquals(tip1.topicId(), fetchTopic.topicId());
        Assertions.assertEquals(1, fetchTopic.partitions().size());
        Assertions.assertEquals(1, ((ShareFetchRequestData.FetchPartition) fetchTopic.partitions().stream().findFirst().get()).partitionIndex());
    }

    /**
     * With two nodes (node 0 leading {@code tp0}, node 1 leading {@code tp1}), switches
     * the assignment from {@code tp0} to {@code tp1} without handing over any
     * acknowledgements. A single ShareFetch request to node 1 for partition 1 is
     * expected, with no forgotten topics.
     */
    @Test
    public void testShareFetchWithSubscriptionChangeMultipleNodesEmptyAcknowledgements() {
        buildRequestManager();
        subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        subscriptions.assignFromSubscribed(Collections.singletonList(tp0));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, topicIds, false));
        Node nodeId0 = metadata.fetch().nodeById(0);
        Node nodeId1 = metadata.fetch().nodeById(1);
        Node tp0Leader = metadata.fetch().leaderFor(tp0);
        Node tp1Leader = metadata.fetch().leaderFor(tp1);
        Assertions.assertEquals(nodeId0, tp0Leader);
        Assertions.assertEquals(nodeId1, tp1Leader);

        sendFetchAndVerifyResponse(records, emptyAcquiredRecords, Errors.NONE);
        fetchRecords();

        // Switch the assignment to the partition led by node 1.
        subscriptions.assignFromSubscribed(Collections.singletonList(tp1));
        NetworkClientDelegate.PollResult pollResult = shareConsumeRequestManager.sendFetchesReturnPollResult();
        Assertions.assertEquals(1, pollResult.unsentRequests.size());

        // The only request targets node 1 ...
        NetworkClientDelegate.UnsentRequest onlyRequest = (NetworkClientDelegate.UnsentRequest) pollResult.unsentRequests.get(0);
        Assertions.assertEquals((Object) nodeId1, onlyRequest.node().get());
        ShareFetchRequest.Builder builder = (ShareFetchRequest.Builder) onlyRequest.requestBuilder();

        // ... fetches partition 1 of the topic ...
        Assertions.assertEquals(1, builder.data().topics().size());
        ShareFetchRequestData.FetchTopic fetchTopic = (ShareFetchRequestData.FetchTopic) builder.data().topics().stream().findFirst().get();
        Assertions.assertEquals(tip1.topicId(), fetchTopic.topicId());
        Assertions.assertEquals(1, fetchTopic.partitions().size());
        ShareFetchRequestData.FetchPartition fetchPartition = (ShareFetchRequestData.FetchPartition) fetchTopic.partitions().stream().findFirst().get();
        Assertions.assertEquals(1, fetchPartition.partitionIndex());

        // ... and forgets nothing.
        Assertions.assertEquals(0, builder.data().forgottenTopicsData().size());
    }

    /**
     * Fetches from two partitions led by two different nodes and then calls
     * {@code acknowledgeOnClose} with three acknowledgements per partition. Two
     * acknowledge requests are expected to be sent; after both succeed, the
     * acknowledgements for both partitions are expected to be reported as completed,
     * nothing further is expected to be sent, and {@code requestStates(...)} is
     * expected to return {@code null} for both nodes.
     */
    @Test
    public void testShareFetchAndCloseMultipleNodes() {
        buildRequestManager();
        shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        subscriptions.assignFromSubscribed(List.of(tp0, tp1));
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, topicIds, false));

        // One fetch per node; both respond with records.
        Assertions.assertEquals(2, sendFetches());
        Assertions.assertFalse(shareConsumeRequestManager.hasCompletedFetches());
        AbstractResponse fetchResponse0 = fullFetchResponse(tip0, records, acquiredRecords, Errors.NONE);
        client.prepareResponse(fetchResponse0);
        AbstractResponse fetchResponse1 = fullFetchResponse(tip1, records, acquiredRecords, Errors.NONE);
        client.prepareResponse(fetchResponse1);
        networkClientDelegate.poll(time.timer(0L));
        Assertions.assertTrue(shareConsumeRequestManager.hasCompletedFetches());

        // Three acknowledgements for each partition, addressed to its own node.
        Acknowledgements acknowledgements = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        Acknowledgements acknowledgements1 = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        HashMap<TopicIdPartition, NodeAcknowledgements> acknowledgementsMap = new HashMap<>();
        acknowledgementsMap.put(tip0, new NodeAcknowledgements(0, acknowledgements));
        acknowledgementsMap.put(tip1, new NodeAcknowledgements(1, acknowledgements1));
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time, 1000L);
        shareConsumeRequestManager.acknowledgeOnClose(acknowledgementsMap, deadlineMs);
        Assertions.assertEquals(2, shareConsumeRequestManager.sendAcknowledgements());

        AbstractResponse acknowledgeResponse0 = fullAcknowledgeResponse(tip0, Errors.NONE);
        client.prepareResponse(acknowledgeResponse0);
        AbstractResponse acknowledgeResponse1 = fullAcknowledgeResponse(tip1, Errors.NONE);
        client.prepareResponse(acknowledgeResponse1);
        networkClientDelegate.poll(time.timer(0L));

        // Both partitions' acknowledgements are reported in the first completion entry.
        Assertions.assertEquals(3, completedAcknowledgements.get(0).get(tip0).size());
        Assertions.assertEquals(3, completedAcknowledgements.get(0).get(tip1).size());

        // Nothing more to send, and no request state remains for either node.
        Assertions.assertEquals(0, shareConsumeRequestManager.sendAcknowledgements());
        Assertions.assertNull(shareConsumeRequestManager.requestStates(0));
        Assertions.assertNull(shareConsumeRequestManager.requestStates(1));
    }

    /**
     * Sends a synchronous commit of six acknowledgements to node 0 and answers it with
     * {@code NOT_LEADER_OR_FOLLOWER}, naming node 1 (leader epoch 1) as the current
     * leader. Afterwards the request at the head of node 0's sync queue is expected to
     * hold neither in-flight nor incomplete acknowledgements for the partition.
     */
    @Test
    public void testRetryAcknowledgementsWithLeaderChange() {
        buildRequestManager();
        subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        HashSet<TopicPartition> partitions = new HashSet<>();
        partitions.add(tp0);
        subscriptions.assignFromSubscribed(partitions);
        client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 1), tp -> 0, topicIds, false));
        Node nodeId0 = metadata.fetch().nodeById(0);
        Node nodeId1 = metadata.fetch().nodeById(1);
        LinkedList<Node> nodeList = new LinkedList<>(Arrays.asList(nodeId0, nodeId1));

        sendFetchAndVerifyResponse(buildRecords(1L, 6, 1L), ShareCompletedFetchTest.acquiredRecords(1L, 6), Errors.NONE);
        Acknowledgements acknowledgements = getAcknowledgements(1,
                AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT,
                AcknowledgeType.ACCEPT, AcknowledgeType.RELEASE, AcknowledgeType.ACCEPT);
        long deadlineMs = CompletableEvent.calculateDeadlineMs(time.timer(60000L));
        shareConsumeRequestManager.commitSync(Map.of(tip0, new NodeAcknowledgements(0, acknowledgements)), deadlineMs);

        // The commit is queued as a sync request only; no async request exists for node 0.
        Assertions.assertNull(shareConsumeRequestManager.requestStates(0).getAsyncRequest());
        Assertions.assertEquals(1, shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().size());
        ShareConsumeRequestManager.AcknowledgeRequestState queued =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(6, queued.getAcknowledgementsToSendCount(tip0));

        Assertions.assertEquals(1, shareConsumeRequestManager.sendAcknowledgements());
        ShareConsumeRequestManager.AcknowledgeRequestState sent =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(6, sent.getInFlightAcknowledgementsCount(tip0));

        // Node 0 reports it is not the leader and points at node 1 with leader epoch 1.
        ShareAcknowledgeResponseData.LeaderIdAndEpoch currentLeader =
                new ShareAcknowledgeResponseData.LeaderIdAndEpoch().setLeaderId(nodeId1.id()).setLeaderEpoch(1);
        AbstractResponse notLeaderResponse = fullAcknowledgeResponse(tip0, Errors.NOT_LEADER_OR_FOLLOWER, currentLeader, nodeList);
        client.prepareResponse(notLeaderResponse);
        networkClientDelegate.poll(time.timer(0L));

        ShareConsumeRequestManager.AcknowledgeRequestState afterResponse =
                (ShareConsumeRequestManager.AcknowledgeRequestState) shareConsumeRequestManager.requestStates(0).getSyncRequestQueue().peek();
        Assertions.assertEquals(0, afterResponse.getInFlightAcknowledgementsCount(tip0));
        Assertions.assertEquals(0, afterResponse.getIncompleteAcknowledgementsCount(tip0));
    }

    /**
     * Exercises the acknowledgement-commit-callback flag across two asynchronous commits.
     *
     * <p>While the flag is {@code true} the test expects the committed acknowledgements to show
     * up in {@code completedAcknowledgements}. After that list is cleared and the flag is set to
     * {@code false}, a second commit is expected to leave the list empty.
     *
     * @throws InterruptedException propagated from {@code TestUtils.retryOnExceptionWithTimeout}
     */
    @Test
    public void testCallbackHandlerConfig() throws InterruptedException {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        this.assignFromSubscribed(Collections.singleton(this.tp0));
        this.sendFetchAndVerifyResponse(this.records, this.acquiredRecords, Errors.NONE);

        // Round one: flag is true.
        Acknowledgements registeredAcks = this.getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
        Map<TopicIdPartition, NodeAcknowledgements> registeredCommit = Map.of(this.tip0, new NodeAcknowledgements(0, registeredAcks));
        this.shareConsumeRequestManager.commitAsync(registeredCommit, CompletableEvent.calculateDeadlineMs(this.time.timer(60000L)));
        Assertions.assertEquals(1, this.shareConsumeRequestManager.sendAcknowledgements());
        this.client.prepareResponse(this.fullAcknowledgeResponse(this.tip0, Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertEquals(Map.of(this.tip0, registeredAcks), this.completedAcknowledgements.get(0));

        // Round two: reset the recorded completions and switch the flag off.
        this.completedAcknowledgements.clear();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(false);
        Acknowledgements unregisteredAcks = Acknowledgements.empty();
        unregisteredAcks.add(3L, AcknowledgeType.ACCEPT);
        Map<TopicIdPartition, NodeAcknowledgements> unregisteredCommit = Map.of(this.tip0, new NodeAcknowledgements(0, unregisteredAcks));
        this.shareConsumeRequestManager.commitAsync(unregisteredCommit, CompletableEvent.calculateDeadlineMs(this.time.timer(60000L)));
        // Retried because the send is not guaranteed to be ready on the first attempt.
        TestUtils.retryOnExceptionWithTimeout(
            () -> Assertions.assertEquals(1, this.shareConsumeRequestManager.sendAcknowledgements()));
        this.client.prepareResponse(this.fullAcknowledgeResponse(this.tip0, Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertEquals(0, this.completedAcknowledgements.size());
    }

    /**
     * Commits acknowledgements for two partitions (on two topics) in a single asynchronous commit
     * and expects two separate entries in {@code completedAcknowledgements}, each holding exactly
     * one partition.
     */
    @Test
    public void testAcknowledgementCommitCallbackMultiplePartitionCommitAsync() {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        Set<TopicPartition> assigned = new HashSet<>(Arrays.asList(this.tp0, this.t2p0));
        this.assignFromSubscribed(assigned);

        // One fetch request is expected to cover both partitions.
        Assertions.assertEquals(1, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> fetchData = new LinkedHashMap<>();
        fetchData.put(this.tip0, this.partitionDataForFetch(this.tip0, this.records, this.acquiredRecords, Errors.NONE, Errors.NONE));
        fetchData.put(this.t2ip0, this.partitionDataForFetch(this.t2ip0, this.records, this.acquiredRecords, Errors.NONE, Errors.NONE));
        this.client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, fetchData, Collections.emptyList(), 0));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());

        // Same acknowledgement pattern for each partition, built as two independent objects.
        Acknowledgements firstTopicAcks = this.getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        Acknowledgements secondTopicAcks = this.getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        HashMap<TopicIdPartition, NodeAcknowledgements> commit = new HashMap<>();
        commit.put(this.tip0, new NodeAcknowledgements(0, firstTopicAcks));
        commit.put(this.t2ip0, new NodeAcknowledgements(0, secondTopicAcks));
        this.shareConsumeRequestManager.commitAsync(commit, CompletableEvent.calculateDeadlineMs(this.time.timer(60000L)));
        Assertions.assertEquals(1, this.shareConsumeRequestManager.sendAcknowledgements());

        HashMap<TopicIdPartition, Errors> responseErrors = new HashMap<>();
        responseErrors.put(this.tip0, Errors.NONE);
        responseErrors.put(this.t2ip0, Errors.NONE);
        this.client.prepareResponse(this.fullAcknowledgeResponse(responseErrors));
        this.networkClientDelegate.poll(this.time.timer(0L));

        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertEquals(2, this.completedAcknowledgements.size());
        Assertions.assertEquals(1, this.completedAcknowledgements.get(0).size());
        Assertions.assertEquals(1, this.completedAcknowledgements.get(1).size());
    }

    /**
     * Fetches from two topics where the first partition succeeds and the second reports
     * {@code TOPIC_AUTHORIZATION_FAILED}.
     *
     * <p>The first collected fetch is expected to carry only the records of the successful
     * partition; the next collection is expected to throw {@link TopicAuthorizationException}.
     */
    @Test
    public void testMultipleTopicsFetch() {
        this.buildRequestManager();
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(this.tp0);
        partitions.add(this.t2p0);
        this.assignFromSubscribed(partitions);
        Assertions.assertEquals(1, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());

        // Insertion order matters here: the successful partition comes first in the response.
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        partitionDataMap.put(this.tip0, this.partitionDataForFetch(this.tip0, this.records, this.acquiredRecords, Errors.NONE, Errors.NONE));
        partitionDataMap.put(this.t2ip0, this.partitionDataForFetch(this.t2ip0, this.records, this.emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE));
        this.client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList(), 0));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());

        ShareFetch shareFetch = this.collectFetch();
        Assertions.assertEquals(1, shareFetch.records().size());
        Assertions.assertEquals(3, ((List<?>) shareFetch.records().get(this.tp0)).size());
        // The failed partition must be absent from the fetch. The previous form cast the map
        // value to Executable and relied on assertThrows dereferencing a null Executable, which
        // obscured the intent and would have surfaced as a ClassCastException had records been present.
        Assertions.assertNull(shareFetch.records().get(this.t2p0));
        Assertions.assertThrows(TopicAuthorizationException.class, this::collectFetch);
    }

    /**
     * Mirror image of the multi-topic fetch test: the partition that reports
     * {@code TOPIC_AUTHORIZATION_FAILED} is placed first in the response.
     *
     * <p>The first collection is expected to throw {@link TopicAuthorizationException}; the
     * following collection is expected to return only the successful partition's records.
     */
    @Test
    public void testMultipleTopicsFetchError() {
        this.buildRequestManager();
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(this.tp0);
        partitions.add(this.t2p0);
        this.assignFromSubscribed(partitions);
        Assertions.assertEquals(1, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());

        // Insertion order matters here: the failing partition comes first in the response.
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionDataMap = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        partitionDataMap.put(this.t2ip0, this.partitionDataForFetch(this.t2ip0, this.records, this.emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED, Errors.NONE));
        partitionDataMap.put(this.tip0, this.partitionDataForFetch(this.tip0, this.records, this.acquiredRecords, Errors.NONE, Errors.NONE));
        this.client.prepareResponse(ShareFetchResponse.of(Errors.NONE, 0, partitionDataMap, Collections.emptyList(), 0));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());

        Assertions.assertThrows(TopicAuthorizationException.class, this::collectFetch);
        ShareFetch shareFetch = this.collectFetch();
        Assertions.assertEquals(1, shareFetch.records().size());
        Assertions.assertEquals(3, ((List<?>) shareFetch.records().get(this.tp0)).size());
        // The failed partition must be absent from the fetch. The previous form cast the map
        // value to Executable and relied on assertThrows dereferencing a null Executable, which
        // obscured the intent and would have surfaced as a ClassCastException had records been present.
        Assertions.assertNull(shareFetch.records().get(this.t2p0));
    }

    /**
     * Sends a fetch for {@code tp0} but answers with data for a different partition
     * ({@code t2ip0}); the test expects no completed fetch to be recorded for that response.
     */
    @Test
    public void testShareFetchInvalidResponse() {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);

        // Subscribe and assign by hand, then install single-node metadata for topic "test".
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.client.updateMetadata(
            RequestTestUtils.metadataUpdateWithIds(1, Map.of("test", 1), partition -> 0, this.topicIds, false));

        Assertions.assertEquals(1, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());

        // The response names a partition that was never requested.
        this.client.prepareResponse(this.fullFetchResponse(this.t2ip0, this.records, this.acquiredRecords, Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());
    }

    /**
     * Covers acknowledge and fetch responses that refer to a partition ({@code t2ip0}) other than
     * the one the acknowledgements were sent for ({@code tip0}).
     *
     * <p>In both the ShareAcknowledge path and the piggybacked ShareFetch path the test expects
     * the acknowledgements for {@code tip0} to complete with an
     * {@link InvalidRecordStateException}.
     *
     * @throws InterruptedException propagated from {@code TestUtils.retryOnExceptionWithTimeout}
     */
    @Test
    public void testShareAcknowledgeInvalidResponse() throws InterruptedException {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp0));
        this.client.updateMetadata(
            RequestTestUtils.metadataUpdateWithIds(1, Map.of("test", 1), partition -> 0, this.topicIds, false));
        this.sendFetchAndVerifyResponse(this.records, this.acquiredRecords, Errors.NONE);
        this.fetchRecords();

        // Part 1: asynchronous commit answered for the wrong partition.
        Acknowledgements commitAcks = Acknowledgements.empty();
        commitAcks.add(1L, AcknowledgeType.ACCEPT);
        this.shareConsumeRequestManager.commitAsync(
            Map.of(this.tip0, new NodeAcknowledgements(0, commitAcks)),
            CompletableEvent.calculateDeadlineMs(this.time.timer(60000L)));
        Assertions.assertEquals(1, this.shareConsumeRequestManager.sendAcknowledgements());
        this.client.prepareResponse(this.acknowledgeResponseWithTopLevelError(this.t2ip0, Errors.LEADER_NOT_AVAILABLE));
        this.networkClientDelegate.poll(this.time.timer(0L));

        // After the top-level error the single acknowledgement is still counted as incomplete.
        ShareConsumeRequestManager.AcknowledgeRequestState asyncState =
            (ShareConsumeRequestManager.AcknowledgeRequestState) this.shareConsumeRequestManager.requestStates(0).getAsyncRequest();
        Assertions.assertEquals(1, asyncState.getIncompleteAcknowledgementsCount(this.tip0));

        TestUtils.retryOnExceptionWithTimeout(
            () -> Assertions.assertEquals(1, this.shareConsumeRequestManager.sendAcknowledgements()));
        this.client.prepareResponse(this.fullAcknowledgeResponse(this.t2ip0, Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertEquals(
            InvalidRecordStateException.class,
            this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgeException().getClass());
        this.completedAcknowledgements.clear();

        // Part 2: acknowledgements handed to fetch(), answered by a fetch response for the wrong partition.
        Acknowledgements piggybackedAcks = this.getAcknowledgements(2, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        this.shareConsumeRequestManager.fetch(Map.of(this.tip0, new NodeAcknowledgements(0, piggybackedAcks)), Collections.emptyMap());
        Assertions.assertEquals(1, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());
        this.client.prepareResponse(this.fullFetchResponse(this.t2ip0, this.records, this.acquiredRecords, Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertEquals(
            InvalidRecordStateException.class,
            this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgeException().getClass());
    }

    /**
     * Calls {@code close()} three times and verifies through Mockito that
     * {@code closeInternal()} was invoked exactly once.
     */
    @Test
    public void testCloseShouldBeIdempotent() {
        this.buildRequestManager();

        for (int attempt = 0; attempt < 3; attempt++) {
            this.shareConsumeRequestManager.close();
        }

        TestableShareConsumeRequestManager verification =
            (TestableShareConsumeRequestManager) Mockito.verify(this.shareConsumeRequestManager, Mockito.times(1));
        verification.closeInternal();
    }

    /**
     * A fetch answered with {@code NOT_LEADER_OR_FOLLOWER} and no acquired records is expected to
     * yield no entry for {@code tp0} in the fetched records.
     */
    @Test
    public void testFetchError() {
        this.buildRequestManager();
        this.assignFromSubscribed(Collections.singleton(this.tp0));
        this.sendFetchAndVerifyResponse(this.records, this.emptyAcquiredRecords, Errors.NOT_LEADER_OR_FOLLOWER);

        Map fetched = this.fetchRecords();
        boolean hasTp0 = fetched.containsKey(this.tp0);
        Assertions.assertFalse(hasTp0);
    }

    /**
     * Hands acknowledgements to {@code fetch()} before any fetch has been sent, then performs the
     * first fetch.
     *
     * <p>The test expects all three acknowledgements to be completed with the
     * {@code INVALID_SHARE_SESSION_EPOCH} exception.
     */
    @Test
    public void testPiggybackAcknowledgementsOnInitialShareSessionError() {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        this.assignFromSubscribed(Collections.singleton(this.tp0));

        Acknowledgements earlyAcks = this.getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT, AcknowledgeType.REJECT);
        this.shareConsumeRequestManager.fetch(Map.of(this.tip0, new NodeAcknowledgements(0, earlyAcks)), Collections.emptyMap());
        this.sendFetchAndVerifyResponse(this.records, this.acquiredRecords, Errors.NONE);

        Assertions.assertEquals(3, this.completedAcknowledgements.get(0).get(this.tip0).size());
        Assertions.assertEquals(
            Errors.INVALID_SHARE_SESSION_EPOCH.exception(),
            this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgeException());
    }

    /**
     * Builds a one-record batch, overwrites part of its bytes, and serves it in a fetch response.
     *
     * <p>The first collection is expected to throw {@link KafkaException}; the collection after
     * that is expected to be empty.
     */
    @Test
    public void testInvalidDefaultRecordBatch() {
        this.buildRequestManager();

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBufferOutputStream stream = new ByteBufferOutputStream(buffer);
        MemoryRecordsBuilder batchBuilder = new MemoryRecordsBuilder(stream, 2, Compression.NONE, TimestampType.CREATE_TIME, 0L, 10L, 0L, 0, 0, false, false, 0, 1024);
        batchBuilder.append(10L, "key".getBytes(), "value".getBytes());
        batchBuilder.close();
        buffer.flip();

        // Clobber the bytes starting at offset 17 with "beef", then rewind so the whole batch is readable.
        byte[] garbage = "beef".getBytes();
        buffer.position(17);
        buffer.put(garbage);
        buffer.position(0);

        this.assignFromSubscribed(Collections.singleton(this.tp0));
        Assertions.assertEquals(1, this.sendFetches());
        MemoryRecords damagedRecords = MemoryRecords.readableRecords(buffer);
        this.client.prepareResponse(this.fullFetchResponse(this.tip0, damagedRecords, ShareCompletedFetchTest.acquiredRecords(0L, 1), Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));

        Assertions.assertThrows(KafkaException.class, this::collectFetch);
        ShareFetch afterFailure = this.collectFetch();
        Assertions.assertTrue(afterFailure.isEmpty());
    }

    /**
     * Serves a three-record batch whose buffer has been altered at offset 32 and expects
     * collecting the fetch to throw {@link KafkaException}.
     */
    @Test
    public void testParseInvalidRecordBatch() {
        this.buildRequestManager();

        SimpleRecord[] entries = new SimpleRecord[]{
            new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
            new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
            new SimpleRecord(3L, "c".getBytes(), "3".getBytes())
        };
        MemoryRecords batch = MemoryRecords.withRecords((byte)2, 0L, Compression.NONE, TimestampType.CREATE_TIME, entries);
        ByteBuffer buffer = batch.buffer();

        // Write a scrambled int at absolute offset 32 (derived from the single byte currently there).
        // NOTE(review): presumably this lands inside checksum-covered bytes of the batch - confirm.
        int scrambled = buffer.get(32) ^ 0x5332717;
        buffer.putInt(32, scrambled);

        this.assignFromSubscribed(Collections.singleton(this.tp0));
        Assertions.assertEquals(1, this.sendFetches());
        MemoryRecords damagedRecords = MemoryRecords.readableRecords(buffer);
        this.client.prepareResponse(this.fullFetchResponse(this.tip0, damagedRecords, ShareCompletedFetchTest.acquiredRecords(0L, 3), Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertThrows(KafkaException.class, this::collectFetch);
    }

    /**
     * Checks that record headers survive a fetch.
     *
     * <p>Three records are served: one without headers, one with a single {@code headerKey}
     * header, and one with two {@code headerKey} headers. For each fetched record the test
     * inspects {@code lastHeader("headerKey")}: absent for the first, {@code headerValue} for the
     * second, and {@code headerValue2} (the later of the two) for the third.
     */
    @Test
    public void testHeaders() {
        this.buildRequestManager();
        MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), Compression.NONE, TimestampType.CREATE_TIME, 1L);
        builder.append(0L, "key".getBytes(), "value-1".getBytes());
        Header[] singleHeader = new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8))};
        builder.append(0L, "key".getBytes(), "value-2".getBytes(), singleHeader);
        Header[] duplicateKeyHeaders = new Header[]{new RecordHeader("headerKey", "headerValue".getBytes(StandardCharsets.UTF_8)), new RecordHeader("headerKey", "headerValue2".getBytes(StandardCharsets.UTF_8))};
        builder.append(0L, "key".getBytes(), "value-3".getBytes(), duplicateKeyHeaders);
        MemoryRecords memoryRecords = builder.build();

        this.assignFromSubscribed(Collections.singleton(this.tp0));
        // The response is queued before the fetch is sent; keep this order.
        this.client.prepareResponse(this.fullFetchResponse(this.tip0, memoryRecords, ShareCompletedFetchTest.acquiredRecords(1L, 3), Errors.NONE));
        Assertions.assertEquals(1, this.sendFetches());
        this.networkClientDelegate.poll(this.time.timer(0L));

        // Wildcard types plus explicit casts: with raw Map/List/Iterator locals the element
        // accesses are typed Object and the assignments below would not compile.
        Map<?, ?> recordsByPartition = this.fetchRecords();
        List<?> fetched = (List<?>) recordsByPartition.get(this.tp0);
        Assertions.assertEquals(3, fetched.size());
        Iterator<?> recordIterator = fetched.iterator();

        ConsumerRecord<?, ?> record = (ConsumerRecord<?, ?>) recordIterator.next();
        Assertions.assertNull(record.headers().lastHeader("headerKey"));

        record = (ConsumerRecord<?, ?>) recordIterator.next();
        Assertions.assertEquals("headerValue", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assertions.assertEquals("headerKey", record.headers().lastHeader("headerKey").key());

        record = (ConsumerRecord<?, ?>) recordIterator.next();
        Assertions.assertEquals("headerValue2", new String(record.headers().lastHeader("headerKey").value(), StandardCharsets.UTF_8));
        Assertions.assertEquals("headerKey", record.headers().lastHeader("headerKey").key());
    }

    /**
     * A fetch response carrying {@code TOPIC_AUTHORIZATION_FAILED} is expected to make
     * {@code collectFetch()} throw a {@link TopicAuthorizationException} that names topic
     * {@code "test"} as unauthorized.
     */
    @Test
    public void testUnauthorizedTopic() {
        this.buildRequestManager();
        this.assignFromSubscribed(Collections.singleton(this.tp0));
        Assertions.assertEquals(1, this.sendFetches());
        this.client.prepareResponse(this.fullFetchResponse(this.tip0, this.records, this.emptyAcquiredRecords, Errors.TOPIC_AUTHORIZATION_FAILED));
        this.networkClientDelegate.poll(this.time.timer(0L));

        // assertThrows replaces the manual try/fail/catch and matches the other tests in this class;
        // the failure message is unchanged.
        TopicAuthorizationException thrown = Assertions.assertThrows(
            TopicAuthorizationException.class,
            this::collectFetch,
            "collectFetch should have thrown a TopicAuthorizationException");
        Assertions.assertEquals(Collections.singleton("test"), thrown.unauthorizedTopics());
    }

    /**
     * A fetch response with top-level error {@code UNKNOWN_TOPIC_ID} is expected to return no
     * records and to leave the metadata due for an immediate update (time to next update is 0).
     */
    @Test
    public void testUnknownTopicIdError() {
        this.buildRequestManager();
        this.assignFromSubscribed(Collections.singleton(this.tp0));
        Assertions.assertEquals(1, this.sendFetches());

        this.client.prepareResponse(this.fetchResponseWithTopLevelError(this.tip0, Errors.UNKNOWN_TOPIC_ID));
        this.networkClientDelegate.poll(this.time.timer(0L));

        this.assertEmptyFetch("Should not return records on fetch error");
        long nowMs = this.time.milliseconds();
        Assertions.assertEquals(0L, this.metadata.timeToNextUpdate(nowMs));
    }

    /**
     * Parameterized check of how a fetch error affects the returned records and the metadata
     * refresh request.
     *
     * @param error the error placed in the fetch response
     * @param hasTopLevelError {@code true} to build the response with a top-level error,
     *        {@code false} to build a full response carrying the error for the partition
     * @param shouldRequestMetadataUpdate whether the time to the next metadata update is
     *        expected to be zero after the response is handled
     */
    @ParameterizedTest
    @MethodSource(value={"handleFetchResponseErrorSupplier"})
    public void testHandleFetchResponseError(Errors error, boolean hasTopLevelError, boolean shouldRequestMetadataUpdate) {
        this.buildRequestManager();
        this.assignFromSubscribed(Collections.singleton(this.tp0));
        Assertions.assertEquals(1, this.sendFetches());

        ShareFetchResponse response;
        if (hasTopLevelError) {
            response = this.fetchResponseWithTopLevelError(this.tip0, error);
        } else {
            response = this.fullFetchResponse(this.tip0, this.records, this.emptyAcquiredRecords, error);
        }
        this.client.prepareResponse(response);
        this.networkClientDelegate.poll(this.time.timer(0L));
        this.assertEmptyFetch("Should not return records on fetch error");

        long msUntilRefresh = this.metadata.timeToNextUpdate(this.time.milliseconds());
        if (!shouldRequestMetadataUpdate) {
            Assertions.assertNotEquals(0L, msUntilRefresh, "Should not have requested metadata update");
            return;
        }
        Assertions.assertEquals(0L, msUntilRefresh, "Should have requested metadata update");
    }

    /**
     * Supplies the argument triples for {@code testHandleFetchResponseError}.
     *
     * @return a stream of {@code (error, hasTopLevelError, shouldRequestMetadataUpdate)} rows
     */
    private static Stream<Arguments> handleFetchResponseErrorSupplier() {
        return Stream.of(
            Arguments.of(Errors.NOT_LEADER_OR_FOLLOWER, false, true),
            Arguments.of(Errors.UNKNOWN_TOPIC_OR_PARTITION, false, true),
            Arguments.of(Errors.UNKNOWN_TOPIC_ID, true, true),
            Arguments.of(Errors.INCONSISTENT_TOPIC_ID, false, true),
            Arguments.of(Errors.FENCED_LEADER_EPOCH, false, true),
            Arguments.of(Errors.UNKNOWN_LEADER_EPOCH, false, false));
    }

    /**
     * Queues an otherwise successful fetch response with the mock client's second argument set to
     * {@code true} and expects the resulting fetch to be empty.
     * NOTE(review): that flag presumably marks the response as a disconnect - confirm against MockClient.
     */
    @Test
    public void testFetchDisconnected() {
        this.buildRequestManager();
        this.assignFromSubscribed(Collections.singleton(this.tp0));
        Assertions.assertEquals(1, this.sendFetches());

        ShareFetchResponse successPayload = this.fullFetchResponse(this.tip0, this.records, this.acquiredRecords, Errors.NONE);
        this.client.prepareResponse(successPayload, true);
        this.networkClientDelegate.poll(this.time.timer(0L));

        this.assertEmptyFetch("Should not return records on disconnect");
    }

    /**
     * Fetches a batch from which the last record has been filtered out.
     *
     * <p>Four records are written; a filter that keeps only records with a non-null key drops the
     * fourth. The filtered batch is served in a fetch response and the test expects exactly the
     * three keyed records back, with keys {@code "0"}, {@code "1"}, {@code "2"} in order.
     */
    @Test
    public void testFetchWithLastRecordMissingFromBatch() {
        this.buildRequestManager();
        MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, new SimpleRecord[]{new SimpleRecord("0".getBytes(), "v".getBytes()), new SimpleRecord("1".getBytes(), "v".getBytes()), new SimpleRecord("2".getBytes(), "v".getBytes()), new SimpleRecord(null, "value".getBytes())});
        MemoryRecords.FilterResult result = records.filterTo(new MemoryRecords.RecordFilter(0L, 0L){

            protected MemoryRecords.RecordFilter.BatchRetentionResult checkBatchRetention(RecordBatch batch) {
                return new MemoryRecords.RecordFilter.BatchRetentionResult(MemoryRecords.RecordFilter.BatchRetention.DELETE_EMPTY, false);
            }

            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
                // Drops the key-less record, i.e. the last one appended above.
                return record.key() != null;
            }
        }, ByteBuffer.allocate(1024), BufferSupplier.NO_CACHING);
        result.outputBuffer().flip();
        MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());

        this.assignFromSubscribed(Collections.singleton(this.tp0));
        Assertions.assertEquals(1, this.sendFetches());
        this.client.prepareResponse(this.fullFetchResponse(this.tip0, compactedRecords, ShareCompletedFetchTest.acquiredRecords(0L, 3), Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());

        // Wildcard types plus explicit casts: with raw Map/List locals the element accesses are
        // typed Object, so the List assignment and the key() call would not compile.
        Map<?, ?> allFetchedRecords = this.fetchRecords();
        Assertions.assertTrue(allFetchedRecords.containsKey(this.tp0));
        List<?> fetchedRecords = (List<?>) allFetchedRecords.get(this.tp0);
        Assertions.assertEquals(3, fetchedRecords.size());
        for (int i = 0; i < 3; ++i) {
            ConsumerRecord<?, ?> fetchedRecord = (ConsumerRecord<?, ?>) fetchedRecords.get(i);
            Assertions.assertEquals(Integer.toString(i), new String((byte[]) fetchedRecord.key()));
        }
    }

    /**
     * Builds an uncompressed batch of {@code count} records, all with key {@code "key"} and
     * timestamp 0, whose values are {@code "value-<id>"} for consecutive ids.
     *
     * @param baseOffset base offset passed to the records builder
     * @param count number of records to append
     * @param firstMessageId id used in the first record's value; each following record uses the next id
     * @return the built records
     */
    private MemoryRecords buildRecords(long baseOffset, int count, long firstMessageId) {
        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), Compression.NONE, TimestampType.CREATE_TIME, baseOffset);
        long messageId = firstMessageId;
        for (int appended = 0; appended < count; appended++, messageId++) {
            byte[] key = "key".getBytes();
            byte[] value = ("value-" + messageId).getBytes();
            recordsBuilder.append(0L, key, value);
        }
        return recordsBuilder.build();
    }

    /**
     * A fetch response carrying {@code CORRUPT_MESSAGE} is expected to be recorded as a completed
     * fetch and to make {@code fetchRecords()} throw {@link KafkaException}.
     */
    @Test
    public void testCorruptMessageError() {
        this.buildRequestManager();
        this.assignFromSubscribed(Collections.singleton(this.tp0));
        Assertions.assertEquals(1, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());

        MemoryRecords oneRecord = this.buildRecords(1L, 1, 1L);
        this.client.prepareResponse(this.fullFetchResponse(this.tip0, oneRecord, ShareCompletedFetchTest.acquiredRecords(1L, 1), Errors.CORRUPT_MESSAGE));
        this.networkClientDelegate.poll(this.time.timer(0L));

        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());
        Assertions.assertThrows(KafkaException.class, this::fetchRecords);
    }

    /**
     * Leadership-change error on one partition when the response carries no new-leader details.
     *
     * <p>Two partitions of topic {@code "test"} are fetched with one request per node. The
     * response queued from node 0 returns a record for {@code tip0}; the response queued from
     * node 1 reports {@code error} for {@code tip1} without a current leader. The test then
     * expects the cluster metadata to be unchanged but an update to have been requested. After
     * the leadership of {@code tp1} is moved to node 0 by hand, a single fetch is expected to
     * return records for both partitions.
     *
     * @param error the leadership-change error reported for {@code tip1}
     */
    @ParameterizedTest
    @EnumSource(value=Errors.class, names={"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
    public void testWhenShareFetchResponseReturnsALeadershipChangeErrorButNoNewLeaderInformation(Errors error) {
        this.buildRequestManager();
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(this.tp0);
        partitions.add(this.tp1);
        this.subscriptions.assignFromSubscribed(partitions);
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, this.topicIds, false));
        Node nodeId0 = this.metadata.fetch().nodeById(0);
        Node nodeId1 = this.metadata.fetch().nodeById(1);
        Node tp0Leader = this.metadata.fetch().leaderFor(this.tp0);
        Node tp1Leader = this.metadata.fetch().leaderFor(this.tp1);
        Cluster startingClusterMetadata = this.metadata.fetch();
        Assertions.assertFalse(this.metadata.updateRequested());

        // Initial round: one fetch request per node.
        Assertions.assertEquals(2, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords(this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0);
        // The same map instance is reused for the next response, so it is emptied first.
        partitionData.clear();
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setErrorCode(error.code()));
        this.client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());

        // Wildcard types plus explicit casts: with raw Map/List locals the value returned by
        // get() is typed Object and the List assignment would not compile.
        Map<?, ?> partitionRecords = this.fetchRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));
        Assertions.assertFalse(partitionRecords.containsKey(this.tp1));
        List<?> fetchedRecords = (List<?>) partitionRecords.get(this.tp0);
        Assertions.assertEquals(1, fetchedRecords.size());

        Acknowledgements acknowledgements = Acknowledgements.empty();
        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
        this.shareConsumeRequestManager.fetch(Map.of(this.tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());

        // No new leader was supplied, so the cluster view is unchanged but a refresh is pending.
        Assertions.assertEquals(startingClusterMetadata, this.metadata.fetch());
        Assertions.assertTrue(this.metadata.updateRequested());

        // Move tp1's leadership to node 0 (epoch 1) by hand.
        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<TopicPartition, Metadata.LeaderIdAndEpoch>();
        partitionLeaders.put(this.tp1, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId0.id()), Optional.of(1)));
        LinkedList<Node> leaderNodes = new LinkedList<Node>(Arrays.asList(tp0Leader, tp1Leader));
        this.metadata.updatePartitionLeadership(partitionLeaders, leaderNodes);
        Assertions.assertNotEquals(startingClusterMetadata, this.metadata.fetch());

        // Second round: a single fetch, answered from node 0 with data for both partitions.
        Assertions.assertEquals(1, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());
        partitionData.clear();
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords(this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(2L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setRecords(this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom(ShareFetchResponse.of(Errors.NONE, 0, partitionData, Collections.emptyList(), 0), nodeId0);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());

        partitionRecords = this.fetchRecords();
        Assertions.assertTrue(partitionRecords.containsKey(this.tp0));
        Assertions.assertTrue(partitionRecords.containsKey(this.tp1));
        fetchedRecords = (List<?>) partitionRecords.get(this.tp0);
        Assertions.assertEquals(1, fetchedRecords.size());
        fetchedRecords = (List<?>) partitionRecords.get(this.tp1);
        Assertions.assertEquals(1, fetchedRecords.size());
    }

    /**
     * Checks how the manager reacts when a ShareFetch response reports a leadership-change error
     * for a partition and also carries the current leader together with its node endpoint.
     *
     * <p>Runs once per listed error ({@code FENCED_LEADER_EPOCH}, {@code NOT_LEADER_OR_FOLLOWER}).
     * In the first round two fetch requests are sent; node 0 answers with a record for {@code tp0}
     * and node 1 answers with the error for {@code tp1}, naming {@code tp0}'s leader (epoch 1) as
     * the current leader. The test then asserts that the cluster metadata changed, that a metadata
     * update was requested, and that the second round needs a single fetch request whose response
     * from node 0 yields one record for each partition.
     *
     * @param error the partition-level error code placed on {@code tp1} in the first round
     */
    @ParameterizedTest
    @EnumSource(value=Errors.class, names={"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
    public void testWhenFetchResponseReturnsWithALeadershipChangeErrorAndNewLeaderInformation(Errors error) {
        this.buildRequestManager();
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(this.tp0);
        partitions.add(this.tp1);
        this.subscriptions.assignFromSubscribed(partitions);
        // Presumably 2 brokers and topic "test" with 2 partitions, every partition at epoch 0 —
        // confirm against RequestTestUtils.metadataUpdateWithIds.
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, this.topicIds, false));
        Node nodeId0 = this.metadata.fetch().nodeById(0);
        Node nodeId1 = this.metadata.fetch().nodeById(1);
        Node tp0Leader = this.metadata.fetch().leaderFor(this.tp0);
        // Snapshot taken before any response is processed; compared against later to detect a metadata change.
        Cluster startingClusterMetadata = this.metadata.fetch();
        Assertions.assertFalse((boolean)this.metadata.updateRequested());
        // Round 1: two fetch requests are expected; their responses are queued for node 0 and node 1 below.
        Assertions.assertEquals((int)2, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        // Node 0: successful response for tip0 with records and one acquired-records entry.
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0);
        partitionData.clear();
        // Node 1: tip1 fails with the parameterized error and reports tp0's leader, at epoch 1, as its current leader.
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setErrorCode(error.code()).setCurrentLeader(new ShareFetchResponseData.LeaderIdAndEpoch().setLeaderId(tp0Leader.id()).setLeaderEpoch(1)));
        // The same leader node is also passed as the response's node-endpoint list.
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.singletonList(tp0Leader), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // NOTE(review): the raw Map/List locals below look like decompiler output; their element
        // types come from fetchRecords(), which is not visible from this method.
        Map partitionRecords = this.fetchRecords();
        // Only tp0 delivered records in round 1.
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertFalse((boolean)partitionRecords.containsKey(this.tp1));
        List fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        // Hand an ACCEPT for offset 1 of tip0 (tagged with node id 0) to the manager.
        Acknowledgements acknowledgements = Acknowledgements.empty();
        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
        this.shareConsumeRequestManager.fetch(Map.of(this.tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
        // The cluster snapshot must now differ from the starting one and a metadata update must be pending —
        // presumably caused by the leader information in node 1's response.
        Assertions.assertNotEquals((Object)startingClusterMetadata, (Object)this.metadata.fetch());
        Assertions.assertTrue((boolean)this.metadata.updateRequested());
        // Round 2: a single fetch request is expected; its response is queued for node 0 only.
        Assertions.assertEquals((int)1, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        partitionData.clear();
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(2L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        partitionRecords = this.fetchRecords();
        // Both partitions now deliver exactly one record each.
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp1));
        fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        fetchedRecords = partitionRecords.get(this.tp1);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
    }

    /**
     * Checks what happens to pending acknowledgements when the leader of a partition changes
     * between two rounds of ShareFetch requests.
     *
     * <p>Runs once per listed error ({@code FENCED_LEADER_EPOCH}, {@code NOT_LEADER_OR_FOLLOWER}).
     * After a first round that delivers one record for {@code tp0}, the test queues an ACCEPT for
     * that record tagged with node 0 and then moves {@code tp0}'s leadership to node 1 (epoch 1)
     * through {@code updatePartitionLeadership}. It asserts that the next round sends a single
     * fetch request, that the queued acknowledgement is completed with the
     * {@code NOT_LEADER_OR_FOLLOWER} exception, and that the response from node 1 yields one
     * record for each partition.
     *
     * @param error the acknowledge error code placed on {@code tip0} in the response queued for node 0
     */
    @ParameterizedTest
    @EnumSource(value=Errors.class, names={"FENCED_LEADER_EPOCH", "NOT_LEADER_OR_FOLLOWER"})
    public void testWhenLeadershipChangeBetweenShareFetchRequests(Errors error) {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(this.tp0);
        partitions.add(this.tp1);
        this.subscriptions.assignFromSubscribed(partitions);
        // Presumably 2 brokers and topic "test" with 2 partitions, every partition at epoch 0 —
        // confirm against RequestTestUtils.metadataUpdateWithIds.
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, this.topicIds, false));
        Node nodeId0 = this.metadata.fetch().nodeById(0);
        Node nodeId1 = this.metadata.fetch().nodeById(1);
        // Snapshot used later to prove the metadata is unchanged before, and changed after, the leadership update.
        Cluster startingClusterMetadata = this.metadata.fetch();
        Assertions.assertFalse((boolean)this.metadata.updateRequested());
        // Round 1: two fetch requests are expected; their responses are queued for node 0 and node 1 below.
        Assertions.assertEquals((int)2, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        // Node 0: successful response for tip0 with records and one acquired-records entry.
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0);
        partitionData.clear();
        // Node 1: error-free response for tip1 that carries no records.
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // NOTE(review): the raw Map/List locals below look like decompiler output; their element
        // types come from fetchRecords(), which is not visible from this method.
        Map partitionRecords = this.fetchRecords();
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertFalse((boolean)partitionRecords.containsKey(this.tp1));
        List fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        // Hand an ACCEPT for offset 1 of tip0 (tagged with node id 0) to the manager.
        Acknowledgements acknowledgements = Acknowledgements.empty();
        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
        this.shareConsumeRequestManager.fetch(Map.of(this.tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
        // Nothing has touched the metadata yet.
        Assertions.assertEquals((Object)startingClusterMetadata, (Object)this.metadata.fetch());
        // Move tp0's leadership to node 1 at epoch 1, directly in the metadata.
        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<TopicPartition, Metadata.LeaderIdAndEpoch>();
        partitionLeaders.put(this.tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(1)));
        this.metadata.updatePartitionLeadership(partitionLeaders, List.of());
        Assertions.assertNotEquals((Object)startingClusterMetadata, (Object)this.metadata.fetch());
        // Round 2: a single fetch request is expected.
        Assertions.assertEquals((int)1, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // The acknowledgement queued for node 0 must already be completed, carrying NOT_LEADER_OR_FOLLOWER.
        Assertions.assertEquals((Object)acknowledgements, (Object)this.completedAcknowledgements.get(0).get(this.tip0));
        Assertions.assertEquals((Object)Errors.NOT_LEADER_OR_FOLLOWER.exception(), (Object)((Object)this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgeException()));
        partitionData.clear();
        // A response for node 0 carrying the parameterized acknowledge error is queued as well.
        // NOTE(review): only one request was sent in this round, so this response may stay unconsumed — confirm intent.
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setAcknowledgeErrorCode(error.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0);
        partitionData.clear();
        // Node 1: records for both partitions.
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        partitionRecords = this.fetchRecords();
        // Both partitions now deliver exactly one record each.
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp1));
        fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        fetchedRecords = partitionRecords.get(this.tp1);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
    }

    /**
     * Checks {@code commitAsync} when the leader of one partition changed after its records were
     * fetched but before the commit was issued.
     *
     * <p>Records are fetched for {@code tp0} (from node 0) and {@code tp1} (from node 1), then
     * {@code tp0}'s leadership is moved to node 1 (epoch 1). The commit carries acknowledgements
     * for both partitions. The test asserts that the {@code tip0} acknowledgements are completed
     * with the {@code NOT_LEADER_OR_FOLLOWER} exception, that exactly one acknowledge request is
     * sent, and that the {@code tip1} acknowledgements complete without an exception once the
     * prepared response has been polled.
     */
    @Test
    void testLeadershipChangeAfterFetchBeforeCommitAsync() {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(this.tp0);
        partitions.add(this.tp1);
        this.subscriptions.assignFromSubscribed(partitions);
        // Presumably 2 brokers and topic "test" with 2 partitions, every partition at epoch 0 —
        // confirm against RequestTestUtils.metadataUpdateWithIds.
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, this.topicIds, false));
        Node nodeId0 = this.metadata.fetch().nodeById(0);
        Node nodeId1 = this.metadata.fetch().nodeById(1);
        // Snapshot used later to prove the leadership update changed the metadata.
        Cluster startingClusterMetadata = this.metadata.fetch();
        Assertions.assertFalse((boolean)this.metadata.updateRequested());
        // Two fetch requests are expected; their responses are queued for node 0 and node 1 below.
        Assertions.assertEquals((int)2, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        // Node 0: records for tip0, acquired-records built with (1L, 1).
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0);
        partitionData.clear();
        // Node 1: records for tip1, acquired-records built with (1L, 2).
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // NOTE(review): the raw Map/List locals below look like decompiler output; their element
        // types come from fetchRecords(), which is not visible from this method.
        Map partitionRecords = this.fetchRecords();
        // One record is expected for tp0 and two for tp1.
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp1));
        List fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        fetchedRecords = partitionRecords.get(this.tp1);
        Assertions.assertEquals((int)2, (int)fetchedRecords.size());
        // Acknowledgements for both partitions, each tagged with the node id passed to NodeAcknowledgements.
        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
        Acknowledgements acknowledgementsTp1 = this.getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
        HashMap<TopicIdPartition, NodeAcknowledgements> commitAcks = new HashMap<TopicIdPartition, NodeAcknowledgements>();
        commitAcks.put(this.tip0, new NodeAcknowledgements(0, acknowledgementsTp0));
        commitAcks.put(this.tip1, new NodeAcknowledgements(1, acknowledgementsTp1));
        // Move tp0's leadership to node 1 at epoch 1 before committing.
        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<TopicPartition, Metadata.LeaderIdAndEpoch>();
        partitionLeaders.put(this.tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(1)));
        this.metadata.updatePartitionLeadership(partitionLeaders, List.of());
        Assertions.assertNotEquals((Object)startingClusterMetadata, (Object)this.metadata.fetch());
        this.shareConsumeRequestManager.commitAsync(commitAcks, CompletableEvent.calculateDeadlineMs((Timer)this.time.timer(60000L)));
        // The first completion holds only tip0, failed with NOT_LEADER_OR_FOLLOWER, before any request is sent.
        Assertions.assertEquals((int)1, (int)this.completedAcknowledgements.get(0).size());
        Assertions.assertEquals((Object)acknowledgementsTp0, (Object)this.completedAcknowledgements.get(0).get(this.tip0));
        Assertions.assertEquals((Object)Errors.NOT_LEADER_OR_FOLLOWER.exception(), (Object)((Object)this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgeException()));
        // Exactly one acknowledge request is expected to go out.
        Assertions.assertEquals((int)1, (int)this.shareConsumeRequestManager.sendAcknowledgements());
        this.client.prepareResponse((AbstractResponse)this.fullAcknowledgeResponse(this.tip1, Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        // The second completion holds only tip1 and carries no exception.
        Assertions.assertEquals((int)1, (int)this.completedAcknowledgements.get(1).size());
        Assertions.assertEquals((Object)acknowledgementsTp1, (Object)this.completedAcknowledgements.get(1).get(this.tip1));
        Assertions.assertNull((Object)((Object)this.completedAcknowledgements.get(1).get(this.tip1).getAcknowledgeException()));
    }

    /**
     * Checks {@code commitSync} when the leader of one partition changed after its records were
     * fetched but before the commit was issued.
     *
     * <p>Records are fetched for {@code tp0} (from node 0) and {@code tp1} (from node 1), then
     * {@code tp0}'s leadership is moved to node 1 (epoch 1). The commit carries acknowledgements
     * for both partitions. The test asserts that the {@code tip0} acknowledgements are completed
     * with the {@code NOT_LEADER_OR_FOLLOWER} exception, that exactly one acknowledge request is
     * sent, and that the {@code tip1} acknowledgements complete without an exception once the
     * prepared response has been polled.
     */
    @Test
    void testLeadershipChangeAfterFetchBeforeCommitSync() {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        this.subscriptions.assignFromSubscribed(List.of(this.tp0, this.tp1));
        // Presumably 2 brokers and topic "test" with 2 partitions, every partition at epoch 0 —
        // confirm against RequestTestUtils.metadataUpdateWithIds.
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, this.topicIds, false));
        Node nodeId0 = this.metadata.fetch().nodeById(0);
        Node nodeId1 = this.metadata.fetch().nodeById(1);
        // Snapshot used later to prove the leadership update changed the metadata.
        Cluster startingClusterMetadata = this.metadata.fetch();
        Assertions.assertFalse((boolean)this.metadata.updateRequested());
        // Two fetch requests are expected; their responses are queued for node 0 and node 1 below.
        Assertions.assertEquals((int)2, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        // Node 0: records for tip0, acquired-records built with (1L, 1).
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0);
        partitionData.clear();
        // Node 1: records for tip1, acquired-records built with (1L, 2).
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // NOTE(review): the raw Map/List locals below look like decompiler output; their element
        // types come from fetchRecords(), which is not visible from this method.
        Map partitionRecords = this.fetchRecords();
        // One record is expected for tp0 and two for tp1.
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp1));
        List fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        fetchedRecords = partitionRecords.get(this.tp1);
        Assertions.assertEquals((int)2, (int)fetchedRecords.size());
        // Acknowledgements for both partitions, each tagged with the node id passed to NodeAcknowledgements.
        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
        Acknowledgements acknowledgementsTp1 = this.getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
        HashMap<TopicIdPartition, NodeAcknowledgements> commitAcks = new HashMap<TopicIdPartition, NodeAcknowledgements>();
        commitAcks.put(this.tip0, new NodeAcknowledgements(0, acknowledgementsTp0));
        commitAcks.put(this.tip1, new NodeAcknowledgements(1, acknowledgementsTp1));
        // Move tp0's leadership to node 1 at epoch 1 before committing.
        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<TopicPartition, Metadata.LeaderIdAndEpoch>();
        partitionLeaders.put(this.tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(1)));
        this.metadata.updatePartitionLeadership(partitionLeaders, List.of());
        Assertions.assertNotEquals((Object)startingClusterMetadata, (Object)this.metadata.fetch());
        this.shareConsumeRequestManager.commitSync(commitAcks, CompletableEvent.calculateDeadlineMs((Timer)this.time.timer(100L)));
        // The first completion holds only tip0, failed with NOT_LEADER_OR_FOLLOWER, before any request is sent.
        Assertions.assertEquals((int)1, (int)this.completedAcknowledgements.get(0).size());
        Assertions.assertEquals((Object)acknowledgementsTp0, (Object)this.completedAcknowledgements.get(0).get(this.tip0));
        Assertions.assertEquals((Object)Errors.NOT_LEADER_OR_FOLLOWER.exception(), (Object)((Object)this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgeException()));
        // Exactly one acknowledge request is expected to go out.
        Assertions.assertEquals((int)1, (int)this.shareConsumeRequestManager.sendAcknowledgements());
        this.client.prepareResponse((AbstractResponse)this.fullAcknowledgeResponse(this.tip1, Errors.NONE));
        this.networkClientDelegate.poll(this.time.timer(0L));
        // The second completion holds only tip1 and carries no exception.
        Assertions.assertEquals((int)1, (int)this.completedAcknowledgements.get(1).size());
        Assertions.assertEquals((Object)acknowledgementsTp1, (Object)this.completedAcknowledgements.get(1).get(this.tip1));
        Assertions.assertNull((Object)((Object)this.completedAcknowledgements.get(1).get(this.tip1).getAcknowledgeException()));
    }

    /**
     * Checks {@code acknowledgeOnClose} when the leader of one partition changed after its records
     * were fetched but before the close-time acknowledgement was issued.
     *
     * <p>Records are fetched for {@code tp0} (from node 0) and {@code tp1} (from node 1). The
     * {@code tip1} acknowledgements are queued through {@code fetch}, then {@code tp0}'s leadership
     * is moved to node 1 (epoch 1) and the {@code tip0} acknowledgements are passed to
     * {@code acknowledgeOnClose}. The test asserts that the {@code tip0} acknowledgements are
     * completed with the {@code NOT_LEADER_OR_FOLLOWER} exception, that two acknowledge requests
     * are sent, and that the {@code tip1} acknowledgements complete without an exception.
     */
    @Test
    void testLeadershipChangeAfterFetchBeforeClose() {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(this.tp0);
        partitions.add(this.tp1);
        this.subscriptions.assignFromSubscribed(partitions);
        // Presumably 2 brokers and topic "test" with 2 partitions, every partition at epoch 0 —
        // confirm against RequestTestUtils.metadataUpdateWithIds.
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, this.topicIds, false));
        Node nodeId0 = this.metadata.fetch().nodeById(0);
        Node nodeId1 = this.metadata.fetch().nodeById(1);
        // Snapshot used later to prove the leadership update changed the metadata.
        Cluster startingClusterMetadata = this.metadata.fetch();
        Assertions.assertFalse((boolean)this.metadata.updateRequested());
        // Two fetch requests are expected; their responses are queued for node 0 and node 1 below.
        Assertions.assertEquals((int)2, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        // Node 0: records for tip0, acquired-records built with (1L, 1).
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0);
        partitionData.clear();
        // Node 1: records for tip1, acquired-records built with (1L, 2).
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 2)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // NOTE(review): the raw Map/List locals below look like decompiler output; their element
        // types come from fetchRecords(), which is not visible from this method.
        Map partitionRecords = this.fetchRecords();
        // One record is expected for tp0 and two for tp1.
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp1));
        List fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        fetchedRecords = partitionRecords.get(this.tp1);
        Assertions.assertEquals((int)2, (int)fetchedRecords.size());
        Acknowledgements acknowledgementsTp0 = Acknowledgements.empty();
        acknowledgementsTp0.add(1L, AcknowledgeType.ACCEPT);
        Acknowledgements acknowledgementsTp1 = this.getAcknowledgements(1, AcknowledgeType.ACCEPT, AcknowledgeType.ACCEPT);
        // The tip1 acknowledgements (tagged with node id 1) are queued through the regular fetch path.
        this.shareConsumeRequestManager.fetch(Map.of(this.tip1, new NodeAcknowledgements(1, acknowledgementsTp1)), Collections.emptyMap());
        // Move tp0's leadership to node 1 at epoch 1 before closing.
        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<TopicPartition, Metadata.LeaderIdAndEpoch>();
        partitionLeaders.put(this.tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(1)));
        this.metadata.updatePartitionLeadership(partitionLeaders, List.of());
        Assertions.assertNotEquals((Object)startingClusterMetadata, (Object)this.metadata.fetch());
        // The tip0 acknowledgements (tagged with node id 0) arrive through the close path.
        this.shareConsumeRequestManager.acknowledgeOnClose(Map.of(this.tip0, new NodeAcknowledgements(0, acknowledgementsTp0)), CompletableEvent.calculateDeadlineMs((Timer)this.time.timer(100L)));
        // The first completion holds only tip0, failed with NOT_LEADER_OR_FOLLOWER; here the
        // acknowledgement type maps are compared rather than the Acknowledgements objects.
        Assertions.assertEquals((int)1, (int)this.completedAcknowledgements.get(0).size());
        Assertions.assertEquals((Object)acknowledgementsTp0.getAcknowledgementsTypeMap(), (Object)this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgementsTypeMap());
        Assertions.assertEquals((Object)Errors.NOT_LEADER_OR_FOLLOWER.exception(), (Object)((Object)this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgeException()));
        // Reset the recorded completions so index 0 below refers to the next completion.
        this.completedAcknowledgements.clear();
        // Two acknowledge requests are expected; their responses are queued for node 1 and node 0 below.
        Assertions.assertEquals((int)2, (int)this.shareConsumeRequestManager.sendAcknowledgements());
        this.client.prepareResponseFrom((AbstractResponse)this.fullAcknowledgeResponse(this.tip1, Errors.NONE), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        this.client.prepareResponseFrom((AbstractResponse)this.emptyAcknowledgeResponse(), nodeId0);
        this.networkClientDelegate.poll(this.time.timer(0L));
        // The tip1 acknowledgements complete without an exception.
        Assertions.assertEquals((int)1, (int)this.completedAcknowledgements.get(0).size());
        Assertions.assertEquals((Object)acknowledgementsTp1, (Object)this.completedAcknowledgements.get(0).get(this.tip1));
        Assertions.assertNull((Object)((Object)this.completedAcknowledgements.get(0).get(this.tip1).getAcknowledgeException()));
    }

    /**
     * Checks fetching and acknowledging across a disconnect followed by a leadership change.
     *
     * <p>Round 1 delivers one record for {@code tp0} from node 0. In round 2 the response prepared
     * for node 0 is flagged with {@code true}, and the test asserts that the {@code tip0}
     * acknowledgements complete with a {@link DisconnectException} while {@code tp1} still
     * delivers a record from node 1. {@code tp0}'s leadership is then moved to node 1 (epoch 1);
     * round 3 sends a single fetch request, the {@code tip1} acknowledgement completes without an
     * exception, and only {@code tp0} delivers a record.
     */
    @Test
    void testWhenLeadershipChangedAfterDisconnected() {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);
        this.subscriptions.subscribeToShareGroup(Collections.singleton("test"));
        HashSet<TopicPartition> partitions = new HashSet<TopicPartition>();
        partitions.add(this.tp0);
        partitions.add(this.tp1);
        this.subscriptions.assignFromSubscribed(partitions);
        // Presumably 2 brokers and topic "test" with 2 partitions, every partition at epoch 0 —
        // confirm against RequestTestUtils.metadataUpdateWithIds.
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWithIds(2, Map.of("test", 2), tp -> 0, this.topicIds, false));
        Node nodeId0 = this.metadata.fetch().nodeById(0);
        Node nodeId1 = this.metadata.fetch().nodeById(1);
        // Snapshot used later to prove the metadata is unchanged before, and changed after, the leadership update.
        Cluster startingClusterMetadata = this.metadata.fetch();
        Assertions.assertFalse((boolean)this.metadata.updateRequested());
        // Round 1: two fetch requests are expected; their responses are queued for node 0 and node 1 below.
        Assertions.assertEquals((int)2, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> partitionData = new LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData>();
        // Node 0: successful response for tip0 with records and one acquired-records entry.
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0);
        partitionData.clear();
        // Node 1: error-free response for tip1 that carries no records.
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // NOTE(review): the raw Map/List locals below look like decompiler output; their element
        // types come from fetchRecords(), which is not visible from this method.
        Map partitionRecords = this.fetchRecords();
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertFalse((boolean)partitionRecords.containsKey(this.tp1));
        List fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        // Hand an ACCEPT for offset 1 of tip0 (tagged with node id 0) to the manager.
        Acknowledgements acknowledgements = Acknowledgements.empty();
        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
        this.shareConsumeRequestManager.fetch(Map.of(this.tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
        Assertions.assertEquals((Object)startingClusterMetadata, (Object)this.metadata.fetch());
        // A second, freshly built ACCEPT for the same offset is handed over as well.
        // NOTE(review): this repeats the call above with equal content — confirm whether the duplicate is intended.
        acknowledgements = Acknowledgements.empty();
        acknowledgements.add(1L, AcknowledgeType.ACCEPT);
        this.shareConsumeRequestManager.fetch(Map.of(this.tip0, new NodeAcknowledgements(0, acknowledgements)), Collections.emptyMap());
        // Round 2: two fetch requests are expected.
        Assertions.assertEquals((int)2, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        partitionData.clear();
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setAcknowledgeErrorCode(Errors.NONE.code()));
        // The trailing `true` is presumably the mock client's "disconnected" flag — the assertion
        // further down expects a DisconnectException for tip0.
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId0, true);
        partitionData.clear();
        // Node 1: records for tip1.
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // The tip0 acknowledgement must have completed with a DisconnectException.
        Assertions.assertInstanceOf(DisconnectException.class, (Object)((Object)this.completedAcknowledgements.get(0).get(this.tip0).getAcknowledgeException()));
        // Reset the recorded completions so index 0 below refers to the next completion.
        this.completedAcknowledgements.clear();
        partitionRecords = this.fetchRecords();
        // Only tp1 delivered a record in round 2.
        Assertions.assertFalse((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp1));
        fetchedRecords = partitionRecords.get(this.tp1);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
        // Move tp0's leadership to node 1 at epoch 1.
        HashMap<TopicPartition, Metadata.LeaderIdAndEpoch> partitionLeaders = new HashMap<TopicPartition, Metadata.LeaderIdAndEpoch>();
        partitionLeaders.put(this.tp0, new Metadata.LeaderIdAndEpoch(Optional.of(nodeId1.id()), Optional.of(1)));
        this.metadata.updatePartitionLeadership(partitionLeaders, List.of());
        Assertions.assertNotEquals((Object)startingClusterMetadata, (Object)this.metadata.fetch());
        // The most recently built Acknowledgements object is reused here for tip1, tagged with node id 1.
        this.shareConsumeRequestManager.fetch(Map.of(this.tip1, new NodeAcknowledgements(1, acknowledgements)), Collections.emptyMap());
        // Round 3: a single fetch request is expected; its response is queued for node 1.
        Assertions.assertEquals((int)1, (int)this.sendFetches());
        Assertions.assertFalse((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        partitionData.clear();
        // Node 1: records for tip0 and a bare entry for tip1.
        partitionData.put(this.tip0, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip0.topicPartition().partition()).setErrorCode(Errors.NONE.code()).setRecords((BaseRecords)this.records).setAcquiredRecords(ShareCompletedFetchTest.acquiredRecords(1L, 1)).setAcknowledgeErrorCode(Errors.NONE.code()));
        partitionData.put(this.tip1, new ShareFetchResponseData.PartitionData().setPartitionIndex(this.tip1.topicPartition().partition()));
        this.client.prepareResponseFrom((AbstractResponse)ShareFetchResponse.of((Errors)Errors.NONE, (int)0, partitionData, Collections.emptyList(), (int)0), nodeId1);
        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue((boolean)this.shareConsumeRequestManager.hasCompletedFetches());
        // The tip1 acknowledgement completes without an exception.
        Assertions.assertNull((Object)((Object)this.completedAcknowledgements.get(0).get(this.tip1).getAcknowledgeException()));
        partitionRecords = this.fetchRecords();
        // Only tp0 delivered a record in round 3.
        Assertions.assertTrue((boolean)partitionRecords.containsKey(this.tp0));
        Assertions.assertFalse((boolean)partitionRecords.containsKey(this.tp1));
        fetchedRecords = partitionRecords.get(this.tp0);
        Assertions.assertEquals((int)1, (int)fetchedRecords.size());
    }

    /**
     * Builds a ShareFetch response in which {@code error} is used both as the top-level error and
     * as the error code of the single partition entry keyed by {@code tp}.
     *
     * @param tp the partition the single response entry belongs to
     * @param error the error applied at both the top level and the partition level
     * @return the assembled response, with an empty node-endpoint list
     */
    private ShareFetchResponse fetchResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
        ShareFetchResponseData.PartitionData failedPartition = new ShareFetchResponseData.PartitionData()
                .setPartitionIndex(tp.topicPartition().partition())
                .setErrorCode(error.code());
        // Map.of is kept for its null checks; the response factory is handed a LinkedHashMap copy.
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData =
                new LinkedHashMap<>(Map.of(tp, failedPartition));
        return ShareFetchResponse.of(error, 0, responseData, Collections.emptyList(), 0);
    }

    /**
     * Convenience overload of the five-argument {@code fullFetchResponse} that passes
     * {@code Errors.NONE} as the acknowledge error.
     *
     * @param tp the partition the response entry belongs to
     * @param records the records to place in the partition entry
     * @param acquiredRecords the acquired-records ranges to place in the partition entry
     * @param error the partition-level fetch error
     * @return the response produced by the five-argument overload
     */
    private ShareFetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords records, List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, Errors error) {
        final Errors acknowledgeError = Errors.NONE;
        return this.fullFetchResponse(tp, records, acquiredRecords, error, acknowledgeError);
    }

    /**
     * Builds a ShareFetch response with no top-level error and a single partition entry for
     * {@code tp}, produced by {@code partitionDataForFetch}.
     *
     * @param tp the partition the response entry belongs to
     * @param records forwarded to {@code partitionDataForFetch}
     * @param acquiredRecords forwarded to {@code partitionDataForFetch}
     * @param error forwarded to {@code partitionDataForFetch}
     * @param acknowledgeError forwarded to {@code partitionDataForFetch}
     * @return the assembled response, with an empty node-endpoint list
     */
    private ShareFetchResponse fullFetchResponse(TopicIdPartition tp, MemoryRecords records, List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, Errors error, Errors acknowledgeError) {
        ShareFetchResponseData.PartitionData entry =
                this.partitionDataForFetch(tp, records, acquiredRecords, error, acknowledgeError);
        // Map.of is kept for its null checks; the response factory is handed a LinkedHashMap copy.
        LinkedHashMap<TopicIdPartition, ShareFetchResponseData.PartitionData> responseData =
                new LinkedHashMap<>(Map.of(tp, entry));
        return ShareFetchResponse.of(Errors.NONE, 0, responseData, Collections.emptyList(), 0);
    }

    /**
     * Builds a ShareAcknowledge response with {@link Errors#NONE}, no partition entries
     * and an empty list argument.
     *
     * @return the assembled, partition-less response
     */
    private ShareAcknowledgeResponse emptyAcknowledgeResponse() {
        LinkedHashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> noPartitions = new LinkedHashMap<>();
        return ShareAcknowledgeResponse.of(Errors.NONE, 0, noPartitions, Collections.emptyList());
    }

    /**
     * Builds a ShareAcknowledge response that passes {@code error} as the leading {@code Errors}
     * argument of {@code ShareAcknowledgeResponse.of} while the single partition entry for
     * {@code tp} carries {@link Errors#NONE}.
     *
     * @param tp    partition that gets the only entry in the response
     * @param error error applied to the response as a whole
     * @return the assembled response
     */
    private ShareAcknowledgeResponse acknowledgeResponseWithTopLevelError(TopicIdPartition tp, Errors error) {
        ShareAcknowledgeResponseData.PartitionData healthyPartition = this.partitionDataForAcknowledge(tp, Errors.NONE);
        LinkedHashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> responseData =
                new LinkedHashMap<>(Map.of(tp, healthyPartition));
        return ShareAcknowledgeResponse.of(error, 0, responseData, Collections.emptyList());
    }

    /**
     * Builds a ShareAcknowledge response with {@link Errors#NONE} as the leading {@code Errors}
     * argument and a single partition entry for {@code tp} carrying {@code error}.
     *
     * @param tp    partition that gets the only entry in the response
     * @param error error code of the partition entry
     * @return the assembled response
     */
    private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp, Errors error) {
        ShareAcknowledgeResponseData.PartitionData partitionData = this.partitionDataForAcknowledge(tp, error);
        LinkedHashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> responseData =
                new LinkedHashMap<>(Map.of(tp, partitionData));
        return ShareAcknowledgeResponse.of(Errors.NONE, 0, responseData, Collections.emptyList());
    }

    /**
     * Builds a ShareAcknowledge response with {@link Errors#NONE} as the leading {@code Errors}
     * argument and one partition entry per key of {@code partitionErrorsMap}, each carrying the
     * error mapped to that key.
     *
     * @param partitionErrorsMap error to report for each partition
     * @return the assembled response
     */
    private ShareAcknowledgeResponse fullAcknowledgeResponse(Map<TopicIdPartition, Errors> partitionErrorsMap) {
        // Entries are staged in a HashMap and then copied, so the LinkedHashMap handed to the
        // response follows the HashMap's iteration order rather than the caller's.
        HashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> staged = new HashMap<>();
        for (Map.Entry<TopicIdPartition, Errors> entry : partitionErrorsMap.entrySet()) {
            TopicIdPartition tip = entry.getKey();
            staged.put(tip, this.partitionDataForAcknowledge(tip, entry.getValue()));
        }
        LinkedHashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> responseData = new LinkedHashMap<>(staged);
        return ShareAcknowledgeResponse.of(Errors.NONE, 0, responseData, Collections.emptyList());
    }

    /**
     * Builds a ShareAcknowledge response with {@link Errors#NONE} as the leading {@code Errors}
     * argument, a single partition entry for {@code tp} that carries {@code error} and
     * {@code currentLeader}, and {@code nodeEndpoints} as the list argument.
     *
     * @param tp            partition that gets the only entry in the response
     * @param error         error code of the partition entry
     * @param currentLeader leader information stored on the partition entry
     * @param nodeEndpoints nodes passed through to {@code ShareAcknowledgeResponse.of}
     * @return the assembled response
     */
    private ShareAcknowledgeResponse fullAcknowledgeResponse(TopicIdPartition tp, Errors error, ShareAcknowledgeResponseData.LeaderIdAndEpoch currentLeader, List<Node> nodeEndpoints) {
        ShareAcknowledgeResponseData.PartitionData partitionData = this.partitionDataForAcknowledge(tp, error, currentLeader);
        LinkedHashMap<TopicIdPartition, ShareAcknowledgeResponseData.PartitionData> responseData =
                new LinkedHashMap<>(Map.of(tp, partitionData));
        return ShareAcknowledgeResponse.of(Errors.NONE, 0, responseData, nodeEndpoints);
    }

    /**
     * Creates the ShareFetch partition entry for {@code tp}.
     *
     * @param tp               partition whose index is stored on the entry
     * @param records          records stored on the entry
     * @param acquiredRecords  acquired-record ranges stored on the entry
     * @param error            error whose code is stored as the entry's error code
     * @param acknowledgeError error whose code is stored as the entry's acknowledge error code
     * @return the populated partition entry
     */
    private ShareFetchResponseData.PartitionData partitionDataForFetch(TopicIdPartition tp, MemoryRecords records, List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, Errors error, Errors acknowledgeError) {
        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData();
        partitionData.setPartitionIndex(tp.topicPartition().partition());
        partitionData.setErrorCode(error.code());
        partitionData.setAcknowledgeErrorCode(acknowledgeError.code());
        partitionData.setRecords(records);
        partitionData.setAcquiredRecords(acquiredRecords);
        return partitionData;
    }

    /**
     * Creates the ShareAcknowledge partition entry for {@code tp}.
     *
     * @param tp    partition whose index is stored on the entry
     * @param error error whose code is stored as the entry's error code
     * @return the populated partition entry
     */
    private ShareAcknowledgeResponseData.PartitionData partitionDataForAcknowledge(TopicIdPartition tp, Errors error) {
        ShareAcknowledgeResponseData.PartitionData partitionData = new ShareAcknowledgeResponseData.PartitionData();
        partitionData.setPartitionIndex(tp.topicPartition().partition());
        partitionData.setErrorCode(error.code());
        return partitionData;
    }

    /**
     * Creates the ShareAcknowledge partition entry for {@code tp} and additionally records
     * the current leader on it.
     *
     * @param tp            partition whose index is stored on the entry
     * @param error         error whose code is stored as the entry's error code
     * @param currentLeader leader information stored on the entry
     * @return the populated partition entry
     */
    private ShareAcknowledgeResponseData.PartitionData partitionDataForAcknowledge(TopicIdPartition tp, Errors error, ShareAcknowledgeResponseData.LeaderIdAndEpoch currentLeader) {
        // Same index and error code as the two-argument overload, plus the leader.
        return this.partitionDataForAcknowledge(tp, error).setCurrentLeader(currentLeader);
    }

    /**
     * Collects the current fetch and asserts that it holds no records and reports itself as empty.
     *
     * @param reason message attached to both assertions
     */
    private void assertEmptyFetch(String reason) {
        ShareFetch<?, ?> collected = this.collectFetch();
        Assertions.assertEquals(Collections.emptyMap(), collected.records(), reason);
        Assertions.assertTrue(collected.isEmpty(), reason);
    }

    /**
     * Creates acknowledgements for consecutive offsets: the i-th entry of
     * {@code acknowledgeTypes} is added at offset {@code startIndex + i}.
     *
     * @param startIndex       offset used for the first acknowledge type
     * @param acknowledgeTypes acknowledge types, in offset order
     * @return the populated acknowledgements
     */
    private Acknowledgements getAcknowledgements(int startIndex, AcknowledgeType ... acknowledgeTypes) {
        Acknowledgements result = Acknowledgements.empty();
        for (int i = 0; i < acknowledgeTypes.length; i++) {
            result.add((long) (startIndex + i), acknowledgeTypes[i]);
        }
        return result;
    }

    /**
     * Collects the current fetch and returns its records per partition.
     *
     * @return the collected records, or an empty map when the collected fetch is empty
     */
    private <K, V> Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchRecords() {
        ShareFetch<K, V> collected = this.collectFetch();
        return collected.isEmpty() ? Collections.emptyMap() : collected.records();
    }

    /**
     * Delegates to the request manager under test to collect the current fetch.
     *
     * @return whatever {@code shareConsumeRequestManager.collectFetch()} returns
     */
    private <K, V> ShareFetch<K, V> collectFetch() {
        ShareFetch<K, V> collected = this.shareConsumeRequestManager.collectFetch();
        return collected;
    }

    /**
     * Builds the request manager under test with a {@code ByteArrayDeserializer} for both
     * keys and values.
     */
    private void buildRequestManager() {
        ByteArrayDeserializer keyDeserializer = new ByteArrayDeserializer();
        ByteArrayDeserializer valueDeserializer = new ByteArrayDeserializer();
        this.buildRequestManager(keyDeserializer, valueDeserializer);
    }

    /**
     * Builds the request manager under test with a freshly created {@code MetricConfig}.
     *
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     */
    private <K, V> void buildRequestManager(Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        MetricConfig freshMetricConfig = new MetricConfig();
        this.buildRequestManager(freshMetricConfig, keyDeserializer, valueDeserializer);
    }

    /**
     * Builds the request manager under test with a new {@code LogContext} and a new
     * {@code SubscriptionState} that uses {@code AutoOffsetResetStrategy.EARLIEST}.
     *
     * @param metricConfig      metric configuration handed on to the full builder
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     */
    private <K, V> void buildRequestManager(MetricConfig metricConfig, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        LogContext logContext = new LogContext();
        this.buildRequestManager(
                metricConfig,
                keyDeserializer,
                valueDeserializer,
                new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST),
                logContext);
    }

    /**
     * Creates all collaborators and stores a Mockito spy of a new
     * {@code TestableShareConsumeRequestManager} in {@code shareConsumeRequestManager}.
     *
     * @param metricConfig      metric configuration handed to {@code buildDependencies}
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param subscriptionState subscription state shared by the metadata and the manager
     * @param logContext        log context handed to every component created here
     */
    private <K, V> void buildRequestManager(MetricConfig metricConfig, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, SubscriptionState subscriptionState, LogContext logContext) {
        // Must run first: it assigns the time, metadata, subscriptions, metrics and
        // metricsManager fields that are read below.
        this.buildDependencies(metricConfig, subscriptionState, logContext);

        Deserializers<K, V> deserializers = new Deserializers<>(keyDeserializer, valueDeserializer, this.metrics);

        final int minBytes = 1;
        final int maxBytes = Integer.MAX_VALUE;
        final int maxWaitMs = 0;
        final int fetchSize = 1000;
        FetchConfig fetchConfig = new FetchConfig(minBytes, maxBytes, maxWaitMs, fetchSize, Integer.MAX_VALUE, true, "", IsolationLevel.READ_UNCOMMITTED);

        ShareFetchCollector<K, V> shareFetchCollector = new ShareFetchCollector<>(logContext, this.metadata, this.subscriptions, fetchConfig, deserializers);
        TestableBackgroundEventHandler backgroundEventHandler = new TestableBackgroundEventHandler(this.time, this.completedAcknowledgements);

        TestableShareConsumeRequestManager<K, V> requestManager = new TestableShareConsumeRequestManager<>(
                logContext,
                "test-group",
                this.metadata,
                subscriptionState,
                fetchConfig,
                new ShareFetchBuffer(logContext),
                backgroundEventHandler,
                this.metricsManager,
                shareFetchCollector);
        this.shareConsumeRequestManager = Mockito.spy(requestManager);
    }

    /**
     * (Re)creates the shared test fixtures: mock time, subscriptions, consumer metadata, mock
     * client, metrics, share-fetch metrics registry and manager, and a Mockito spy of a
     * {@code TestableNetworkClientDelegate} configured from a small {@code ConsumerConfig}.
     *
     * @param metricConfig      configuration for the {@code Metrics} instance; its tag keys
     *                          are also passed to the metrics registry
     * @param subscriptionState subscription state stored in {@code subscriptions} and handed
     *                          to the metadata
     * @param logContext        log context used by the metadata and the network client delegate
     */
    private void buildDependencies(MetricConfig metricConfig, SubscriptionState subscriptionState, LogContext logContext) {
        this.time = new MockTime(1L, 0L, 0L);
        this.subscriptions = subscriptionState;
        this.metadata = new ConsumerMetadata(0L, 0L, Long.MAX_VALUE, false, false, this.subscriptions, logContext, new ClusterResourceListeners());
        this.client = new MockClient(this.time, this.metadata);
        this.metrics = new Metrics(metricConfig, this.time);
        this.shareFetchMetricsRegistry = new ShareFetchMetricsRegistry(metricConfig.tags().keySet(), "consumer-sharetest-group");
        this.metricsManager = new ShareFetchMetricsManager(this.metrics, this.shareFetchMetricsRegistry);

        Properties consumerProperties = new Properties();
        consumerProperties.put("key.deserializer", StringDeserializer.class);
        consumerProperties.put("value.deserializer", StringDeserializer.class);
        consumerProperties.setProperty("request.timeout.ms", Long.toString(30000L));
        consumerProperties.setProperty("retry.backoff.ms", Long.toString(100L));
        ConsumerConfig consumerConfig = new ConsumerConfig(consumerProperties);

        BackgroundEventHandler delegateEventHandler = new BackgroundEventHandler(
                new LinkedBlockingQueue<>(),
                this.time,
                Mockito.mock(AsyncConsumerMetrics.class));
        TestableNetworkClientDelegate delegate = new TestableNetworkClientDelegate(
                this.time, consumerConfig, logContext, this.client, this.metadata, delegateEventHandler, false);
        this.networkClientDelegate = Mockito.spy(delegate);
    }

    /**
     * Passes one regular acknowledgement map (ACCEPT at offset 1) and one control-record
     * acknowledgement map (a gap added at offset 2) for {@code tip0} to {@code fetch}, then
     * checks that the acknowledgements queued for id 0 contain a single partition with two
     * entries: ACCEPT at offset 1 and nothing at offset 3.
     */
    @Test
    void testFetchWithControlRecords() {
        this.buildRequestManager();
        this.shareConsumeRequestManager.setAcknowledgementCommitCallbackRegistered(true);

        // Regular acknowledgements: ACCEPT for offset 1.
        Acknowledgements recordAcknowledgements = Acknowledgements.empty();
        recordAcknowledgements.add(1L, AcknowledgeType.ACCEPT);
        HashMap<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsMap = new HashMap<>();
        nodeAcknowledgementsMap.put(this.tip0, new NodeAcknowledgements(0, recordAcknowledgements));

        // Control-record acknowledgements: a gap at offset 2.
        Acknowledgements controlAcknowledgements = Acknowledgements.empty();
        controlAcknowledgements.addGap(2L);
        HashMap<TopicIdPartition, NodeAcknowledgements> nodeAcknowledgementsControlRecordMap = new HashMap<>();
        nodeAcknowledgementsControlRecordMap.put(this.tip0, new NodeAcknowledgements(0, controlAcknowledgements));

        this.shareConsumeRequestManager.fetch(nodeAcknowledgementsMap, nodeAcknowledgementsControlRecordMap);

        Map<TopicIdPartition, Acknowledgements> fetchAcksToSend = this.shareConsumeRequestManager.getFetchAcknowledgementsToSend(0);
        Assertions.assertEquals(1, fetchAcksToSend.size());

        Acknowledgements acksForTip0 = fetchAcksToSend.get(this.tip0);
        Assertions.assertEquals(AcknowledgeType.ACCEPT, acksForTip0.get(1L));
        Assertions.assertEquals(2, acksForTip0.size());
        Assertions.assertNull(acksForTip0.get(3L));
    }

    /**
     * Sends one fetch, prepares a full fetch response for {@code tip0}, polls the network
     * client delegate once and asserts that a completed fetch is then available.
     *
     * @param records         records returned in the prepared response
     * @param acquiredRecords acquired-record ranges returned in the prepared response
     * @param error           {@code error[0]} is the partition error; {@code error[1]}, when
     *                        present, is the acknowledge error (otherwise {@link Errors#NONE});
     *                        at least one element is required
     */
    private void sendFetchAndVerifyResponse(MemoryRecords records, List<ShareFetchResponseData.AcquiredRecords> acquiredRecords, Errors ... error) {
        Assertions.assertEquals(1, this.sendFetches());
        Assertions.assertFalse(this.shareConsumeRequestManager.hasCompletedFetches());

        // The four-argument fullFetchResponse overload defaults the acknowledge error to NONE.
        Errors acknowledgeError = error.length > 1 ? error[1] : Errors.NONE;
        ShareFetchResponse preparedResponse = this.fullFetchResponse(this.tip0, records, acquiredRecords, error[0], acknowledgeError);
        this.client.prepareResponse(preparedResponse);

        this.networkClientDelegate.poll(this.time.timer(0L));
        Assertions.assertTrue(this.shareConsumeRequestManager.hasCompletedFetches());
    }

    /**
     * {@code ShareConsumeRequestManager} subclass for tests. It keeps its own
     * {@code ShareFetchCollector}, offers helpers that poll the manager and hand the resulting
     * unsent requests to the enclosing test's network client delegate, and exposes
     * {@code requestStates} and result-handler construction publicly.
     */
    private class TestableShareConsumeRequestManager<K, V> extends ShareConsumeRequestManager {
        private final ShareFetchCollector<K, V> shareFetchCollector;

        /**
         * Creates the manager with the enclosing test's time source and the fixed trailing
         * arguments {@code 100L} and {@code 1000L}, then calls {@code onMemberEpochUpdated}
         * with an empty epoch and a random UUID string.
         */
        public TestableShareConsumeRequestManager(LogContext logContext, String groupId, ConsumerMetadata metadata, SubscriptionState subscriptions, FetchConfig fetchConfig, ShareFetchBuffer shareFetchBuffer, BackgroundEventHandler backgroundEventHandler, ShareFetchMetricsManager metricsManager, ShareFetchCollector<K, V> fetchCollector) {
            super(ShareConsumeRequestManagerTest.this.time, logContext, groupId, metadata, subscriptions, fetchConfig, shareFetchBuffer, backgroundEventHandler, metricsManager, 100L, 1000L);
            this.shareFetchCollector = fetchCollector;
            this.onMemberEpochUpdated(Optional.empty(), Uuid.randomUuid().toString());
        }

        /** Collects a fetch from this manager's fetch buffer using the stored collector. */
        private ShareFetch<K, V> collectFetch() {
            return this.shareFetchCollector.collect(this.shareFetchBuffer);
        }

        /**
         * Requests a fetch with empty acknowledgement maps, polls and enqueues the unsent requests.
         *
         * @return the number of unsent requests produced by the poll
         */
        private int sendFetches() {
            return this.sendFetchesReturnPollResult().unsentRequests.size();
        }

        /**
         * Same as {@code sendFetches()} but returns the whole poll result.
         *
         * @return the poll result whose unsent requests were enqueued
         */
        private NetworkClientDelegate.PollResult sendFetchesReturnPollResult() {
            this.fetch(new HashMap<>(), new HashMap<>());
            return this.pollAndQueueUnsentRequests();
        }

        /**
         * Polls without requesting a new fetch and enqueues the unsent requests.
         *
         * @return the number of unsent requests produced by the poll
         */
        private int sendAcknowledgements() {
            return this.pollAndQueueUnsentRequests().unsentRequests.size();
        }

        /** Polls at the enclosing test's current time and adds the unsent requests to the network client delegate. */
        private NetworkClientDelegate.PollResult pollAndQueueUnsentRequests() {
            long nowMs = ShareConsumeRequestManagerTest.this.time.milliseconds();
            NetworkClientDelegate.PollResult polled = this.poll(nowMs);
            ShareConsumeRequestManagerTest.this.networkClientDelegate.addAll(polled.unsentRequests);
            return polled;
        }

        /** Creates a result handler bound to this manager. */
        public ShareConsumeRequestManager.ResultHandler buildResultHandler(AtomicInteger remainingResults, Optional<CompletableFuture<Map<TopicIdPartition, Acknowledgements>>> future) {
            return new ShareConsumeRequestManager.ResultHandler((ShareConsumeRequestManager)this, remainingResults, future);
        }

        /** Publicly exposes the superclass's request states for {@code nodeId}. */
        public ShareConsumeRequestManager.Tuple<ShareConsumeRequestManager.AcknowledgeRequestState> requestStates(int nodeId) {
            ShareConsumeRequestManager.Tuple<ShareConsumeRequestManager.AcknowledgeRequestState> states = super.requestStates(nodeId);
            return states;
        }
    }

    private class TestableNetworkClientDelegate
    extends NetworkClientDelegate {
        private final ConcurrentLinkedQueue<Node> pendingDisconnects;

        public TestableNetworkClientDelegate(Time time, ConsumerConfig config, LogContext logContext, KafkaClient client, Metadata metadata, BackgroundEventHandler backgroundEventHandler, boolean notifyMetadataErrorsViaErrorQueue) {
            super(time, config, logContext, client, metadata, backgroundEventHandler, notifyMetadataErrorsViaErrorQueue, (AsyncConsumerMetrics)Mockito.mock(AsyncConsumerMetrics.class));
            this.pendingDisconnects = new ConcurrentLinkedQueue();
        }

        public void poll(long timeoutMs, long currentTimeMs) {
            this.handlePendingDisconnects();
            super.poll(timeoutMs, currentTimeMs);
        }

        public void poll(Timer timer) {
            long pollTimeout = Math.min(timer.remainingMs(), 30000L);
            if (ShareConsumeRequestManagerTest.this.client.inFlightRequestCount() == 0) {
                pollTimeout = Math.min(pollTimeout, 100L);
            }
            this.poll(pollTimeout, timer.currentTimeMs());
        }

        private Set<Node> unsentRequestNodes() {
            HashSet<Node> set = new HashSet<Node>();
            for (NetworkClientDelegate.UnsentRequest u : this.unsentRequests()) {
                u.node().ifPresent(set::add);
            }
            return set;
        }

        private List<NetworkClientDelegate.UnsentRequest> removeUnsentRequestByNode(Node node) {
            ArrayList<NetworkClientDelegate.UnsentRequest> list = new ArrayList<NetworkClientDelegate.UnsentRequest>();
            Iterator it = this.unsentRequests().iterator();
            while (it.hasNext()) {
                NetworkClientDelegate.UnsentRequest u = (NetworkClientDelegate.UnsentRequest)it.next();
                if (!node.equals(u.node().orElse(null))) continue;
                it.remove();
                list.add(u);
            }
            return list;
        }

        protected void checkDisconnects(long currentTimeMs) {
            for (Node node : this.unsentRequestNodes()) {
                if (!ShareConsumeRequestManagerTest.this.client.connectionFailed(node)) continue;
                for (NetworkClientDelegate.UnsentRequest unsentRequest : this.removeUnsentRequestByNode(node)) {
                    NetworkClientDelegate.FutureCompletionHandler handler = unsentRequest.handler();
                    AuthenticationException authenticationException = ShareConsumeRequestManagerTest.this.client.authenticationException(node);
                    long startMs = unsentRequest.timer().currentTimeMs() - unsentRequest.timer().elapsedMs();
                    handler.onComplete(new ClientResponse(this.makeHeader(unsentRequest.requestBuilder().latestAllowedVersion()), (RequestCompletionHandler)unsentRequest.handler(), unsentRequest.node().toString(), startMs, currentTimeMs, true, null, authenticationException, null));
                }
            }
        }

        private RequestHeader makeHeader(short version) {
            return new RequestHeader(new RequestHeaderData().setRequestApiKey(ApiKeys.SHARE_FETCH.id).setRequestApiVersion(version), ApiKeys.SHARE_FETCH.requestHeaderVersion(version));
        }

        private void handlePendingDisconnects() {
            Node node;
            while ((node = this.pendingDisconnects.poll()) != null) {
                this.failUnsentRequests(node);
                ShareConsumeRequestManagerTest.this.client.disconnect(node.idString());
            }
        }

        private void failUnsentRequests(Node node) {
            for (NetworkClientDelegate.UnsentRequest unsentRequest : this.removeUnsentRequestByNode(node)) {
                NetworkClientDelegate.FutureCompletionHandler handler = unsentRequest.handler();
                handler.onFailure(ShareConsumeRequestManagerTest.this.time.milliseconds(), (RuntimeException)DisconnectException.INSTANCE);
            }
        }
    }

    /**
     * {@code BackgroundEventHandler} for tests that, instead of queueing events, records the
     * acknowledgements map of every share-acknowledgement commit-callback event in a
     * caller-supplied list and ignores all other events.
     */
    private static class TestableBackgroundEventHandler extends BackgroundEventHandler {
        List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements;

        /**
         * @param time                      time source passed to the superclass
         * @param completedAcknowledgements list that receives the acknowledgements map of each
         *                                  recorded commit-callback event
         */
        public TestableBackgroundEventHandler(Time time, List<Map<TopicIdPartition, Acknowledgements>> completedAcknowledgements) {
            super(new LinkedBlockingQueue<>(), time, Mockito.mock(AsyncConsumerMetrics.class));
            this.completedAcknowledgements = completedAcknowledgements;
        }

        /**
         * Records the acknowledgements map of {@code event} when its type is
         * {@code SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK}; does nothing for any other type.
         *
         * @param event background event to inspect
         */
        public void add(BackgroundEvent event) {
            if (event.type() != BackgroundEvent.Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK) {
                return;
            }
            ShareAcknowledgementCommitCallbackEvent callbackEvent = (ShareAcknowledgementCommitCallbackEvent) event;
            this.completedAcknowledgements.add(callbackEvent.acknowledgementsMap());
        }
    }
}

