/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals.assignment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.assignment.AssignmentConfigs;
import org.apache.kafka.streams.processor.assignment.ProcessId;
import org.apache.kafka.streams.processor.internals.InternalTopicManager;
import org.apache.kafka.streams.processor.internals.TopologyMetadata;
import org.apache.kafka.streams.processor.internals.assignment.ClientState;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.processor.internals.assignment.ReferenceContainer;
import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.hamcrest.BaseMatcher;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public final class AssignmentTestUtils {
    public static final ProcessId PID_1 = AssignmentTestUtils.processIdForInt(1);
    public static final ProcessId PID_2 = AssignmentTestUtils.processIdForInt(2);
    public static final ProcessId PID_3 = AssignmentTestUtils.processIdForInt(3);
    public static final ProcessId PID_4 = AssignmentTestUtils.processIdForInt(4);
    public static final ProcessId PID_5 = AssignmentTestUtils.processIdForInt(5);
    public static final ProcessId PID_6 = AssignmentTestUtils.processIdForInt(6);
    public static final ProcessId PID_7 = AssignmentTestUtils.processIdForInt(7);
    public static final ProcessId PID_8 = AssignmentTestUtils.processIdForInt(8);
    public static final ProcessId PID_9 = AssignmentTestUtils.processIdForInt(9);
    public static final String RACK_0 = "rack0";
    public static final String RACK_1 = "rack1";
    public static final String RACK_2 = "rack2";
    public static final String RACK_3 = "rack3";
    public static final String RACK_4 = "rack4";
    public static final Node NODE_0 = new Node(0, "node0", 1, "rack0");
    public static final Node NODE_1 = new Node(1, "node1", 1, "rack1");
    public static final Node NODE_2 = new Node(2, "node2", 1, "rack2");
    public static final Node NODE_3 = new Node(3, "node3", 1, "rack3");
    public static final Node NODE_4 = new Node(4, "node4", 1, "rack4");
    public static final Node NO_RACK_NODE = new Node(3, "node3", 1);
    public static final Node[] REPLICA_0 = new Node[]{NODE_0, NODE_1};
    public static final Node[] REPLICA_1 = new Node[]{NODE_1, NODE_2};
    public static final Node[] REPLICA_2 = new Node[]{NODE_0, NODE_2};
    public static final Node[] REPLICA_3 = new Node[]{NODE_1, NODE_3};
    public static final Node[] REPLICA_4 = new Node[]{NODE_3, NODE_4};
    public static final String TP_0_NAME = "topic0";
    public static final String TP_1_NAME = "topic1";
    public static final String TP_2_NAME = "topic2";
    public static final String TP_3_NAME = "topic3";
    public static final String CHANGELOG_TP_0_NAME = "store-0-changelog";
    public static final String CHANGELOG_TP_1_NAME = "store-1-changelog";
    public static final String CHANGELOG_TP_2_NAME = "store-2-changelog";
    public static final String CHANGELOG_TP_3_NAME = "store-3-changelog";
    public static final TopicPartition CHANGELOG_TP_0_0 = new TopicPartition("store-0-changelog", 0);
    public static final TopicPartition CHANGELOG_TP_0_1 = new TopicPartition("store-0-changelog", 1);
    public static final TopicPartition CHANGELOG_TP_0_2 = new TopicPartition("store-0-changelog", 2);
    public static final TopicPartition CHANGELOG_TP_0_3 = new TopicPartition("store-0-changelog", 3);
    public static final TopicPartition CHANGELOG_TP_0_4 = new TopicPartition("store-0-changelog", 4);
    public static final TopicPartition CHANGELOG_TP_0_5 = new TopicPartition("store-0-changelog", 5);
    public static final TopicPartition CHANGELOG_TP_0_6 = new TopicPartition("store-0-changelog", 6);
    public static final TopicPartition CHANGELOG_TP_1_0 = new TopicPartition("store-1-changelog", 0);
    public static final TopicPartition CHANGELOG_TP_1_1 = new TopicPartition("store-1-changelog", 1);
    public static final TopicPartition CHANGELOG_TP_1_2 = new TopicPartition("store-1-changelog", 2);
    public static final TopicPartition CHANGELOG_TP_1_3 = new TopicPartition("store-1-changelog", 3);
    public static final TopicPartition CHANGELOG_TP_2_0 = new TopicPartition("store-2-changelog", 0);
    public static final TopicPartition CHANGELOG_TP_2_1 = new TopicPartition("store-2-changelog", 1);
    public static final TopicPartition CHANGELOG_TP_2_2 = new TopicPartition("store-2-changelog", 2);
    public static final TopicPartition CHANGELOG_TP_2_3 = new TopicPartition("store-2-changelog", 3);
    public static final TopicPartition CHANGELOG_TP_3_0 = new TopicPartition("store-3-changelog", 0);
    public static final TopicPartition CHANGELOG_TP_3_1 = new TopicPartition("store-3-changelog", 1);
    public static final TopicPartition CHANGELOG_TP_3_2 = new TopicPartition("store-3-changelog", 2);
    public static final TopicPartition TP_0_0 = new TopicPartition("topic0", 0);
    public static final TopicPartition TP_0_1 = new TopicPartition("topic0", 1);
    public static final TopicPartition TP_0_2 = new TopicPartition("topic0", 2);
    public static final TopicPartition TP_0_3 = new TopicPartition("topic0", 3);
    public static final TopicPartition TP_0_4 = new TopicPartition("topic0", 4);
    public static final TopicPartition TP_0_5 = new TopicPartition("topic0", 5);
    public static final TopicPartition TP_0_6 = new TopicPartition("topic0", 6);
    public static final TopicPartition TP_1_0 = new TopicPartition("topic1", 0);
    public static final TopicPartition TP_1_1 = new TopicPartition("topic1", 1);
    public static final TopicPartition TP_1_2 = new TopicPartition("topic1", 2);
    public static final TopicPartition TP_1_3 = new TopicPartition("topic1", 3);
    public static final TopicPartition TP_2_0 = new TopicPartition("topic2", 0);
    public static final TopicPartition TP_2_1 = new TopicPartition("topic2", 1);
    public static final TopicPartition TP_2_2 = new TopicPartition("topic2", 2);
    public static final TopicPartition TP_2_3 = new TopicPartition("topic2", 3);
    public static final TopicPartition TP_3_0 = new TopicPartition("topic3", 0);
    public static final TopicPartition TP_3_1 = new TopicPartition("topic3", 1);
    public static final TopicPartition TP_3_2 = new TopicPartition("topic3", 2);
    public static final PartitionInfo PI_0_0 = new PartitionInfo("topic0", 0, NODE_0, REPLICA_0, REPLICA_0);
    public static final PartitionInfo PI_0_1 = new PartitionInfo("topic0", 1, NODE_1, REPLICA_1, REPLICA_1);
    public static final PartitionInfo PI_0_2 = new PartitionInfo("topic0", 2, NODE_1, REPLICA_1, REPLICA_1);
    public static final PartitionInfo PI_0_3 = new PartitionInfo("topic0", 3, NODE_2, REPLICA_2, REPLICA_2);
    public static final PartitionInfo PI_0_4 = new PartitionInfo("topic0", 4, NODE_3, REPLICA_3, REPLICA_3);
    public static final PartitionInfo PI_0_5 = new PartitionInfo("topic0", 5, NODE_4, REPLICA_4, REPLICA_4);
    public static final PartitionInfo PI_0_6 = new PartitionInfo("topic0", 6, NODE_2, REPLICA_2, REPLICA_2);
    public static final PartitionInfo PI_1_0 = new PartitionInfo("topic1", 0, NODE_2, REPLICA_2, REPLICA_2);
    public static final PartitionInfo PI_1_1 = new PartitionInfo("topic1", 1, NODE_3, REPLICA_3, REPLICA_3);
    public static final PartitionInfo PI_1_2 = new PartitionInfo("topic1", 2, NODE_0, REPLICA_0, REPLICA_0);
    public static final PartitionInfo PI_1_3 = new PartitionInfo("topic1", 3, NODE_1, REPLICA_1, REPLICA_1);
    public static final PartitionInfo PI_2_0 = new PartitionInfo("topic2", 0, NODE_4, REPLICA_4, REPLICA_4);
    public static final PartitionInfo PI_2_1 = new PartitionInfo("topic2", 1, NODE_3, REPLICA_3, REPLICA_3);
    public static final PartitionInfo PI_2_2 = new PartitionInfo("topic2", 2, NODE_1, REPLICA_4, REPLICA_4);
    public static final PartitionInfo PI_2_3 = new PartitionInfo("topic2", 3, NODE_0, REPLICA_0, REPLICA_0);
    public static final PartitionInfo PI_3_0 = new PartitionInfo("topic3", 0, NODE_2, REPLICA_2, REPLICA_2);
    public static final PartitionInfo PI_3_1 = new PartitionInfo("topic3", 1, NODE_3, REPLICA_3, REPLICA_3);
    public static final PartitionInfo PI_3_2 = new PartitionInfo("topic3", 2, NODE_4, REPLICA_4, REPLICA_4);
    public static final TaskId TASK_0_0 = new TaskId(0, 0);
    public static final TaskId TASK_0_1 = new TaskId(0, 1);
    public static final TaskId TASK_0_2 = new TaskId(0, 2);
    public static final TaskId TASK_0_3 = new TaskId(0, 3);
    public static final TaskId TASK_0_4 = new TaskId(0, 4);
    public static final TaskId TASK_0_5 = new TaskId(0, 5);
    public static final TaskId TASK_0_6 = new TaskId(0, 6);
    public static final TaskId TASK_1_0 = new TaskId(1, 0);
    public static final TaskId TASK_1_1 = new TaskId(1, 1);
    public static final TaskId TASK_1_2 = new TaskId(1, 2);
    public static final TaskId TASK_1_3 = new TaskId(1, 3);
    public static final TaskId TASK_2_0 = new TaskId(2, 0);
    public static final TaskId TASK_2_1 = new TaskId(2, 1);
    public static final TaskId TASK_2_2 = new TaskId(2, 2);
    public static final TaskId TASK_2_3 = new TaskId(2, 3);
    public static final TaskId TASK_3_0 = new TaskId(3, 0);
    public static final TaskId TASK_3_1 = new TaskId(3, 1);
    public static final TaskId TASK_3_2 = new TaskId(3, 2);
    public static final TaskId NAMED_TASK_T0_0_0 = new TaskId(0, 0, "topology0");
    public static final TaskId NAMED_TASK_T0_0_1 = new TaskId(0, 1, "topology0");
    public static final TaskId NAMED_TASK_T0_1_0 = new TaskId(1, 0, "topology0");
    public static final TaskId NAMED_TASK_T0_1_1 = new TaskId(1, 1, "topology0");
    public static final TaskId NAMED_TASK_T1_0_0 = new TaskId(0, 0, "topology1");
    public static final TaskId NAMED_TASK_T1_0_1 = new TaskId(0, 1, "topology1");
    public static final TaskId NAMED_TASK_T2_0_0 = new TaskId(0, 0, "topology2");
    public static final TaskId NAMED_TASK_T2_2_0 = new TaskId(2, 0, "topology2");
    public static final TopologyMetadata.Subtopology SUBTOPOLOGY_0 = new TopologyMetadata.Subtopology(0, null);
    public static final TopologyMetadata.Subtopology SUBTOPOLOGY_1 = new TopologyMetadata.Subtopology(1, null);
    public static final TopologyMetadata.Subtopology SUBTOPOLOGY_2 = new TopologyMetadata.Subtopology(2, null);
    public static final Set<TaskId> EMPTY_TASKS = Collections.emptySet();
    public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<TopicPartition, Long>();
    public static final List<String> EMPTY_RACK_AWARE_ASSIGNMENT_TAGS = Collections.emptyList();
    public static final Map<String, String> EMPTY_CLIENT_TAGS = Collections.emptyMap();
    private static final String USER_END_POINT = "localhost:8080";
    private static final String APPLICATION_ID = "stream-partition-assignor-test";
    private static Random random;
    public static final String TOPIC_PREFIX = "topic";
    public static final String CHANGELOG_TOPIC_PREFIX = "changelog-topic";
    public static final String RACK_PREFIX = "rack";

    private AssignmentTestUtils() {
    }

    static Map<ProcessId, ClientState> getClientStatesMap(ClientState ... states) {
        HashMap<ProcessId, ClientState> clientStates = new HashMap<ProcessId, ClientState>();
        int nthState = 1;
        for (ClientState state : states) {
            clientStates.put(AssignmentTestUtils.processIdForInt(nthState), state);
            ++nthState;
        }
        return clientStates;
    }

    public static AdminClient createMockAdminClientForAssignor(Map<TopicPartition, Long> changelogEndOffsets) {
        AdminClient adminClient = (AdminClient)Mockito.mock(AdminClient.class);
        ListOffsetsResult result = (ListOffsetsResult)Mockito.mock(ListOffsetsResult.class);
        Mockito.when((Object)adminClient.listOffsets((Map)ArgumentMatchers.any())).thenReturn((Object)result);
        for (Map.Entry<TopicPartition, Long> entry : changelogEndOffsets.entrySet()) {
            KafkaFutureImpl partitionFuture = new KafkaFutureImpl();
            ListOffsetsResult.ListOffsetsResultInfo info = (ListOffsetsResult.ListOffsetsResultInfo)Mockito.mock(ListOffsetsResult.ListOffsetsResultInfo.class);
            Mockito.lenient().when((Object)info.offset()).thenReturn((Object)entry.getValue());
            partitionFuture.complete((Object)info);
            Mockito.lenient().when((Object)result.partitionResult(entry.getKey())).thenReturn((Object)partitionFuture);
        }
        return adminClient;
    }

    public static SubscriptionInfo getInfo(ProcessId processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
        return new SubscriptionInfo(11, 11, processId, null, AssignmentTestUtils.getTaskOffsetSums(prevTasks, standbyTasks), 0, 0, EMPTY_CLIENT_TAGS);
    }

    public static SubscriptionInfo getInfo(ProcessId processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
        return new SubscriptionInfo(11, 11, processId, userEndPoint, AssignmentTestUtils.getTaskOffsetSums(prevTasks, standbyTasks), 0, 0, EMPTY_CLIENT_TAGS);
    }

    public static SubscriptionInfo getInfo(ProcessId processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, byte uniqueField) {
        return new SubscriptionInfo(11, 11, processId, null, AssignmentTestUtils.getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, EMPTY_CLIENT_TAGS);
    }

    public static SubscriptionInfo getInfo(ProcessId processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, byte uniqueField, Map<String, String> clientTags) {
        return new SubscriptionInfo(11, 11, processId, null, AssignmentTestUtils.getTaskOffsetSums(prevTasks, standbyTasks), uniqueField, 0, clientTags);
    }

    private static Map<TaskId, Long> getTaskOffsetSums(Collection<TaskId> activeTasks, Collection<TaskId> standbyTasks) {
        Map<TaskId, Long> taskOffsetSums = activeTasks.stream().collect(Collectors.toMap(t -> t, t -> -2L));
        taskOffsetSums.putAll(standbyTasks.stream().collect(Collectors.toMap(t -> t, t -> 0L)));
        return taskOffsetSums;
    }

    public static ProcessId processIdForInt(int n) {
        return new ProcessId(new UUID(0L, n));
    }

    static void assertValidAssignment(int numStandbyReplicas, Set<TaskId> statefulTasks, Set<TaskId> statelessTasks, Map<ProcessId, ClientState> assignedStates, StringBuilder failureContext) {
        AssignmentTestUtils.assertValidAssignment(numStandbyReplicas, 0, statefulTasks, statelessTasks, assignedStates, failureContext);
    }

    static void assertValidAssignment(int numStandbyReplicas, int maxWarmupReplicas, Set<TaskId> statefulTasks, Set<TaskId> statelessTasks, Map<ProcessId, ClientState> assignedStates, StringBuilder failureContext) {
        TreeMap<TaskId, Set<ProcessId>> assignments = new TreeMap<TaskId, Set<ProcessId>>();
        for (TaskId taskId : statefulTasks) {
            assignments.put(taskId, new TreeSet());
        }
        for (TaskId taskId : statelessTasks) {
            assignments.put(taskId, new TreeSet());
        }
        for (Map.Entry entry2 : assignedStates.entrySet()) {
            AssignmentTestUtils.validateAndAddActiveAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry2);
            AssignmentTestUtils.validateAndAddStandbyAssignments(statefulTasks, statelessTasks, failureContext, assignments, entry2);
        }
        AtomicInteger remainingWarmups = new AtomicInteger(maxWarmupReplicas);
        TreeMap treeMap = (TreeMap)assignments.entrySet().stream().filter(entry -> {
            boolean expectedActives = true;
            boolean isStateless = statelessTasks.contains(entry.getKey());
            int expectedStandbys = isStateless ? 0 : numStandbyReplicas;
            int expectedAssignments = Math.min(assignedStates.size(), 1 + expectedStandbys);
            int actualAssignments = ((Set)entry.getValue()).size();
            if (actualAssignments == expectedAssignments) {
                return false;
            }
            if (actualAssignments == expectedAssignments + 1 && remainingWarmups.get() > 0) {
                remainingWarmups.getAndDecrement();
                return false;
            }
            return true;
        }).collect(Utils.entriesToMap(TreeMap::new));
        if (!treeMap.isEmpty()) {
            MatcherAssert.assertThat((String)("Found some over- or under-assigned tasks in the final assignment with " + numStandbyReplicas + " and max warmups " + maxWarmupReplicas + " standby replicas, stateful tasks:" + statefulTasks + ", and stateless tasks:" + statelessTasks + failureContext), (Object)treeMap, (Matcher)Matchers.is(Collections.emptyMap()));
        }
    }

    private static void validateAndAddStandbyAssignments(Set<TaskId> statefulTasks, Set<TaskId> statelessTasks, StringBuilder failureContext, Map<TaskId, Set<ProcessId>> assignments, Map.Entry<ProcessId, ClientState> entry) {
        for (TaskId standbyTask : entry.getValue().standbyTasks()) {
            if (statelessTasks.contains(standbyTask)) {
                throw new AssertionError((Object)("Found a standby task for stateless task " + standbyTask + " on client " + entry + " stateless tasks:" + statelessTasks + failureContext));
            }
            if (assignments.containsKey(standbyTask)) {
                assignments.get(standbyTask).add(entry.getKey());
                continue;
            }
            throw new AssertionError((Object)("Found an extra standby task " + standbyTask + " on client " + entry + " but expected stateful tasks:" + statefulTasks + failureContext));
        }
    }

    private static void validateAndAddActiveAssignments(Set<TaskId> statefulTasks, Set<TaskId> statelessTasks, StringBuilder failureContext, Map<TaskId, Set<ProcessId>> assignments, Map.Entry<ProcessId, ClientState> entry) {
        for (TaskId activeTask : entry.getValue().activeTasks()) {
            if (assignments.containsKey(activeTask)) {
                assignments.get(activeTask).add(entry.getKey());
                continue;
            }
            throw new AssertionError((Object)("Found an extra active task " + activeTask + " on client " + entry + " but expected stateful tasks:" + statefulTasks + " and stateless tasks:" + statelessTasks + failureContext));
        }
    }

    static void assertBalancedStatefulAssignment(Set<TaskId> allStatefulTasks, Map<ProcessId, ClientState> clientStates, StringBuilder failureContext) {
        double maxStateful = Double.MIN_VALUE;
        double minStateful = Double.MAX_VALUE;
        for (ClientState clientState : clientStates.values()) {
            Set statefulTasks = Utils.intersection(HashSet::new, (Set)clientState.assignedTasks(), (Set[])new Set[]{allStatefulTasks});
            double statefulTaskLoad = 1.0 * (double)statefulTasks.size() / (double)clientState.capacity();
            maxStateful = Math.max(maxStateful, statefulTaskLoad);
            minStateful = Math.min(minStateful, statefulTaskLoad);
        }
        double statefulDiff = maxStateful - minStateful;
        if (statefulDiff > 1.0) {
            StringBuilder builder = new StringBuilder().append("detected a stateful assignment balance factor violation: ").append(statefulDiff).append(">").append(1.0).append(" in: ");
            AssignmentTestUtils.appendClientStates(builder, clientStates);
            Assert.fail((String)builder.append((CharSequence)failureContext).toString());
        }
    }

    static void assertBalancedActiveAssignment(Map<ProcessId, ClientState> clientStates, StringBuilder failureContext) {
        double maxActive = Double.MIN_VALUE;
        double minActive = Double.MAX_VALUE;
        for (ClientState clientState : clientStates.values()) {
            double activeTaskLoad = clientState.activeTaskLoad();
            maxActive = Math.max(maxActive, activeTaskLoad);
            minActive = Math.min(minActive, activeTaskLoad);
        }
        double activeDiff = maxActive - minActive;
        if (activeDiff > 1.0) {
            StringBuilder builder = new StringBuilder().append("detected an active assignment balance factor violation: ").append(activeDiff).append(">").append(1.0).append(" in: ");
            AssignmentTestUtils.appendClientStates(builder, clientStates);
            Assert.fail((String)builder.append((CharSequence)failureContext).toString());
        }
    }

    static void assertBalancedTasks(Map<ProcessId, ClientState> clientStates) {
        AssignmentTestUtils.assertBalancedTasks(clientStates, 1);
    }

    static void assertBalancedTasks(Map<ProcessId, ClientState> clientStates, int skewThreshold) {
        TaskSkewReport taskSkewReport = AssignmentTestUtils.analyzeTaskAssignmentBalance(clientStates, skewThreshold);
        if (taskSkewReport.totalSkewedTasks() > 0) {
            Assert.fail((String)("Expected a balanced task assignment, but was: " + taskSkewReport));
        }
    }

    static TaskSkewReport analyzeTaskAssignmentBalance(Map<ProcessId, ClientState> clientStates, int skewThreshold) {
        Function<Integer, Map> initialClientCounts = i -> clientStates.keySet().stream().collect(Collectors.toMap(c -> c, c -> new AtomicInteger(0)));
        TreeMap<Integer, Map> subtopologyToClientsWithPartition = new TreeMap<Integer, Map>();
        for (Map.Entry<ProcessId, ClientState> entry : clientStates.entrySet()) {
            ProcessId client = entry.getKey();
            ClientState clientState = entry.getValue();
            for (TaskId task : clientState.activeTasks()) {
                int subtopology = task.subtopology();
                ((AtomicInteger)subtopologyToClientsWithPartition.computeIfAbsent(subtopology, initialClientCounts).get(client)).incrementAndGet();
            }
        }
        int maxTaskSkew = 0;
        TreeSet skewedSubtopologies = new TreeSet();
        for (Map.Entry entry : subtopologyToClientsWithPartition.entrySet()) {
            Map clientsWithPartition = (Map)entry.getValue();
            int max = Integer.MIN_VALUE;
            int min = Integer.MAX_VALUE;
            for (AtomicInteger count : clientsWithPartition.values()) {
                max = Math.max(max, count.get());
                min = Math.min(min, count.get());
            }
            int taskSkew = max - min;
            maxTaskSkew = Math.max(maxTaskSkew, taskSkew);
            if (taskSkew <= skewThreshold) continue;
            skewedSubtopologies.add(entry.getKey());
        }
        return new TaskSkewReport(maxTaskSkew, skewedSubtopologies, subtopologyToClientsWithPartition);
    }

    static Matcher<ClientState> hasAssignedTasks(int taskCount) {
        return AssignmentTestUtils.hasProperty("assignedTasks", ClientState::assignedTaskCount, taskCount);
    }

    static Matcher<ClientState> hasActiveTasks(int taskCount) {
        return AssignmentTestUtils.hasProperty("activeTasks", ClientState::activeTaskCount, taskCount);
    }

    static Matcher<ClientState> hasStandbyTasks(int taskCount) {
        return AssignmentTestUtils.hasProperty("standbyTasks", ClientState::standbyTaskCount, taskCount);
    }

    static <V> Matcher<ClientState> hasProperty(final String propertyName, final Function<ClientState, V> propertyExtractor, final V propertyValue) {
        return new BaseMatcher<ClientState>(){

            public void describeTo(Description description) {
                description.appendText(propertyName).appendText(":").appendValue(propertyValue);
            }

            public boolean matches(Object actual) {
                if (actual instanceof ClientState) {
                    return Objects.equals(propertyExtractor.apply((ClientState)actual), propertyValue);
                }
                return false;
            }
        };
    }

    static void appendClientStates(StringBuilder stringBuilder, Map<ProcessId, ClientState> clientStates) {
        stringBuilder.append('{').append('\n');
        for (Map.Entry<ProcessId, ClientState> entry : clientStates.entrySet()) {
            stringBuilder.append("  ").append(entry.getKey()).append(": ").append(entry.getValue()).append('\n');
        }
        stringBuilder.append('}').append('\n');
    }

    static List<Node> getRandomNodes(int nodeSize) {
        ArrayList<Node> nodeList = new ArrayList<Node>(nodeSize);
        for (int i = 0; i < nodeSize; ++i) {
            nodeList.add(new Node(i, "node" + i, 1, RACK_PREFIX + i));
        }
        Random rand = AssignmentTestUtils.getRandom();
        Collections.shuffle(nodeList, rand);
        return nodeList;
    }

    static Node[] getRandomReplica(List<Node> nodeList, int index, int partition) {
        Node firstNode = nodeList.get(index * partition % nodeList.size());
        Node secondNode = nodeList.get((index * partition + 1) % nodeList.size());
        return new Node[]{firstNode, secondNode};
    }

    static Cluster getRandomCluster(int nodeSize, int tpSize, int partitionSize) {
        List<Node> nodeList = AssignmentTestUtils.getRandomNodes(nodeSize);
        HashSet<PartitionInfo> partitionInfoSet = new HashSet<PartitionInfo>();
        for (int i = 0; i < tpSize; ++i) {
            for (int j = 0; j < partitionSize; ++j) {
                Node[] replica = AssignmentTestUtils.getRandomReplica(nodeList, i, j);
                partitionInfoSet.add(new PartitionInfo(TOPIC_PREFIX + i, j, replica[0], replica, replica));
            }
        }
        return new Cluster("cluster", new HashSet<Node>(nodeList), partitionInfoSet, Collections.emptySet(), Collections.emptySet());
    }

    static Map<ProcessId, Map<String, Optional<String>>> getRandomProcessRacks(int clientSize, int nodeSize) {
        ArrayList<String> racks = new ArrayList<String>(nodeSize);
        for (int i = 0; i < nodeSize; ++i) {
            racks.add(RACK_PREFIX + i);
        }
        Random rand = AssignmentTestUtils.getRandom();
        Collections.shuffle(racks, rand);
        HashMap<ProcessId, Map<String, Optional<String>>> processRacks = new HashMap<ProcessId, Map<String, Optional<String>>>();
        for (int i = 1; i <= clientSize; ++i) {
            String rack = (String)racks.get(i % nodeSize);
            processRacks.put(AssignmentTestUtils.processIdForInt(i), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"1", Optional.of(rack))}));
        }
        return processRacks;
    }

    static SortedMap<TaskId, Set<TopicPartition>> getTaskTopicPartitionMap(int tpSize, int partitionSize, boolean changelog) {
        TreeMap<TaskId, Set<TopicPartition>> taskTopicPartitionMap = new TreeMap<TaskId, Set<TopicPartition>>();
        String topicName = changelog ? CHANGELOG_TOPIC_PREFIX : TOPIC_PREFIX;
        for (int i = 0; i < tpSize; ++i) {
            for (int j = 0; j < partitionSize; ++j) {
                taskTopicPartitionMap.put(new TaskId(i, j), Utils.mkSet((Object[])new TopicPartition[]{new TopicPartition(topicName + i, j), new TopicPartition(topicName + (i + 1) % tpSize, j)}));
            }
        }
        return taskTopicPartitionMap;
    }

    static Map<TopologyMetadata.Subtopology, Set<TaskId>> getTasksForTopicGroup(int tpSize, int partitionSize) {
        HashMap<TopologyMetadata.Subtopology, Set<TaskId>> tasksForTopicGroup = new HashMap<TopologyMetadata.Subtopology, Set<TaskId>>();
        for (int i = 0; i < tpSize; ++i) {
            for (int j = 0; j < partitionSize; ++j) {
                TopologyMetadata.Subtopology subtopology = new TopologyMetadata.Subtopology(i, null);
                tasksForTopicGroup.computeIfAbsent(subtopology, k -> new HashSet()).add(new TaskId(i, j));
            }
        }
        return tasksForTopicGroup;
    }

    static Map<String, Object> configProps(String rackAwareConfig) {
        return AssignmentTestUtils.configProps(rackAwareConfig, 0);
    }

    static Map<String, Object> configProps(String rackAwareConfig, int replicaNum) {
        HashMap<String, Object> configurationMap = new HashMap<String, Object>();
        configurationMap.put("application.id", APPLICATION_ID);
        configurationMap.put("bootstrap.servers", USER_END_POINT);
        configurationMap.put("num.standby.replicas", replicaNum);
        configurationMap.put("rack.aware.assignment.strategy", rackAwareConfig);
        ReferenceContainer referenceContainer = new ReferenceContainer();
        configurationMap.put("__reference.container.instance__", referenceContainer);
        return configurationMap;
    }

    static InternalTopicManager mockInternalTopicManagerForRandomChangelog(int nodeSize, int tpSize, int partitionSize) {
        MockTime time = new MockTime();
        StreamsConfig streamsConfig = new StreamsConfig(AssignmentTestUtils.configProps("min_traffic"));
        MockClientSupplier mockClientSupplier = new MockClientSupplier();
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager((Time)time, streamsConfig, mockClientSupplier.restoreConsumer, false);
        HashSet<String> changelogNames = new HashSet<String>();
        List<Node> nodeList = AssignmentTestUtils.getRandomNodes(nodeSize);
        HashMap<String, List> topicPartitionInfo = new HashMap<String, List>();
        for (int i = 0; i < tpSize; ++i) {
            String topicName = CHANGELOG_TOPIC_PREFIX + i;
            changelogNames.add(topicName);
            for (int j = 0; j < partitionSize; ++j) {
                Node[] replica = AssignmentTestUtils.getRandomReplica(nodeList, i, j);
                TopicPartitionInfo info = new TopicPartitionInfo(j, replica[0], Arrays.asList(replica), Arrays.asList(replica));
                topicPartitionInfo.computeIfAbsent(topicName, tp -> new ArrayList()).add(info);
            }
        }
        MockInternalTopicManager spyTopicManager = (MockInternalTopicManager)((Object)Mockito.spy((Object)((Object)mockInternalTopicManager)));
        ((MockInternalTopicManager)((Object)Mockito.doReturn(topicPartitionInfo).when((Object)spyTopicManager))).getTopicPartitionInfo(changelogNames);
        return spyTopicManager;
    }

    static SortedMap<ProcessId, ClientState> getRandomClientState(int clientSize, int tpSize, int partitionSize, int maxCapacity, Set<TaskId> statefulTasks) {
        return AssignmentTestUtils.getRandomClientState(clientSize, tpSize, partitionSize, maxCapacity, true, statefulTasks);
    }

    static List<Set<TaskId>> getRandomSubset(Set<TaskId> taskIds, int listSize) {
        Random rand = AssignmentTestUtils.getRandom();
        ArrayList<TaskId> taskIdList = new ArrayList<TaskId>(taskIds);
        Collections.shuffle(taskIdList, rand);
        int start = 0;
        ArrayList<Set<TaskId>> subSets = new ArrayList<Set<TaskId>>(listSize);
        for (int i = 0; i < listSize; ++i) {
            int remaining = taskIdList.size() - start;
            HashSet subset = new HashSet();
            if (remaining != 0) {
                int subSetSize = i == listSize - 1 ? remaining : rand.nextInt(remaining) + 1;
                for (int j = 0; j < subSetSize; ++j) {
                    subset.add(taskIdList.get(start + j));
                }
                start += subSetSize;
            }
            subSets.add(subset);
        }
        return subSets;
    }

    static SortedMap<ProcessId, ClientState> getRandomClientState(int clientSize, int tpSize, int partitionSize, int maxCapacity, boolean initialAssignment, Set<TaskId> statefulTasks) {
        TreeMap<ProcessId, ClientState> clientStates = new TreeMap<ProcessId, ClientState>();
        Map<TaskId, Long> taskLags = statefulTasks.stream().collect(Collectors.toMap(taskId -> taskId, taskId -> 0L));
        HashSet<TaskId> taskIds = new HashSet<TaskId>();
        for (int i = 0; i < tpSize; ++i) {
            for (int j = 0; j < partitionSize; ++j) {
                taskIds.add(new TaskId(i, j));
            }
        }
        Set missingTaskIds = taskLags.keySet().stream().filter(id -> !taskIds.contains(id)).collect(Collectors.toSet());
        if (!missingTaskIds.isEmpty()) {
            throw new IllegalArgumentException(missingTaskIds + " missing in all task ids " + taskIds);
        }
        List<Set<TaskId>> previousActives = AssignmentTestUtils.getRandomSubset(taskIds, clientSize);
        List<Set<TaskId>> previousStandbys = AssignmentTestUtils.getRandomSubset(statefulTasks, clientSize);
        Random rand = AssignmentTestUtils.getRandom();
        for (int i = 1; i <= clientSize; ++i) {
            int capacity = rand.nextInt(maxCapacity) + 1;
            ProcessId processId = AssignmentTestUtils.processIdForInt(i);
            ClientState clientState = new ClientState(previousActives.get(i - 1), previousStandbys.get(i - 1), taskLags, EMPTY_CLIENT_TAGS, capacity, processId);
            clientStates.put(processId, clientState);
        }
        if (initialAssignment) {
            Iterator iterator = clientStates.entrySet().iterator();
            ArrayList<TaskId> taskIdList = new ArrayList<TaskId>(taskIds);
            Collections.shuffle(taskIdList, rand);
            for (TaskId taskId2 : taskIdList) {
                if (!iterator.hasNext()) {
                    iterator = clientStates.entrySet().iterator();
                }
                ((ClientState)iterator.next().getValue()).assignActive(taskId2);
            }
        }
        return clientStates;
    }

    static Cluster getClusterForAllTopics() {
        return new Cluster("cluster", (Collection)Utils.mkSet((Object[])new Node[]{NODE_0, NODE_1, NODE_2, NODE_3, NODE_4}), (Collection)Utils.mkSet((Object[])new PartitionInfo[]{PI_0_0, PI_0_1, PI_0_2, PI_0_3, PI_0_4, PI_0_5, PI_0_6, PI_1_0, PI_1_1, PI_1_2, PI_1_3, PI_2_0, PI_2_1, PI_2_2, PI_2_3, PI_3_0, PI_3_1, PI_3_2}), Collections.emptySet(), Collections.emptySet());
    }

    static Map<TaskId, Set<TopicPartition>> getTaskTopicPartitionMapForAllTasks() {
        return Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)TASK_0_0, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_0_0})), Utils.mkEntry((Object)TASK_0_1, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_0_1})), Utils.mkEntry((Object)TASK_0_2, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_0_2})), Utils.mkEntry((Object)TASK_0_3, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_0_3})), Utils.mkEntry((Object)TASK_0_4, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_0_4})), Utils.mkEntry((Object)TASK_0_5, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_0_5})), Utils.mkEntry((Object)TASK_0_6, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_0_6})), Utils.mkEntry((Object)TASK_1_0, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_1_0})), Utils.mkEntry((Object)TASK_1_1, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_1_1})), Utils.mkEntry((Object)TASK_1_2, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_1_2})), Utils.mkEntry((Object)TASK_1_3, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_1_3})), Utils.mkEntry((Object)TASK_2_0, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_2_0})), Utils.mkEntry((Object)TASK_2_1, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_2_1})), Utils.mkEntry((Object)TASK_2_2, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_2_2})), Utils.mkEntry((Object)TASK_2_3, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_2_3})), Utils.mkEntry((Object)TASK_3_0, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_3_0})), Utils.mkEntry((Object)TASK_3_1, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_3_1})), Utils.mkEntry((Object)TASK_3_2, (Object)Utils.mkSet((Object[])new TopicPartition[]{TP_3_2}))});
    }

    static Map<TopologyMetadata.Subtopology, Set<TaskId>> getTasksForTopicGroup() {
        return Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)new TopologyMetadata.Subtopology(0, null), (Object)Utils.mkSet((Object[])new TaskId[]{TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3, TASK_0_4, TASK_0_5, TASK_0_6})), Utils.mkEntry((Object)new TopologyMetadata.Subtopology(1, null), (Object)Utils.mkSet((Object[])new TaskId[]{TASK_1_0, TASK_1_1, TASK_1_2, TASK_1_3})), Utils.mkEntry((Object)new TopologyMetadata.Subtopology(2, null), (Object)Utils.mkSet((Object[])new TaskId[]{TASK_2_0, TASK_2_1, TASK_2_2, TASK_2_3})), Utils.mkEntry((Object)new TopologyMetadata.Subtopology(3, null), (Object)Utils.mkSet((Object[])new TaskId[]{TASK_3_0, TASK_3_1, TASK_3_2}))});
    }

    static Map<TaskId, Set<TopicPartition>> getTaskChangelogMapForAllTasks() {
        return Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)TASK_0_0, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_0_0})), Utils.mkEntry((Object)TASK_0_1, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_0_1})), Utils.mkEntry((Object)TASK_0_2, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_0_2})), Utils.mkEntry((Object)TASK_0_3, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_0_3})), Utils.mkEntry((Object)TASK_0_4, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_0_4})), Utils.mkEntry((Object)TASK_0_5, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_0_5})), Utils.mkEntry((Object)TASK_0_6, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_0_6})), Utils.mkEntry((Object)TASK_1_0, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_1_0})), Utils.mkEntry((Object)TASK_1_1, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_1_1})), Utils.mkEntry((Object)TASK_1_2, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_1_2})), Utils.mkEntry((Object)TASK_1_3, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_1_3})), Utils.mkEntry((Object)TASK_2_0, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_2_0})), Utils.mkEntry((Object)TASK_2_1, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_2_1})), Utils.mkEntry((Object)TASK_2_2, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_2_2})), Utils.mkEntry((Object)TASK_2_3, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_2_3})), Utils.mkEntry((Object)TASK_3_0, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_3_0})), Utils.mkEntry((Object)TASK_3_1, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_3_1})), Utils.mkEntry((Object)TASK_3_2, (Object)Utils.mkSet((Object[])new TopicPartition[]{CHANGELOG_TP_3_2}))});
    }

    static InternalTopicManager mockInternalTopicManagerForChangelog() {
        MockTime time = new MockTime();
        StreamsConfig streamsConfig = new StreamsConfig(AssignmentTestUtils.configProps("min_traffic"));
        MockClientSupplier mockClientSupplier = new MockClientSupplier();
        MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager((Time)time, streamsConfig, mockClientSupplier.restoreConsumer, false);
        MockInternalTopicManager spyTopicManager = (MockInternalTopicManager)((Object)Mockito.spy((Object)((Object)mockInternalTopicManager)));
        ((MockInternalTopicManager)((Object)Mockito.doReturn((Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)CHANGELOG_TP_0_NAME, Arrays.asList(new TopicPartitionInfo(0, NODE_0, Arrays.asList(REPLICA_0), Collections.emptyList()), new TopicPartitionInfo(1, NODE_1, Arrays.asList(REPLICA_1), Collections.emptyList()), new TopicPartitionInfo(2, NODE_1, Arrays.asList(REPLICA_1), Collections.emptyList()), new TopicPartitionInfo(3, NODE_2, Arrays.asList(REPLICA_2), Collections.emptyList()), new TopicPartitionInfo(4, NODE_3, Arrays.asList(REPLICA_3), Collections.emptyList()), new TopicPartitionInfo(5, NODE_4, Arrays.asList(REPLICA_4), Collections.emptyList()), new TopicPartitionInfo(6, NODE_0, Arrays.asList(REPLICA_0), Collections.emptyList()))), Utils.mkEntry((Object)CHANGELOG_TP_1_NAME, Arrays.asList(new TopicPartitionInfo(0, NODE_2, Arrays.asList(REPLICA_2), Collections.emptyList()), new TopicPartitionInfo(1, NODE_3, Arrays.asList(REPLICA_3), Collections.emptyList()), new TopicPartitionInfo(2, NODE_0, Arrays.asList(REPLICA_0), Collections.emptyList()), new TopicPartitionInfo(3, NODE_4, Arrays.asList(REPLICA_4), Collections.emptyList()))), Utils.mkEntry((Object)CHANGELOG_TP_2_NAME, Arrays.asList(new TopicPartitionInfo(0, NODE_1, Arrays.asList(REPLICA_1), Collections.emptyList()), new TopicPartitionInfo(1, NODE_2, Arrays.asList(REPLICA_2), Collections.emptyList()), new TopicPartitionInfo(2, NODE_4, Arrays.asList(REPLICA_4), Collections.emptyList()), new TopicPartitionInfo(3, NODE_3, Arrays.asList(REPLICA_3), Collections.emptyList()))), Utils.mkEntry((Object)CHANGELOG_TP_3_NAME, Arrays.asList(new TopicPartitionInfo(0, NODE_4, Arrays.asList(REPLICA_4), Collections.emptyList()), new TopicPartitionInfo(1, NODE_3, Arrays.asList(REPLICA_3), Collections.emptyList()), new TopicPartitionInfo(2, NODE_1, Arrays.asList(REPLICA_1), Collections.emptyList())))})).when((Object)spyTopicManager))).getTopicPartitionInfo(ArgumentMatchers.anySet());
        return spyTopicManager;
    }

    static Map<TopologyMetadata.Subtopology, Set<TaskId>> getTopologyGroupTaskMap() {
        return Collections.singletonMap(SUBTOPOLOGY_0, Collections.singleton(new TaskId(1, 1)));
    }

    static void verifyStandbySatisfyRackReplica(Set<TaskId> taskIds, Map<ProcessId, String> racksForProcess, Map<ProcessId, ClientState> clientStateMap, Integer replica, boolean relaxRackCheck, Map<ProcessId, Integer> standbyTaskCount) {
        if (standbyTaskCount != null) {
            for (Map.Entry entry : clientStateMap.entrySet()) {
                int expected = standbyTaskCount.get(entry.getKey());
                int actual = ((ClientState)entry.getValue()).standbyTaskCount();
                Assert.assertEquals((String)("StandbyTaskCount for " + entry.getKey() + " doesn't match"), (long)expected, (long)actual);
            }
        }
        for (TaskId taskId : taskIds) {
            int activeCount = 0;
            int standbyCount = 0;
            HashMap<String, ProcessId> racks = new HashMap<String, ProcessId>();
            for (Map.Entry<ProcessId, ClientState> entry : clientStateMap.entrySet()) {
                ProcessId processId = entry.getKey();
                ClientState clientState = entry.getValue();
                if (!relaxRackCheck && clientState.hasAssignedTask(taskId)) {
                    String rack = racksForProcess.get(processId);
                    MatcherAssert.assertThat((String)("Task " + taskId + " appears in both " + processId + " and " + racks.get(rack)), racks.keySet(), (Matcher)Matchers.not((Matcher)Matchers.hasItems((Object[])new String[]{rack})));
                    racks.put(rack, processId);
                }
                boolean hasActive = false;
                if (clientState.hasActiveTask(taskId)) {
                    ++activeCount;
                    hasActive = true;
                }
                boolean hasStandby = false;
                if (clientState.hasStandbyTask(taskId)) {
                    ++standbyCount;
                    hasStandby = true;
                }
                Assert.assertFalse((String)(clientState + " has both active and standby task " + taskId), (hasActive && hasStandby ? 1 : 0) != 0);
            }
            Assert.assertEquals((String)("Task " + taskId + " should have 1 active task"), (long)1L, (long)activeCount);
            if (replica == null) continue;
            Assert.assertEquals((String)("Task " + taskId + " has wrong replica count"), (long)replica.intValue(), (long)standbyCount);
        }
    }

    static Map<ProcessId, Integer> clientTaskCount(Map<ProcessId, ClientState> clientStateMap, Function<ClientState, Integer> taskFunc) {
        return clientStateMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, v -> (Integer)taskFunc.apply((ClientState)v.getValue())));
    }

    static Map<ProcessId, Map<String, Optional<String>>> getProcessRacksForAllProcess() {
        return Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)PID_1, (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"1", Optional.of(RACK_0))})), Utils.mkEntry((Object)PID_2, (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"1", Optional.of(RACK_1))})), Utils.mkEntry((Object)PID_3, (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"1", Optional.of(RACK_2))})), Utils.mkEntry((Object)PID_4, (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"1", Optional.of(RACK_3))})), Utils.mkEntry((Object)PID_5, (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"1", Optional.of(RACK_4))})), Utils.mkEntry((Object)PID_6, (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"1", Optional.of(RACK_0))})), Utils.mkEntry((Object)PID_7, (Object)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"1", Optional.of(RACK_1))}))});
    }

    static RackAwareTaskAssignor getRackAwareTaskAssignor(AssignmentConfigs configs) {
        return AssignmentTestUtils.getRackAwareTaskAssignor(configs, Utils.mkMap((Map.Entry[])new Map.Entry[0]));
    }

    static RackAwareTaskAssignor getRackAwareTaskAssignor(AssignmentConfigs configs, Map<TopologyMetadata.Subtopology, Set<TaskId>> taskForTopicGroup) {
        return (RackAwareTaskAssignor)Mockito.spy((Object)new RackAwareTaskAssignor(AssignmentTestUtils.getClusterForAllTopics(), AssignmentTestUtils.getTaskTopicPartitionMapForAllTasks(), AssignmentTestUtils.getTaskChangelogMapForAllTasks(), taskForTopicGroup, AssignmentTestUtils.getProcessRacksForAllProcess(), AssignmentTestUtils.mockInternalTopicManagerForChangelog(), configs, (Time)new MockTime()));
    }

    static void verifyTaskPlacementWithRackAwareAssignor(RackAwareTaskAssignor rackAwareTaskAssignor, Set<TaskId> allTaskIds, Map<ProcessId, ClientState> clientStates, boolean hasStandby, boolean enableRackAwareTaskAssignor) {
        AssignmentTestUtils.verifyStandbySatisfyRackReplica(allTaskIds, rackAwareTaskAssignor.racksForProcess(), clientStates, null, true, null);
        if (enableRackAwareTaskAssignor) {
            ((RackAwareTaskAssignor)Mockito.verify((Object)rackAwareTaskAssignor, (VerificationMode)Mockito.times((int)2))).optimizeActiveTasks((SortedSet)ArgumentMatchers.any(), (SortedMap)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
            ((RackAwareTaskAssignor)Mockito.verify((Object)rackAwareTaskAssignor, (VerificationMode)(hasStandby ? Mockito.times((int)1) : Mockito.never()))).optimizeStandbyTasks((SortedMap)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (RackAwareTaskAssignor.MoveStandbyTaskPredicate)ArgumentMatchers.any());
        } else {
            ((RackAwareTaskAssignor)Mockito.verify((Object)rackAwareTaskAssignor, (VerificationMode)Mockito.never())).optimizeActiveTasks((SortedSet)ArgumentMatchers.any(), (SortedMap)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt());
            ((RackAwareTaskAssignor)Mockito.verify((Object)rackAwareTaskAssignor, (VerificationMode)Mockito.never())).optimizeStandbyTasks((SortedMap)ArgumentMatchers.any(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (RackAwareTaskAssignor.MoveStandbyTaskPredicate)ArgumentMatchers.any());
        }
    }

    static SortedMap<ProcessId, ClientState> copyClientStateMap(Map<ProcessId, ClientState> originalMap) {
        return new TreeMap<ProcessId, ClientState>(originalMap.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> new ClientState((ClientState)entry.getValue()))));
    }

    static synchronized Random getRandom() {
        if (random == null) {
            long seed = System.currentTimeMillis();
            System.out.println("seed for getRandom: " + seed);
            random = new Random(seed);
        }
        return random;
    }

    static final class TaskSkewReport {
        private final int maxTaskSkew;
        private final Set<Integer> skewedSubtopologies;
        private final Map<Integer, Map<ProcessId, AtomicInteger>> subtopologyToClientsWithPartition;

        private TaskSkewReport(int maxTaskSkew, Set<Integer> skewedSubtopologies, Map<Integer, Map<ProcessId, AtomicInteger>> subtopologyToClientsWithPartition) {
            this.maxTaskSkew = maxTaskSkew;
            this.skewedSubtopologies = skewedSubtopologies;
            this.subtopologyToClientsWithPartition = subtopologyToClientsWithPartition;
        }

        int totalSkewedTasks() {
            return this.skewedSubtopologies.size();
        }

        Set<Integer> skewedSubtopologies() {
            return this.skewedSubtopologies;
        }

        public String toString() {
            return "TaskSkewReport{maxTaskSkew=" + this.maxTaskSkew + ", skewedSubtopologies=" + this.skewedSubtopologies + ", subtopologyToClientsWithPartition=" + this.subtopologyToClientsWithPartition + '}';
        }
    }
}

