/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.internals.MemberState;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.StreamsMembershipManager;
import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnAllTasksLostCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksAssignedCallbackNeededEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackCompletedEvent;
import org.apache.kafka.clients.consumer.internals.events.StreamsOnTasksRevokedCallbackNeededEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.message.StreamsGroupHeartbeatResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.verification.VerificationMode;

@ExtendWith(value={MockitoExtension.class})
public class StreamsMembershipManagerTest {
    private static final String GROUP_ID = "test-group";
    private static final int MEMBER_EPOCH = 1;
    private static final String SUBTOPOLOGY_ID_0 = "subtopology-0";
    private static final String SUBTOPOLOGY_ID_1 = "subtopology-1";
    private static final String TOPIC_0 = "topic-0";
    private static final String TOPIC_1 = "topic-1";
    private static final int PARTITION_0 = 0;
    private static final int PARTITION_1 = 1;
    private final Time time = new MockTime(0L);
    private final Metrics metrics = new Metrics(this.time);
    private StreamsMembershipManager membershipManager;
    @Mock
    private SubscriptionState subscriptionState;
    @Mock
    private BackgroundEventHandler backgroundEventHandler;
    @Mock
    private StreamsRebalanceData streamsRebalanceData;
    @Mock
    private MemberStateListener memberStateListener;
    @Captor
    private ArgumentCaptor<StreamsOnTasksAssignedCallbackNeededEvent> onTasksAssignedCallbackNeededEventCaptor;
    private int onTasksAssignedCallbackNeededAddCount = 0;
    @Captor
    private ArgumentCaptor<StreamsOnTasksRevokedCallbackNeededEvent> onTasksRevokedCallbackNeededEventCaptor;
    @Captor
    private ArgumentCaptor<StreamsOnAllTasksLostCallbackNeededEvent> onAllTasksLostCallbackNeededEventCaptor;

    @BeforeEach
    public void setup() {
        this.membershipManager = new StreamsMembershipManager(GROUP_ID, this.streamsRebalanceData, this.subscriptionState, this.backgroundEventHandler, new LogContext("test"), this.time, this.metrics);
        this.membershipManager.registerStateListener(this.memberStateListener);
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
    }

    @Test
    public void testUnexpectedErrorInHeartbeatResponse() {
        String errorMessage = "Nobody expects the Spanish Inquisition!";
        StreamsGroupHeartbeatResponseData responseData = new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code()).setErrorMessage("Nobody expects the Spanish Inquisition!");
        StreamsGroupHeartbeatResponse response = new StreamsGroupHeartbeatResponse(responseData);
        IllegalArgumentException exception = (IllegalArgumentException)Assertions.assertThrows(IllegalArgumentException.class, () -> this.membershipManager.onHeartbeatSuccess(response));
        Assertions.assertEquals((Object)("Unexpected error in Heartbeat response. Expected no error, but received: " + Errors.GROUP_AUTHORIZATION_FAILED.name() + " with message: 'Nobody expects the Spanish Inquisition!'"), (Object)exception.getMessage());
    }

    @Test
    public void testActiveTasksAreNullInHeartbeatResponse() {
        this.testTasksAreNullInHeartbeatResponse(null, Collections.emptyList(), Collections.emptyList());
    }

    @Test
    public void testStandbyTasksAreNullInHeartbeatResponse() {
        this.testTasksAreNullInHeartbeatResponse(Collections.emptyList(), null, Collections.emptyList());
    }

    @Test
    public void testWarmupTasksAreNullInHeartbeatResponse() {
        this.testTasksAreNullInHeartbeatResponse(Collections.emptyList(), Collections.emptyList(), null);
    }

    private void testTasksAreNullInHeartbeatResponse(List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks, List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks, List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks) {
        this.joining();
        StreamsGroupHeartbeatResponse response = this.makeHeartbeatResponse(activeTasks, standbyTasks, warmupTasks);
        IllegalStateException exception = (IllegalStateException)Assertions.assertThrows(IllegalStateException.class, () -> this.membershipManager.onHeartbeatSuccess(response));
        Assertions.assertEquals((Object)("Invalid response data, task collections must be all null or all non-null: " + String.valueOf(response.data())), (Object)exception.getMessage());
    }

    @Test
    public void testJoining() {
        this.joining();
        StreamsMembershipManagerTest.verifyInStateJoining(this.membershipManager);
        Assertions.assertEquals((int)0, (int)this.membershipManager.memberEpoch());
    }

    @Test
    public void testReconcilingEmptyToSingleActiveTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 0));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingActiveTaskToDifferentActiveTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup);
        Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksRevokedCallbackExecuted.complete(null);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
    }

    @Test
    public void testReconcilingSingleActiveTaskToAdditionalActiveTask() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0), Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 0), new TopicPartition(TOPIC_0, 1));
        Set<TopicPartition> expectedNewPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingMultipleActiveTaskToSingleActiveTask() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Set<StreamsRebalanceData.TaskId> activeTasksToRevoke = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0), new TopicPartition(TOPIC_0, 1)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0), Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksToRevoke);
        Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        Set<TopicPartition> expectedNewPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksRevokedCallbackExecuted.complete(null);
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
    }

    @Test
    public void testReconcilingEmptyToMultipleActiveTaskOfDifferentSubtopologies() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        this.setupStreamsReabalanceDataWithTwoSubtopologies(SUBTOPOLOGY_ID_0, TOPIC_0, SUBTOPOLOGY_ID_1, TOPIC_1);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_1, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0)), SUBTOPOLOGY_ID_1, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 0), new TopicPartition(TOPIC_1, 0));
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingActiveTaskToStandbyTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0))).thenReturn(Set.of());
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup);
        Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksRevokedCallbackExecuted.complete(null);
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasks, Set.of());
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
    }

    @Test
    public void testReconcilingActiveTaskToWarmupTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0))).thenReturn(Set.of());
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup);
        Set<TopicPartition> expectedPartitionsToRevoke = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksRevokedCallbackExecuted.complete(null);
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasks);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
    }

    @Test
    public void testReconcilingEmptyToSingleStandbyTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasks, Set.of());
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingStandbyTaskToDifferentStandbyTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> standbyTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasksSetup, Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        Mockito.reset((Object[])new SubscriptionState[]{this.subscriptionState});
        Mockito.reset((Object[])new MemberStateListener[]{this.memberStateListener});
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasks, Set.of());
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingSingleStandbyTaskToAdditionalStandbyTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> standbyTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasksSetup, Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        Mockito.reset((Object[])new SubscriptionState[]{this.subscriptionState});
        Mockito.reset((Object[])new MemberStateListener[]{this.memberStateListener});
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0), Integer.valueOf(1))));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasks, Set.of());
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingMultipleStandbyTaskToSingleStandbyTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> standbyTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0), Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasksSetup, Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        Mockito.reset((Object[])new SubscriptionState[]{this.subscriptionState});
        Mockito.reset((Object[])new MemberStateListener[]{this.memberStateListener});
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasks, Set.of());
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingStandbyTaskToActiveTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> standbyTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasksSetup, Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingStandbyTaskToWarmupTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> standbyTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasksSetup, Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        Mockito.reset((Object[])new SubscriptionState[]{this.subscriptionState});
        Mockito.reset((Object[])new MemberStateListener[]{this.memberStateListener});
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasks);
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingEmptyToSingleWarmupTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasks);
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingWarmupTaskToDifferentWarmupTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> warmupTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasksSetup);
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        Mockito.reset((Object[])new SubscriptionState[]{this.subscriptionState});
        Mockito.reset((Object[])new MemberStateListener[]{this.memberStateListener});
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasks);
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingSingleWarmupTaskToAdditionalWarmupTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> warmupTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasksSetup);
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        Mockito.reset((Object[])new SubscriptionState[]{this.subscriptionState});
        Mockito.reset((Object[])new MemberStateListener[]{this.memberStateListener});
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0), Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasks);
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingMultipleWarmupTaskToSingleWarmupTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> warmupTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0), new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Set<StreamsRebalanceData.TaskId> warmupTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0), Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasksSetup);
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        Mockito.reset((Object[])new SubscriptionState[]{this.subscriptionState});
        Mockito.reset((Object[])new MemberStateListener[]{this.memberStateListener});
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasks);
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingWarmupTaskToActiveTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> warmupTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 1)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasksSetup);
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingWarmupTaskToStandbyTask() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<StreamsRebalanceData.TaskId> warmupTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> standbyTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithWarmupTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), warmupTasksSetup);
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        Mockito.reset((Object[])new SubscriptionState[]{this.subscriptionState});
        Mockito.reset((Object[])new MemberStateListener[]{this.memberStateListener});
        this.reconcile(this.makeHeartbeatResponseWithStandbyTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), standbyTasks, Set.of());
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of();
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.complete(null);
        this.verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign);
        this.verifyThatNoTasksHaveBeenRevoked();
    }

    @Test
    public void testReconcilingAndAssignmentCallbackFails() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 0));
        this.verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksAssignedCallbackExecuted.completeExceptionally(new RuntimeException("KABOOM!"));
        StreamsMembershipManagerTest.verifyInStateReconciling(this.membershipManager);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).enablePartitionsAwaitingCallback((Collection)ArgumentMatchers.any());
    }

    @Test
    public void testReconcilingAndRevocationCallbackFails() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<TopicPartition> partitionsToAssignAtSetup;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup);
        Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksRevokedCallbackExecuted.completeExceptionally(new RuntimeException("KABOOM!"));
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).markPendingRevocation(expectedPartitionsToRevoke);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).enablePartitionsAwaitingCallback((Collection)ArgumentMatchers.argThat(a -> !a.equals(partitionsToAssignAtSetup)));
        StreamsMembershipManagerTest.verifyInStateReconciling(this.membershipManager);
        this.verifyTasksNotAssigned(activeTasks, Set.of(), Set.of());
        StreamsMembershipManagerTest.verifyInStateReconciling(this.membershipManager);
    }

    @Test
    public void testReconcilingWhenReconciliationAbortedBeforeAssignmentDueToRejoin() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<TopicPartition> partitionsToAssignAtSetup;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup);
        Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        this.membershipManager.onPollTimerExpired();
        this.membershipManager.onHeartbeatRequestGenerated();
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        onAllTasksLostCallbackExecuted.complete(null);
        this.membershipManager.maybeRejoinStaleMember();
        onTasksRevokedCallbackExecuted.complete(null);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).enablePartitionsAwaitingCallback((Collection)ArgumentMatchers.argThat(a -> !a.equals(partitionsToAssignAtSetup)));
        this.verifyTasksNotAssigned(activeTasks, Set.of(), Set.of());
        StreamsMembershipManagerTest.verifyInStateJoining(this.membershipManager);
    }

    @Test
    public void testReconcilingWhenReconciliationAbortedBeforeAssignmentDueToNotInReconciling() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<TopicPartition> partitionsToAssignAtSetup;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup);
        Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        this.membershipManager.transitionToFatal();
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        onAllTasksLostCallbackExecuted.complete(null);
        onTasksRevokedCallbackExecuted.complete(null);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).assignFromSubscribedAwaitingCallback(expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onGroupAssignmentUpdated(expectedFullPartitionsToAssign);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).enablePartitionsAwaitingCallback((Collection)ArgumentMatchers.argThat(a -> !a.equals(partitionsToAssignAtSetup)));
        this.verifyTasksNotAssigned(activeTasks, Set.of(), Set.of());
        StreamsMembershipManagerTest.verifyInStateFatal(this.membershipManager);
    }

    @Test
    public void testReconcilingWhenReconciliationAbortedAfterAssignmentDueToRejoin() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<TopicPartition> partitionsToAssignAtSetup;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup);
        Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksRevokedCallbackExecuted.complete(null);
        this.membershipManager.onPollTimerExpired();
        this.membershipManager.onHeartbeatRequestGenerated();
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        onAllTasksLostCallbackExecuted.complete(null);
        this.membershipManager.maybeRejoinStaleMember();
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        onTasksAssignedCallbackExecuted.complete(null);
        Assertions.assertNotEquals((Object)MemberState.ACKNOWLEDGING, (Object)this.membershipManager.state());
    }

    @Test
    public void testReconcilingWhenReconciliationAbortedAfterAssignmentDueToNotInReconciling() {
        Set<TopicPartition> expectedFullPartitionsToAssign;
        Set<TopicPartition> partitionsToAssignAtSetup;
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 1));
        Mockito.when((Object)this.subscriptionState.assignedPartitions()).thenReturn(Set.of()).thenReturn(Set.of(new TopicPartition(TOPIC_0, 0)));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup);
        Set<TopicPartition> expectedPartitionsToRevoke = partitionsToAssignAtSetup = Set.of(new TopicPartition(TOPIC_0, 0));
        Set<TopicPartition> expectedNewPartitionsToAssign = expectedFullPartitionsToAssign = Set.of(new TopicPartition(TOPIC_0, 1));
        this.verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(expectedPartitionsToRevoke, expectedFullPartitionsToAssign, expectedNewPartitionsToAssign);
        onTasksRevokedCallbackExecuted.complete(null);
        this.membershipManager.transitionToFatal();
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        onAllTasksLostCallbackExecuted.complete(null);
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        onTasksAssignedCallbackExecuted.complete(null);
        Assertions.assertNotEquals((Object)MemberState.ACKNOWLEDGING, (Object)this.membershipManager.state());
    }

    @Test
    public void testLeaveGroupWhenNotInGroup() {
        this.testLeaveGroupWhenNotInGroup(() -> ((StreamsMembershipManager)this.membershipManager).leaveGroup());
    }

    @Test
    public void testLeaveGroupOnCloseWhenNotInGroup() {
        this.testLeaveGroupWhenNotInGroup(() -> ((StreamsMembershipManager)this.membershipManager).leaveGroupOnClose());
    }

    @Test
    public void testIgnoreLeaveResponseWhenNotLeavingGroup() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.stable();
        CompletableFuture leaveResult = this.membershipManager.leaveGroup();
        CompletableFuture<Void> onTasksRevokedCallbackExecutedSetup = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks);
        onTasksRevokedCallbackExecutedSetup.complete(null);
        this.membershipManager.onHeartbeatRequestGenerated();
        Assertions.assertEquals((Object)MemberState.UNSUBSCRIBED, (Object)this.membershipManager.state());
        this.membershipManager.onHeartbeatSuccess(new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.NONE.code()).setMemberId(this.membershipManager.memberId()).setMemberEpoch(1)));
        Assertions.assertFalse((boolean)leaveResult.isDone());
        this.membershipManager.onHeartbeatSuccess(new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.NONE.code()).setMemberId(this.membershipManager.memberId()).setMemberEpoch(-1)));
        Assertions.assertTrue((boolean)leaveResult.isDone());
        this.membershipManager.onSubscriptionUpdated();
        this.membershipManager.onConsumerPoll();
        this.membershipManager.onHeartbeatSuccess(new StreamsGroupHeartbeatResponse(new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.NONE.code()).setMemberId(this.membershipManager.memberId()).setMemberEpoch(-1)));
        Assertions.assertEquals((Object)MemberState.JOINING, (Object)this.membershipManager.state());
        Assertions.assertEquals((int)0, (int)this.membershipManager.memberEpoch());
    }

    private void testLeaveGroupWhenNotInGroup(Supplier<CompletableFuture<Void>> leaveGroup) {
        CompletableFuture<Void> future = leaveGroup.get();
        Assertions.assertFalse((boolean)this.membershipManager.isLeavingGroup());
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).unsubscribe();
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onGroupAssignmentUpdated(Set.of());
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
    }

    @Test
    public void testLeaveGroupWhenNotInGroupAndFenced() {
        this.testLeaveGroupOnCloseWhenNotInGroupAndFenced(() -> ((StreamsMembershipManager)this.membershipManager).leaveGroup());
    }

    @Test
    public void testLeaveGroupOnCloseWhenNotInGroupAndFenced() {
        this.testLeaveGroupOnCloseWhenNotInGroupAndFenced(() -> ((StreamsMembershipManager)this.membershipManager).leaveGroupOnClose());
    }

    private void testLeaveGroupOnCloseWhenNotInGroupAndFenced(Supplier<CompletableFuture<Void>> leaveGroup) {
        this.joining();
        this.fenced();
        this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        CompletableFuture<Void> future = leaveGroup.get();
        Assertions.assertFalse((boolean)this.membershipManager.isLeavingGroup());
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).unsubscribe();
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).assignFromSubscribed(Set.of());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.times((int)2))).onGroupAssignmentUpdated(Set.of());
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
    }

    @Test
    public void testLeaveGroupWhenInGroupWithAssignment() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        CompletableFuture onGroupLeft = this.membershipManager.leaveGroup();
        CompletableFuture<Void> onTasksRevokedCallbackExecuted = this.verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks);
        Assertions.assertFalse((boolean)onGroupLeft.isDone());
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).unsubscribe();
        StreamsMembershipManagerTest.verifyInStatePrepareLeaving(this.membershipManager);
        CompletableFuture onGroupLeftBeforeRevocationCallback = this.membershipManager.leaveGroup();
        Assertions.assertEquals((Object)onGroupLeft, (Object)onGroupLeftBeforeRevocationCallback);
        CompletableFuture onGroupLeftOnCloseBeforeRevocationCallback = this.membershipManager.leaveGroupOnClose();
        Assertions.assertEquals((Object)onGroupLeft, (Object)onGroupLeftOnCloseBeforeRevocationCallback);
        onTasksRevokedCallbackExecuted.complete(null);
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onGroupAssignmentUpdated(Set.of());
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).unsubscribe();
        Assertions.assertFalse((boolean)onGroupLeft.isDone());
        StreamsMembershipManagerTest.verifyInStateLeaving(this.membershipManager);
        CompletableFuture onGroupLeftAfterRevocationCallback = this.membershipManager.leaveGroup();
        Assertions.assertEquals((Object)onGroupLeft, (Object)onGroupLeftAfterRevocationCallback);
        this.membershipManager.onHeartbeatRequestGenerated();
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0)), 2));
        Assertions.assertFalse((boolean)onGroupLeft.isDone());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onMemberEpochUpdated(Optional.of(2), this.membershipManager.memberId());
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponse(List.of(), List.of(), List.of(), -1));
        Assertions.assertTrue((boolean)onGroupLeft.isDone());
        Assertions.assertFalse((boolean)onGroupLeft.isCompletedExceptionally());
    }

    @Test
    public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        CompletableFuture onGroupLeft = this.membershipManager.leaveGroupOnClose();
        Assertions.assertFalse((boolean)onGroupLeft.isDone());
        StreamsMembershipManagerTest.verifyInStateLeaving(this.membershipManager);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).unsubscribe();
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onGroupAssignmentUpdated(Set.of());
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler, (VerificationMode)Mockito.never())).add((BackgroundEvent)ArgumentMatchers.any(StreamsOnTasksRevokedCallbackNeededEvent.class));
        CompletableFuture onGroupLeftBeforeHeartbeatRequestGenerated = this.membershipManager.leaveGroup();
        Assertions.assertEquals((Object)onGroupLeft, (Object)onGroupLeftBeforeHeartbeatRequestGenerated);
        CompletableFuture onGroupLeftOnCloseBeforeHeartbeatRequestGenerated = this.membershipManager.leaveGroupOnClose();
        Assertions.assertEquals((Object)onGroupLeft, (Object)onGroupLeftOnCloseBeforeHeartbeatRequestGenerated);
        Assertions.assertFalse((boolean)onGroupLeft.isDone());
        this.membershipManager.onHeartbeatRequestGenerated();
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0)), 2));
        Assertions.assertFalse((boolean)onGroupLeft.isDone());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onMemberEpochUpdated(Optional.of(2), this.membershipManager.memberId());
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponse(List.of(), List.of(), List.of(), -1));
        Assertions.assertTrue((boolean)onGroupLeft.isDone());
        Assertions.assertFalse((boolean)onGroupLeft.isCompletedExceptionally());
    }

    @Test
    public void testOnHeartbeatRequestSkippedWhenInLeaving() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        CompletableFuture<Void> future = this.leaving();
        this.membershipManager.onHeartbeatRequestSkipped();
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
    }

    @Test
    public void testOnHeartbeatSuccessWhenInLeaving() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        CompletableFuture<Void> future = this.leaving();
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0)), this.membershipManager.memberEpoch() + 1));
        StreamsMembershipManagerTest.verifyInStateLeaving(this.membershipManager);
        Assertions.assertFalse((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onMemberEpochUpdated(Optional.of(2), this.membershipManager.memberId());
    }

    @Test
    public void testOnHeartbeatSuccessWhenInUnsubscribeLeaveNotInProgress() {
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0)), 1));
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onMemberEpochUpdated(Optional.of(1), this.membershipManager.memberId());
    }

    @Test
    public void testOnHeartbeatSuccessWhenInFenced() {
        this.joining();
        this.fenced();
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0)), 1));
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onMemberEpochUpdated(Optional.of(1), this.membershipManager.memberId());
    }

    @Test
    public void testOnHeartbeatSuccessWhenInFatal() {
        this.membershipManager.transitionToFatal();
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0)), 1));
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onMemberEpochUpdated(Optional.of(1), this.membershipManager.memberId());
    }

    @Test
    public void testOnHeartbeatSuccessWhenInStale() {
        this.joining();
        this.membershipManager.onPollTimerExpired();
        this.membershipManager.onHeartbeatRequestGenerated();
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0)), 2));
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.never())).onMemberEpochUpdated(Optional.of(2), this.membershipManager.memberId());
    }

    @Test
    public void testOnHeartbeatSuccessWhenInReconciling() {
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(List.of(), 1));
        CompletableFuture<Void> onTasksAssignedCallbackExecuted = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set.of(), Set.of(), Set.of());
        onTasksAssignedCallbackExecuted.complete(null);
        this.membershipManager.onHeartbeatRequestGenerated();
        this.membershipManager.onHeartbeatSuccess(this.makeHeartbeatResponseWithActiveTasks(List.of(), 1));
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onMemberEpochUpdated(Optional.of(1), this.membershipManager.memberId());
        StreamsMembershipManagerTest.verifyInStateStable(this.membershipManager);
    }

    @Test
    public void testOnPollTimerExpired() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.membershipManager.onPollTimerExpired();
        StreamsMembershipManagerTest.verifyInStateLeaving(this.membershipManager);
        Assertions.assertEquals((int)-1, (int)this.membershipManager.memberEpoch());
    }

    @Test
    public void testOnPollTimerExpiredWhenInFatal() {
        this.membershipManager.transitionToFatal();
        this.membershipManager.onPollTimerExpired();
        StreamsMembershipManagerTest.verifyInStateFatal(this.membershipManager);
    }

    @Test
    public void testOnPollTimerExpiredWhenInUnsubscribe() {
        this.membershipManager.onPollTimerExpired();
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
    }

    @Test
    public void testOnHeartbeatRequestGeneratedWhenInAcknowleding() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.membershipManager.onHeartbeatRequestGenerated();
        StreamsMembershipManagerTest.verifyInStateStable(this.membershipManager);
    }

    @Test
    public void testOnHeartbeatRequestGeneratedWhenInAcknowledgingAndNewTargetAssignment() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(1))));
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.membershipManager.onHeartbeatRequestGenerated();
        StreamsMembershipManagerTest.verifyInStateReconciling(this.membershipManager);
    }

    @Test
    public void testOnHeartbeatRequestGeneratedWhenInLeaving() {
        this.joining();
        this.leavingAtMemberEpochZero();
        this.membershipManager.onHeartbeatRequestGenerated();
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
    }

    @Test
    public void testOnHeartbeatRequestGeneratedWhenInLeavingAndPollTimerExpired() {
        this.joining();
        this.membershipManager.onPollTimerExpired();
        this.membershipManager.onHeartbeatRequestGenerated();
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        StreamsMembershipManagerTest.verifyInStateStale(this.membershipManager);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).assignFromSubscribed(Set.of());
        onAllTasksLostCallbackExecuted.complete(null);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).assignFromSubscribed(Set.of());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onGroupAssignmentUpdated(Set.of());
    }

    @Test
    public void testOnHeartbeatFailureAfterLeaveRequestGenerated() {
        this.joining();
        CompletableFuture<Void> groupLeft = this.leavingAtMemberEpochZero();
        this.membershipManager.onHeartbeatRequestGenerated();
        Assertions.assertFalse((boolean)groupLeft.isDone());
        this.membershipManager.onRetriableHeartbeatFailure();
        Assertions.assertTrue((boolean)groupLeft.isDone());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testOnHeartbeatFailure(boolean retriable) {
        MetricName failedRebalanceTotalMetricName = this.metrics.metricName("failed-rebalance-total", "consumer-coordinator-metrics");
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        this.joining();
        this.time.sleep(1L);
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        double failedRebalancesTotalBefore = (Double)this.metrics.metric(failedRebalanceTotalMetricName).metricValue();
        Assertions.assertEquals((double)0.0, (double)failedRebalancesTotalBefore);
        if (retriable) {
            this.membershipManager.onRetriableHeartbeatFailure();
        } else {
            this.membershipManager.onFatalHeartbeatFailure();
        }
        double failedRebalancesTotalAfter = (Double)this.metrics.metric(failedRebalanceTotalMetricName).metricValue();
        Assertions.assertEquals((double)(retriable ? 0.0 : 1.0), (double)failedRebalancesTotalAfter);
    }

    @Test
    public void testOnFencedWhenInJoining() {
        this.joining();
        this.testOnFencedWhenInJoiningOrReconcilingOrAcknowledgingOrStable();
    }

    @Test
    public void testOnFencedWhenInReconciling() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        this.testOnFencedWhenInJoiningOrReconcilingOrAcknowledgingOrStable();
    }

    @Test
    public void testOnFencedWhenInAcknowledging() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.testOnFencedWhenInJoiningOrReconcilingOrAcknowledgingOrStable();
    }

    @Test
    public void testOnFencedWhenInStable() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasks = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.stable();
        this.testOnFencedWhenInJoiningOrReconcilingOrAcknowledgingOrStable();
    }

    private void testOnFencedWhenInJoiningOrReconcilingOrAcknowledgingOrStable() {
        this.membershipManager.onFenced();
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        StreamsMembershipManagerTest.verifyInStateFenced(this.membershipManager);
        Assertions.assertEquals((int)0, (int)this.membershipManager.memberEpoch());
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).assignFromSubscribed(Set.of());
        onAllTasksLostCallbackExecuted.complete(null);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).assignFromSubscribed(Set.of());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onGroupAssignmentUpdated(Set.of());
        StreamsMembershipManagerTest.verifyInStateJoining(this.membershipManager);
    }

    @Test
    public void testOnFencedWhenInPrepareLeaving() {
        this.joining();
        this.testOnFencedWhenInPrepareLeavingOrLeaving(this.prepareLeaving());
    }

    @Test
    public void testOnFencedWhenInLeaving() {
        this.joining();
        this.testOnFencedWhenInPrepareLeavingOrLeaving(this.leavingAtMemberEpochZero());
    }

    private void testOnFencedWhenInPrepareLeavingOrLeaving(CompletableFuture<Void> onGroupLeft) {
        this.membershipManager.onFenced();
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
        Assertions.assertEquals((int)-1, (int)this.membershipManager.memberEpoch());
        Assertions.assertTrue((boolean)onGroupLeft.isDone());
        Assertions.assertFalse((boolean)onGroupLeft.isCancelled());
        Assertions.assertFalse((boolean)onGroupLeft.isCompletedExceptionally());
    }

    @Test
    public void testTransitionToFatalWhenInPrepareLeaving() {
        this.joining();
        this.testTransitionToFatalWhenInPrepareLeavingOrLeaving(this.prepareLeaving());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onMemberEpochUpdated(Optional.empty(), this.membershipManager.memberId());
    }

    @Test
    public void testTransitionToFatalWhenInLeaving() {
        this.joining();
        this.testTransitionToFatalWhenInPrepareLeavingOrLeaving(this.leavingAtMemberEpochZero());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener, (VerificationMode)Mockito.times((int)2))).onMemberEpochUpdated(Optional.empty(), this.membershipManager.memberId());
    }

    private void testTransitionToFatalWhenInPrepareLeavingOrLeaving(CompletableFuture<Void> onGroupLeft) {
        this.membershipManager.transitionToFatal();
        StreamsMembershipManagerTest.verifyInStateFatal(this.membershipManager);
        Assertions.assertTrue((boolean)onGroupLeft.isDone());
        Assertions.assertFalse((boolean)onGroupLeft.isCancelled());
        Assertions.assertFalse((boolean)onGroupLeft.isCompletedExceptionally());
    }

    @Test
    public void testTransitionToFatalWhenInJoining() {
        this.joining();
        this.testTransitionToFatalWhenInJoiningOrReconcilingOrAcknowledgingOrStable();
    }

    @Test
    public void testTransitionToFatalWhenInReconciling() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.stable();
        this.testTransitionToFatalWhenInJoiningOrReconcilingOrAcknowledgingOrStable();
    }

    @Test
    public void testTransitionToFatalWhenInAcknowledging() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.testTransitionToFatalWhenInJoiningOrReconcilingOrAcknowledgingOrStable();
    }

    @Test
    public void testTransitionToFatalWhenInStable() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        Set<StreamsRebalanceData.TaskId> activeTasksSetup = Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        CompletableFuture<Void> onTasksAssignedCallbackExecutedSetup = this.verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasksSetup, Set.of(), Set.of());
        this.acknowledging(onTasksAssignedCallbackExecutedSetup);
        this.stable();
        this.testTransitionToFatalWhenInJoiningOrReconcilingOrAcknowledgingOrStable();
    }

    private void testTransitionToFatalWhenInJoiningOrReconcilingOrAcknowledgingOrStable() {
        this.membershipManager.transitionToFatal();
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).assignFromSubscribed(Set.of());
        onAllTasksLostCallbackExecuted.complete(null);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).assignFromSubscribed(Set.of());
        StreamsMembershipManagerTest.verifyInStateFatal(this.membershipManager);
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onMemberEpochUpdated(Optional.empty(), this.membershipManager.memberId());
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onGroupAssignmentUpdated(Set.of());
    }

    @Test
    public void testTransitionToFatalWhenInUnsubscribe() {
        this.membershipManager.transitionToFatal();
        StreamsMembershipManagerTest.verifyInStateFatal(this.membershipManager);
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onMemberEpochUpdated(Optional.empty(), this.membershipManager.memberId());
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler, (VerificationMode)Mockito.never())).add((BackgroundEvent)ArgumentMatchers.any(StreamsOnAllTasksLostCallbackNeededEvent.class));
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).assignFromSubscribed(Set.of());
    }

    @Test
    public void testOnTasksAssignedCallbackCompleted() {
        CompletableFuture future = new CompletableFuture();
        StreamsOnTasksAssignedCallbackCompletedEvent event = new StreamsOnTasksAssignedCallbackCompletedEvent(future, Optional.empty());
        this.membershipManager.onTasksAssignedCallbackCompleted(event);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
    }

    @Test
    public void testOnTasksAssignedCallbackCompletedWhenCallbackFails() {
        String errorMessage = "KABOOM!";
        CompletableFuture future = new CompletableFuture();
        StreamsOnTasksAssignedCallbackCompletedEvent event = new StreamsOnTasksAssignedCallbackCompletedEvent(future, Optional.of(new KafkaException("KABOOM!")));
        this.membershipManager.onTasksAssignedCallbackCompleted(event);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertTrue((boolean)future.isCompletedExceptionally());
        ExecutionException executionException = (ExecutionException)Assertions.assertThrows(ExecutionException.class, future::get);
        Assertions.assertInstanceOf(KafkaException.class, (Object)executionException.getCause());
        Assertions.assertEquals((Object)"KABOOM!", (Object)executionException.getCause().getMessage());
        TreeSet<StreamsRebalanceData.TaskId> activeTasksToAssign = new TreeSet<StreamsRebalanceData.TaskId>();
        activeTasksToAssign.add(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, 0));
        System.out.println(activeTasksToAssign.stream().map(StreamsRebalanceData.TaskId::toString).collect(Collectors.joining(", ")));
    }

    @Test
    public void testOnTasksRevokedCallbackCompleted() {
        CompletableFuture future = new CompletableFuture();
        StreamsOnTasksRevokedCallbackCompletedEvent event = new StreamsOnTasksRevokedCallbackCompletedEvent(future, Optional.empty());
        this.membershipManager.onTasksRevokedCallbackCompleted(event);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
    }

    @Test
    public void testOnTasksRevokedCallbackCompletedWhenCallbackFails() {
        String errorMessage = "KABOOM!";
        CompletableFuture future = new CompletableFuture();
        StreamsOnTasksRevokedCallbackCompletedEvent event = new StreamsOnTasksRevokedCallbackCompletedEvent(future, Optional.of(new KafkaException("KABOOM!")));
        this.membershipManager.onTasksRevokedCallbackCompleted(event);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertTrue((boolean)future.isCompletedExceptionally());
        ExecutionException executionException = (ExecutionException)Assertions.assertThrows(ExecutionException.class, future::get);
        Assertions.assertInstanceOf(KafkaException.class, (Object)executionException.getCause());
        Assertions.assertEquals((Object)"KABOOM!", (Object)executionException.getCause().getMessage());
    }

    @Test
    public void testOnAllTasksLostCallbackCompleted() {
        CompletableFuture future = new CompletableFuture();
        StreamsOnAllTasksLostCallbackCompletedEvent event = new StreamsOnAllTasksLostCallbackCompletedEvent(future, Optional.empty());
        this.membershipManager.onAllTasksLostCallbackCompleted(event);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertFalse((boolean)future.isCompletedExceptionally());
    }

    @Test
    public void testOnAllTasksLostCallbackCompletedWhenCallbackFails() {
        String errorMessage = "KABOOM!";
        CompletableFuture future = new CompletableFuture();
        StreamsOnAllTasksLostCallbackCompletedEvent event = new StreamsOnAllTasksLostCallbackCompletedEvent(future, Optional.of(new KafkaException("KABOOM!")));
        this.membershipManager.onAllTasksLostCallbackCompleted(event);
        Assertions.assertTrue((boolean)future.isDone());
        Assertions.assertFalse((boolean)future.isCancelled());
        Assertions.assertTrue((boolean)future.isCompletedExceptionally());
        ExecutionException executionException = (ExecutionException)Assertions.assertThrows(ExecutionException.class, future::get);
        Assertions.assertInstanceOf(KafkaException.class, (Object)executionException.getCause());
        Assertions.assertEquals((Object)"KABOOM!", (Object)executionException.getCause().getMessage());
    }

    @Test
    public void testMaybeRejoinStaleMember() {
        this.joining();
        this.membershipManager.onPollTimerExpired();
        this.membershipManager.onHeartbeatRequestGenerated();
        CompletableFuture<Void> onAllTasksLostCallbackExecuted = this.verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler();
        StreamsMembershipManagerTest.verifyInStateStale(this.membershipManager);
        this.membershipManager.maybeRejoinStaleMember();
        StreamsMembershipManagerTest.verifyInStateStale(this.membershipManager);
        onAllTasksLostCallbackExecuted.complete(null);
        StreamsMembershipManagerTest.verifyInStateJoining(this.membershipManager);
        Assertions.assertEquals((int)0, (int)this.membershipManager.memberEpoch());
    }

    @Test
    public void testForDuplicateRegistrationOfSameStateListener() {
        MemberStateListener listener1 = new MemberStateListener(){

            public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
            }
        };
        MemberStateListener listener2 = new MemberStateListener(){

            public void onMemberEpochUpdated(Optional<Integer> memberEpoch, String memberId) {
            }
        };
        this.membershipManager.registerStateListener(listener1);
        this.membershipManager.registerStateListener(listener2);
        Exception exception = (Exception)Assertions.assertThrows(IllegalArgumentException.class, () -> this.membershipManager.registerStateListener(listener1));
        Assertions.assertEquals((Object)"Listener is already registered.", (Object)exception.getMessage());
    }

    @Test
    public void testConsumerPollWhenNotJoining() {
        this.setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
        this.joining();
        this.reconcile(this.makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(Integer.valueOf(0))));
        this.membershipManager.onSubscriptionUpdated();
        this.membershipManager.onConsumerPoll();
        StreamsMembershipManagerTest.verifyInStateReconciling(this.membershipManager);
    }

    @Test
    public void testConsumerPollWhenSubscriptionNotUpdated() {
        this.membershipManager.onConsumerPoll();
        StreamsMembershipManagerTest.verifyInStateUnsubscribed(this.membershipManager);
    }

    private void verifyThatNoTasksHaveBeenRevoked() {
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler, (VerificationMode)Mockito.never())).add((BackgroundEvent)ArgumentMatchers.any(StreamsOnTasksRevokedCallbackNeededEvent.class));
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).markPendingRevocation((Set)ArgumentMatchers.any());
    }

    private void verifyInStateReconcilingBeforeOnTaskRevokedCallbackExecuted(Set<TopicPartition> expectedPartitionsToRevoke, Set<TopicPartition> expectedAllPartitionsToAssign, Set<TopicPartition> expectedNewPartitionsToAssign) {
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).markPendingRevocation(expectedPartitionsToRevoke);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).assignFromSubscribedAwaitingCallback(expectedAllPartitionsToAssign, expectedNewPartitionsToAssign);
        StreamsMembershipManagerTest.verifyInStateReconciling(this.membershipManager);
    }

    private void verifyInStateReconcilingBeforeOnTaskAssignedCallbackExecuted(Set<TopicPartition> expectedAllPartitionsToAssign, Set<TopicPartition> expectedNewPartitionsToAssign) {
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).assignFromSubscribedAwaitingCallback(expectedAllPartitionsToAssign, expectedNewPartitionsToAssign);
        ((MemberStateListener)Mockito.verify((Object)this.memberStateListener)).onGroupAssignmentUpdated(expectedAllPartitionsToAssign);
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState, (VerificationMode)Mockito.never())).enablePartitionsAwaitingCallback(expectedNewPartitionsToAssign);
        StreamsMembershipManagerTest.verifyInStateReconciling(this.membershipManager);
    }

    private void verifyInStateAcknowledgingAfterOnTaskAssignedCallbackExecuted(Collection<TopicPartition> expectedNewPartitionsToAssign) {
        ((SubscriptionState)Mockito.verify((Object)this.subscriptionState)).enablePartitionsAwaitingCallback(expectedNewPartitionsToAssign);
        StreamsMembershipManagerTest.verifyInStateAcknowledging(this.membershipManager);
    }

    private static void verifyInStateReconciling(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.RECONCILING, (Object)membershipManager.state());
        Assertions.assertFalse((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertFalse((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertFalse((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStateAcknowledging(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.ACKNOWLEDGING, (Object)membershipManager.state());
        Assertions.assertTrue((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertFalse((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertFalse((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStateLeaving(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.LEAVING, (Object)membershipManager.state());
        Assertions.assertTrue((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertFalse((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertTrue((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStatePrepareLeaving(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.PREPARE_LEAVING, (Object)membershipManager.state());
        Assertions.assertFalse((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertFalse((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertTrue((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStateUnsubscribed(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.UNSUBSCRIBED, (Object)membershipManager.state());
        Assertions.assertFalse((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertTrue((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertFalse((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStateJoining(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.JOINING, (Object)membershipManager.state());
        Assertions.assertTrue((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertFalse((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertFalse((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStateStable(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.STABLE, (Object)membershipManager.state());
        Assertions.assertFalse((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertFalse((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertFalse((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStateFenced(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.FENCED, (Object)membershipManager.state());
        Assertions.assertFalse((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertTrue((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertFalse((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStateFatal(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.FATAL, (Object)membershipManager.state());
        Assertions.assertFalse((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertTrue((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertFalse((boolean)membershipManager.isLeavingGroup());
    }

    private static void verifyInStateStale(StreamsMembershipManager membershipManager) {
        Assertions.assertEquals((Object)MemberState.STALE, (Object)membershipManager.state());
        Assertions.assertFalse((boolean)membershipManager.shouldNotWaitForHeartbeatInterval());
        Assertions.assertTrue((boolean)membershipManager.shouldSkipHeartbeat());
        Assertions.assertFalse((boolean)membershipManager.isLeavingGroup());
    }

    private CompletableFuture<Void> verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(Set<StreamsRebalanceData.TaskId> activeTasks, Set<StreamsRebalanceData.TaskId> standbyTasks, Set<StreamsRebalanceData.TaskId> warmupTasks) {
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler, (VerificationMode)Mockito.times((int)(++this.onTasksAssignedCallbackNeededAddCount)))).add((BackgroundEvent)this.onTasksAssignedCallbackNeededEventCaptor.capture());
        StreamsOnTasksAssignedCallbackNeededEvent onTasksAssignedCallbackNeeded = (StreamsOnTasksAssignedCallbackNeededEvent)this.onTasksAssignedCallbackNeededEventCaptor.getValue();
        Assertions.assertEquals((Object)this.makeTaskAssignment(activeTasks, standbyTasks, warmupTasks), (Object)onTasksAssignedCallbackNeeded.assignment());
        return onTasksAssignedCallbackNeeded.future();
    }

    private CompletableFuture<Void> verifyOnTasksRevokedCallbackNeededEventAddedToBackgroundEventHandler(Set<StreamsRebalanceData.TaskId> activeTasksToRevoke) {
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler)).add((BackgroundEvent)this.onTasksRevokedCallbackNeededEventCaptor.capture());
        StreamsOnTasksRevokedCallbackNeededEvent onTasksRevokedCallbackNeededEvent = (StreamsOnTasksRevokedCallbackNeededEvent)this.onTasksRevokedCallbackNeededEventCaptor.getValue();
        Assertions.assertEquals(activeTasksToRevoke, (Object)onTasksRevokedCallbackNeededEvent.activeTasksToRevoke());
        return onTasksRevokedCallbackNeededEvent.future();
    }

    private CompletableFuture<Void> verifyOnAllTasksLostCallbackNeededEventAddedToBackgroundEventHandler() {
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler)).add((BackgroundEvent)this.onAllTasksLostCallbackNeededEventCaptor.capture());
        StreamsOnAllTasksLostCallbackNeededEvent onAllTasksLostCallbackNeededEvent = (StreamsOnAllTasksLostCallbackNeededEvent)this.onAllTasksLostCallbackNeededEventCaptor.getValue();
        return onAllTasksLostCallbackNeededEvent.future();
    }

    private void verifyTasksNotAssigned(Set<StreamsRebalanceData.TaskId> activeTasks, Set<StreamsRebalanceData.TaskId> standbyTasks, Set<StreamsRebalanceData.TaskId> warmupTasks) {
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler, (VerificationMode)Mockito.never())).add((BackgroundEvent)ArgumentMatchers.argThat(a -> {
            if (a instanceof StreamsOnTasksAssignedCallbackNeededEvent) {
                return ((StreamsOnTasksAssignedCallbackNeededEvent)a).assignment().equals((Object)this.makeTaskAssignment(activeTasks, standbyTasks, warmupTasks));
            }
            return false;
        }));
    }

    private void setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(String subtopologyId, String topicName) {
        Mockito.when((Object)this.streamsRebalanceData.subtopologies()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)subtopologyId, (Object)new StreamsRebalanceData.Subtopology(Set.of(topicName), Set.of(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList()))}));
    }

    private void setupStreamsReabalanceDataWithTwoSubtopologies(String subtopologyId1, String topicName1, String subtopologyId2, String topicName2) {
        Mockito.when((Object)this.streamsRebalanceData.subtopologies()).thenReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)subtopologyId1, (Object)new StreamsRebalanceData.Subtopology(Set.of(topicName1), Set.of(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList())), Utils.mkEntry((Object)subtopologyId2, (Object)new StreamsRebalanceData.Subtopology(Set.of(topicName2), Set.of(), Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList()))}));
    }

    private StreamsGroupHeartbeatResponse makeHeartbeatResponseWithActiveTasks(String subtopologyId, List<Integer> partitions) {
        return this.makeHeartbeatResponseWithActiveTasks(List.of(new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId(subtopologyId).setPartitions(partitions)), 1);
    }

    private StreamsGroupHeartbeatResponse makeHeartbeatResponseWithActiveTasks(String subtopologyId, List<Integer> partitions, int memberEpoch) {
        return this.makeHeartbeatResponseWithActiveTasks(List.of(new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId(subtopologyId).setPartitions(partitions)), memberEpoch);
    }

    private StreamsGroupHeartbeatResponse makeHeartbeatResponseWithStandbyTasks(String subtopologyId, List<Integer> partitions) {
        return this.makeHeartbeatResponse(Collections.emptyList(), List.of(new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId(subtopologyId).setPartitions(partitions)), Collections.emptyList(), 1);
    }

    private StreamsGroupHeartbeatResponse makeHeartbeatResponseWithWarmupTasks(String subtopologyId, List<Integer> partitions) {
        return this.makeHeartbeatResponse(Collections.emptyList(), Collections.emptyList(), List.of(new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId(subtopologyId).setPartitions(partitions)), 1);
    }

    private StreamsGroupHeartbeatResponse makeHeartbeatResponseWithActiveTasks(String subtopologyId0, List<Integer> partitions0, String subtopologyId1, List<Integer> partitions1) {
        return this.makeHeartbeatResponseWithActiveTasks(List.of(new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId(subtopologyId0).setPartitions(partitions0), new StreamsGroupHeartbeatResponseData.TaskIds().setSubtopologyId(subtopologyId1).setPartitions(partitions1)), 1);
    }

    private StreamsGroupHeartbeatResponse makeHeartbeatResponseWithActiveTasks(List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks, int memberEpoch) {
        return this.makeHeartbeatResponse(activeTasks, Collections.emptyList(), Collections.emptyList(), memberEpoch);
    }

    private StreamsGroupHeartbeatResponse makeHeartbeatResponse(List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks, List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks, List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks) {
        return this.makeHeartbeatResponse(activeTasks, standbyTasks, warmupTasks, 1);
    }

    private StreamsGroupHeartbeatResponse makeHeartbeatResponse(List<StreamsGroupHeartbeatResponseData.TaskIds> activeTasks, List<StreamsGroupHeartbeatResponseData.TaskIds> standbyTasks, List<StreamsGroupHeartbeatResponseData.TaskIds> warmupTasks, int memberEpoch) {
        StreamsGroupHeartbeatResponseData responseData = new StreamsGroupHeartbeatResponseData().setErrorCode(Errors.NONE.code()).setMemberId(this.membershipManager.memberId()).setMemberEpoch(memberEpoch).setActiveTasks(activeTasks).setStandbyTasks(standbyTasks).setWarmupTasks(warmupTasks);
        return new StreamsGroupHeartbeatResponse(responseData);
    }

    private StreamsRebalanceData.Assignment makeTaskAssignment(Set<StreamsRebalanceData.TaskId> activeTasks, Set<StreamsRebalanceData.TaskId> standbyTasks, Set<StreamsRebalanceData.TaskId> warmupTasks) {
        return new StreamsRebalanceData.Assignment(activeTasks, standbyTasks, warmupTasks);
    }

    private void joining() {
        this.membershipManager.onSubscriptionUpdated();
        this.membershipManager.onConsumerPoll();
        StreamsMembershipManagerTest.verifyInStateJoining(this.membershipManager);
    }

    private void reconcile(StreamsGroupHeartbeatResponse response) {
        this.membershipManager.onHeartbeatSuccess(response);
        this.membershipManager.poll(this.time.milliseconds());
        StreamsMembershipManagerTest.verifyInStateReconciling(this.membershipManager);
    }

    private void acknowledging(CompletableFuture<Void> future) {
        future.complete(null);
        StreamsMembershipManagerTest.verifyInStateAcknowledging(this.membershipManager);
    }

    private CompletableFuture<Void> prepareLeaving() {
        CompletableFuture onGroupLeft = this.membershipManager.leaveGroup();
        StreamsMembershipManagerTest.verifyInStatePrepareLeaving(this.membershipManager);
        return onGroupLeft;
    }

    private CompletableFuture<Void> leaving() {
        CompletableFuture<Void> future = this.prepareLeaving();
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler)).add((BackgroundEvent)this.onTasksRevokedCallbackNeededEventCaptor.capture());
        StreamsOnTasksRevokedCallbackNeededEvent onTasksRevokedCallbackNeededEvent = (StreamsOnTasksRevokedCallbackNeededEvent)this.onTasksRevokedCallbackNeededEventCaptor.getValue();
        onTasksRevokedCallbackNeededEvent.future().complete(null);
        StreamsMembershipManagerTest.verifyInStateLeaving(this.membershipManager);
        return future;
    }

    private CompletableFuture<Void> leavingAtMemberEpochZero() {
        CompletableFuture<Void> future = this.prepareLeaving();
        ((BackgroundEventHandler)Mockito.verify((Object)this.backgroundEventHandler)).add((BackgroundEvent)this.onAllTasksLostCallbackNeededEventCaptor.capture());
        StreamsOnAllTasksLostCallbackNeededEvent onAllTasksLostCallbackNeededEvent = (StreamsOnAllTasksLostCallbackNeededEvent)this.onAllTasksLostCallbackNeededEventCaptor.getValue();
        onAllTasksLostCallbackNeededEvent.future().complete(null);
        StreamsMembershipManagerTest.verifyInStateLeaving(this.membershipManager);
        return future;
    }

    private void stable() {
        this.membershipManager.onHeartbeatRequestGenerated();
    }

    private void fenced() {
        this.membershipManager.onFenced();
    }
}

