package com.azure.messaging.eventhubs;

import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.azure.core.util.logging.LoggingEventBuilder;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;

/* loaded from: input_file:com/azure/messaging/eventhubs/PartitionBasedLoadBalancer.class */
/**
 * Distributes ownership of Event Hub partitions between the event processors that share a
 * {@link CheckpointStore}.
 *
 * <p>Each call to {@link #loadBalance()} lists the ownership records in the checkpoint store, drops the
 * records that have not been modified within the inactivity limit, and then either renews the ownership
 * this processor already holds or claims one more partition (an unclaimed one if available, otherwise one
 * taken from the owner that currently holds the most partitions).</p>
 */
final class PartitionBasedLoadBalancer {
    private static final ClientLogger LOGGER = new ClientLogger(PartitionBasedLoadBalancer.class);
    private final String eventHubName;
    private final String consumerGroupName;
    private final CheckpointStore checkpointStore;
    private final EventHubAsyncClient eventHubAsyncClient;
    private final String ownerId;
    private final long inactiveTimeLimitInMillis;
    private final PartitionPumpManager partitionPumpManager;
    private final String fullyQualifiedNamespace;
    private final Consumer<ErrorContext> processError;
    private final PartitionContext partitionAgnosticContext;
    private final LoadBalancingStrategy loadBalancingStrategy;
    // Guards against overlapping load balancing runs; reset when a run's claim/renew call terminates.
    private final AtomicBoolean isLoadBalancerRunning = new AtomicBoolean();
    // Set when a claim was attempted; drives the repeat loop of the GREEDY strategy.
    private final AtomicBoolean morePartitionsToClaim = new AtomicBoolean();
    // Partition ids fetched from the service; once populated the service is not queried again.
    private final AtomicReference<List<String>> partitionsCache = new AtomicReference<>(new ArrayList<>());

    /**
     * Creates a load balancer for one event processor.
     *
     * @param checkpointStore store used to list and claim partition ownership and to list checkpoints.
     * @param eventHubAsyncClient client used to fetch the partition ids of the Event Hub.
     * @param fullyQualifiedNamespace fully qualified namespace passed to the checkpoint store.
     * @param eventHubName name of the Event Hub whose partitions are balanced.
     * @param consumerGroupName consumer group the ownership records belong to.
     * @param ownerId identifier of this event processor; written into every ownership request.
     * @param inactiveTimeLimitInSeconds age, in seconds, after which an ownership record that has not been
     *     modified is treated as inactive.
     * @param partitionPumpManager manager that starts and verifies the partition pumps of this processor.
     * @param processError callback invoked with an {@link ErrorContext} when load balancing or claiming
     *     fails.
     * @param loadBalancingStrategy strategy deciding whether claims are repeated within one run.
     */
    public PartitionBasedLoadBalancer(CheckpointStore checkpointStore, EventHubAsyncClient eventHubAsyncClient,
        String fullyQualifiedNamespace, String eventHubName, String consumerGroupName, String ownerId,
        long inactiveTimeLimitInSeconds, PartitionPumpManager partitionPumpManager,
        Consumer<ErrorContext> processError, LoadBalancingStrategy loadBalancingStrategy) {
        this.checkpointStore = checkpointStore;
        this.eventHubAsyncClient = eventHubAsyncClient;
        this.fullyQualifiedNamespace = fullyQualifiedNamespace;
        this.eventHubName = eventHubName;
        this.consumerGroupName = consumerGroupName;
        this.ownerId = ownerId;
        this.inactiveTimeLimitInMillis = TimeUnit.SECONDS.toMillis(inactiveTimeLimitInSeconds);
        this.partitionPumpManager = partitionPumpManager;
        this.processError = processError;
        // Errors that are not tied to a single partition are reported against the placeholder id "NONE".
        this.partitionAgnosticContext
            = new PartitionContext(fullyQualifiedNamespace, eventHubName, consumerGroupName, "NONE");
        this.loadBalancingStrategy = loadBalancingStrategy;
    }

    /**
     * Runs one load balancing cycle unless a previous cycle is still in progress.
     *
     * <p>The ownership records and the partition ids are fetched (each with a one minute timeout) and
     * handed to the balancing step. With {@link LoadBalancingStrategy#GREEDY} the cycle is repeated for as
     * long as the previous iteration attempted to claim a partition. Failures are reported through the
     * error callback and release the running flag.</p>
     */
    public void loadBalance() {
        if (!this.isLoadBalancerRunning.compareAndSet(false, true)) {
            LOGGER.atInfo()
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .log("Load balancer already running.");
            return;
        }
        LOGGER.atInfo().addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId).log("Starting load balancer.");

        final Mono<Map<String, PartitionOwnership>> ownershipMono = this.checkpointStore
            .listOwnership(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroupName)
            .timeout(Duration.ofMinutes(1L))
            .collectMap(PartitionOwnership::getPartitionId, Function.identity());

        final Mono<List<String>> partitionsMono;
        if (CoreUtils.isNullOrEmpty(this.partitionsCache.get())) {
            LOGGER.atInfo()
                .addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.eventHubName)
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .log("Getting partitions from Event Hubs service.");
            partitionsMono = this.eventHubAsyncClient.getPartitionIds()
                .timeout(Duration.ofMinutes(1L))
                .collectList();
        } else {
            partitionsMono = Mono.just(this.partitionsCache.get());
            // The partition ids are cached, so the client is not needed for this lookup any more.
            closeClient();
        }

        Mono.zip(ownershipMono, partitionsMono)
            .flatMap(this::loadBalance)
            .then()
            .repeat(() -> LoadBalancingStrategy.GREEDY == this.loadBalancingStrategy
                && this.morePartitionsToClaim.get())
            .subscribe(ignored -> {
            }, error -> {
                LOGGER.atWarning()
                    .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                    .log(Messages.LOAD_BALANCING_FAILED, error);
                this.processError.accept(new ErrorContext(this.partitionAgnosticContext, error));
                this.isLoadBalancerRunning.set(false);
                this.morePartitionsToClaim.set(false);
            }, () -> LOGGER.atInfo()
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .log("Load balancing completed successfully."));
    }

    /**
     * Performs a single balancing iteration over the fetched ownership records and partition ids.
     *
     * @param tuple ownership records keyed by partition id (T1) and the partition ids of the Event Hub (T2).
     * @return a {@link Mono} that runs the iteration when subscribed; it errors with an
     *     {@link IllegalStateException} (propagated) when there are no partitions or when an ownership
     *     record fails validation.
     */
    private Mono<Void> loadBalance(Tuple2<Map<String, PartitionOwnership>, List<String>> tuple) {
        return Mono.fromRunnable(() -> {
            LOGGER.atInfo()
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .log("Starting next iteration of load balancer.");
            final Map<String, PartitionOwnership> partitionOwnershipMap = tuple.getT1();
            final List<String> partitionIds = tuple.getT2();
            if (CoreUtils.isNullOrEmpty(partitionIds)) {
                throw LOGGER.logExceptionAsError(Exceptions.propagate(
                    new IllegalStateException("There are no partitions in Event Hub " + this.eventHubName)));
            }
            this.partitionsCache.set(partitionIds);
            final int numberOfPartitions = partitionIds.size();
            LOGGER.atInfo()
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .addKeyValue("numberOfPartitions", numberOfPartitions)
                .addKeyValue("ownershipRecords", partitionOwnershipMap.size())
                .log("Load balancing.");
            if (!isValid(partitionOwnershipMap)) {
                throw LOGGER.logExceptionAsError(Exceptions.propagate(
                    new IllegalStateException("Invalid partitionOwnership data from CheckpointStore")));
            }

            final Map<String, PartitionOwnership> activeOwnershipMap
                = removeInactivePartitionOwnerships(partitionOwnershipMap);
            LOGGER.atInfo()
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .addKeyValue("activeRecords", activeOwnershipMap.size())
                .log("Found active ownership records.");

            final Map<String, List<PartitionOwnership>> ownerPartitionMap = activeOwnershipMap.values()
                .stream()
                .collect(Collectors.groupingBy(PartitionOwnership::getOwnerId,
                    Collectors.mapping(Function.identity(), Collectors.toList())));
            // This processor takes part in the distribution even when it owns nothing yet.
            ownerPartitionMap.putIfAbsent(this.ownerId, new ArrayList<>());
            logPartitionDistribution(ownerPartitionMap);

            if (CoreUtils.isNullOrEmpty(activeOwnershipMap)) {
                // Nobody actively owns anything: start with a randomly chosen partition.
                claimOwnership(partitionOwnershipMap,
                    partitionIds.get(ThreadLocalRandom.current().nextInt(numberOfPartitions)));
                return;
            }

            final int numberOfProcessors = ownerPartitionMap.size();
            LOGGER.atInfo()
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .addKeyValue("numberOfProcessors", ownerPartitionMap.size())
                .log("Found active event processors.");
            final int minPartitionsPerProcessor = numberOfPartitions / numberOfProcessors;
            final int processorsWithAdditionalPartition = numberOfPartitions % numberOfProcessors;
            LOGGER.atInfo()
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .addKeyValue("minPartitionsPerEventProcessor", minPartitionsPerProcessor)
                .addKeyValue("eventProcessorsWithAdditionalPartition", processorsWithAdditionalPartition)
                .log("Calculated number of event processors that can own additional partition.");

            if (isLoadBalanced(minPartitionsPerProcessor, processorsWithAdditionalPartition, ownerPartitionMap)) {
                LOGGER.atInfo()
                    .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                    .addKeyValue("partitionCount", ownerPartitionMap.get(this.ownerId).size())
                    .log("Load is balanced for this event processor.");
                renewOwnership(partitionOwnershipMap);
            } else if (shouldOwnMorePartitions(minPartitionsPerProcessor, ownerPartitionMap)) {
                LOGGER.atInfo()
                    .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                    .addKeyValue("partitionCount", ownerPartitionMap.get(this.ownerId).size())
                    .log("Load is unbalanced and this event processor should own more partitions");
                // Prefer a partition without an active owner; steal only when every partition is taken.
                final String partitionToClaim = partitionIds.parallelStream()
                    .filter(partitionId -> !activeOwnershipMap.containsKey(partitionId))
                    .findAny()
                    .orElseGet(() -> {
                        LOGGER.atInfo()
                            .addKeyValue("partitionCount", 0L)
                            .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                            .log("No unclaimed partitions, stealing from another event processor");
                        return findPartitionToSteal(ownerPartitionMap);
                    });
                claimOwnership(partitionOwnershipMap, partitionToClaim);
            } else {
                LOGGER.atInfo()
                    .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                    .addKeyValue("partitionCount", ownerPartitionMap.get(this.ownerId).size())
                    .log("This event processor shouldn't own more partitions");
                renewOwnership(partitionOwnershipMap);
            }
        });
    }

    /**
     * Closes the Event Hub client, logging (rather than propagating) any failure.
     */
    private void closeClient() {
        try {
            this.eventHubAsyncClient.close();
        } catch (Exception ex) {
            LOGGER.atWarning()
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .log("Failed to close the client", ex);
        }
    }

    /**
     * Re-claims the partitions this processor already pumps and owns, and verifies the connection of each
     * renewed partition. The running flag is released when the renewal completes or fails.
     *
     * @param partitionOwnershipMap all ownership records from the checkpoint store, keyed by partition id.
     */
    private void renewOwnership(Map<String, PartitionOwnership> partitionOwnershipMap) {
        this.morePartitionsToClaim.set(false);
        final List<PartitionOwnership> renewalRequests = createOwnedPartitionRequests(partitionOwnershipMap);
        this.checkpointStore.claimOwnership(renewalRequests)
            .subscribe(this.partitionPumpManager::verifyPartitionConnection, error -> {
                LOGGER.atError()
                    .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                    .log("Error renewing partition ownership", error);
                this.isLoadBalancerRunning.set(false);
            }, () -> this.isLoadBalancerRunning.set(false));
    }

    /**
     * Builds ownership requests for every partition that has a running pump and whose stored record names
     * this processor as the owner.
     *
     * @param partitionOwnershipMap all ownership records from the checkpoint store, keyed by partition id.
     * @return the ownership requests for the partitions currently pumped and owned by this processor.
     */
    private List<PartitionOwnership> createOwnedPartitionRequests(
        Map<String, PartitionOwnership> partitionOwnershipMap) {
        return this.partitionPumpManager.getPartitionPumps()
            .keySet()
            .stream()
            .filter(partitionId -> {
                final PartitionOwnership existing = partitionOwnershipMap.get(partitionId);
                // Compare from this.ownerId so a stored record without an owner id cannot cause an NPE.
                return existing != null && this.ownerId.equals(existing.getOwnerId());
            })
            .map(partitionId -> createPartitionOwnershipRequest(partitionOwnershipMap, partitionId))
            .collect(Collectors.toList());
    }

    /**
     * Checks that every ownership record belongs to this Event Hub and consumer group and carries a
     * partition id, a last modified time and an ETag.
     *
     * @param partitionOwnershipMap ownership records keyed by partition id.
     * @return {@code true} when no record fails the checks.
     */
    private boolean isValid(Map<String, PartitionOwnership> partitionOwnershipMap) {
        return partitionOwnershipMap.values().stream().noneMatch(ownership ->
            ownership.getEventHubName() == null
                || !ownership.getEventHubName().equals(this.eventHubName)
                || ownership.getConsumerGroup() == null
                || !ownership.getConsumerGroup().equals(this.consumerGroupName)
                || ownership.getPartitionId() == null
                || ownership.getLastModifiedTime() == null
                || ownership.getETag() == null);
    }

    /**
     * Picks a random partition from the owner that currently holds the most partitions.
     *
     * @param ownerPartitionMap active ownership records grouped by owner id.
     * @return the id of the partition to steal.
     */
    private String findPartitionToSteal(Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        final Map.Entry<String, List<PartitionOwnership>> ownerWithMaxPartitions = ownerPartitionMap.entrySet()
            .stream()
            .max(Comparator.comparingInt(entry -> entry.getValue().size()))
            .get();
        final int numberOfPartitions = ownerWithMaxPartitions.getValue().size();
        LOGGER.atInfo()
            .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
            .addKeyValue("ownerWithMaxPartitions", ownerWithMaxPartitions.getKey())
            .log("Stealing a partition from owner that owns max number of partitions.");
        return ownerWithMaxPartitions.getValue()
            .get(ThreadLocalRandom.current().nextInt(numberOfPartitions))
            .getPartitionId();
    }

    /**
     * Determines whether the partitions are evenly spread: every owner holds either the minimum or the
     * minimum plus one, and exactly the expected number of owners hold the extra partition.
     *
     * @param minPartitionsPerEventProcessor partitions every processor should own at least.
     * @param numberOfEventProcessorsWithAdditionalPartition number of processors expected to own one more.
     * @param ownerPartitionMap active ownership records grouped by owner id.
     * @return {@code true} when the distribution is balanced.
     */
    private boolean isLoadBalanced(int minPartitionsPerEventProcessor,
        int numberOfEventProcessorsWithAdditionalPartition,
        Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        int processorsWithAdditionalPartition = 0;
        for (List<PartitionOwnership> ownedPartitions : ownerPartitionMap.values()) {
            final int owned = ownedPartitions.size();
            if (owned < minPartitionsPerEventProcessor || owned > minPartitionsPerEventProcessor + 1) {
                return false;
            }
            if (owned == minPartitionsPerEventProcessor + 1) {
                processorsWithAdditionalPartition++;
            }
        }
        return processorsWithAdditionalPartition == numberOfEventProcessorsWithAdditionalPartition;
    }

    /**
     * Decides whether this processor should claim another partition: it owns fewer than the minimum, or it
     * owns as few as the least loaded processor.
     *
     * @param minPartitionsPerEventProcessor partitions every processor should own at least.
     * @param ownerPartitionMap active ownership records grouped by owner id; contains this owner.
     * @return {@code true} when this processor should claim one more partition.
     */
    private boolean shouldOwnMorePartitions(int minPartitionsPerEventProcessor,
        Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        final int ownedByThisProcessor = ownerPartitionMap.get(this.ownerId).size();
        if (ownedByThisProcessor < minPartitionsPerEventProcessor) {
            return true;
        }
        final int leastOwnedByAnyProcessor = ownerPartitionMap.values()
            .stream()
            .min(Comparator.comparingInt(List::size))
            .get()
            .size();
        return ownedByThisProcessor == leastOwnedByAnyProcessor;
    }

    /**
     * Keeps only the ownership records that were modified within the inactivity limit and that name an
     * owner.
     *
     * @param partitionOwnershipMap ownership records keyed by partition id.
     * @return a new map containing the active records only.
     */
    private Map<String, PartitionOwnership> removeInactivePartitionOwnerships(
        Map<String, PartitionOwnership> partitionOwnershipMap) {
        return partitionOwnershipMap.entrySet().stream().filter(entry -> {
            final PartitionOwnership ownership = entry.getValue();
            // Read the clock once so the log level and the filter decision are based on the same age.
            final long elapsedMillis = System.currentTimeMillis() - ownership.getLastModifiedTime();
            // Compare milliseconds with milliseconds; the age in seconds is only used for the log entry.
            final boolean modifiedWithinLimit = elapsedMillis < this.inactiveTimeLimitInMillis;
            LOGGER.atLevel(modifiedWithinLimit ? LogLevel.VERBOSE : LogLevel.INFORMATIONAL)
                .addKeyValue(ClientConstants.PARTITION_ID_KEY, entry.getKey())
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .addKeyValue("partitionOwnerId", ownership.getOwnerId())
                .addKeyValue("modifiedSecondsAgo", elapsedMillis / 1000)
                .log("Detecting inactive ownerships.");
            return modifiedWithinLimit && !CoreUtils.isNullOrEmpty(ownership.getOwnerId());
        }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    /**
     * Claims the given partition together with the partitions this processor already pumps and owns, then
     * starts a pump for every partition the checkpoint store confirms, using the stored checkpoint if any.
     *
     * <p>With the {@link LoadBalancingStrategy#BALANCED} strategy the running flag is released when the
     * claim completes or fails. A failure is reported through the error callback and rethrown as an
     * {@link IllegalStateException} from the error handler.</p>
     *
     * @param partitionOwnershipMap all ownership records from the checkpoint store, keyed by partition id.
     * @param partitionIdToClaim id of the additional partition to claim.
     */
    private void claimOwnership(Map<String, PartitionOwnership> partitionOwnershipMap,
        String partitionIdToClaim) {
        LOGGER.atInfo()
            .addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionIdToClaim)
            .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
            .log("Attempting to claim ownership of partition.");
        final PartitionOwnership ownershipRequest
            = createPartitionOwnershipRequest(partitionOwnershipMap, partitionIdToClaim);
        final List<PartitionOwnership> partitionsToClaim = new ArrayList<>();
        partitionsToClaim.add(ownershipRequest);
        partitionsToClaim.addAll(createOwnedPartitionRequests(partitionOwnershipMap));
        this.morePartitionsToClaim.set(true);

        this.checkpointStore.claimOwnership(partitionsToClaim)
            .doOnNext(claimed -> LOGGER.atInfo()
                .addKeyValue(ClientConstants.PARTITION_ID_KEY, claimed.getPartitionId())
                .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                .log("Successfully claimed ownership."))
            .doOnError(error -> LOGGER.atWarning()
                .addKeyValue(ClientConstants.PARTITION_ID_KEY, ownershipRequest.getPartitionId())
                .log(Messages.FAILED_TO_CLAIM_OWNERSHIP, error))
            .collectList()
            .zipWhen(claimedOwnerships -> this.checkpointStore
                .listCheckpoints(this.fullyQualifiedNamespace, this.eventHubName, this.consumerGroupName)
                .collectMap(checkpoint -> checkpoint.getPartitionId(), Function.identity()))
            .subscribe(ownedPartitionCheckpoints -> {
                final Map<String, Checkpoint> checkpoints = ownedPartitionCheckpoints.getT2();
                ownedPartitionCheckpoints.getT1().forEach(claimed ->
                    this.partitionPumpManager.startPartitionPump(claimed,
                        checkpoints.get(claimed.getPartitionId())));
            }, error -> {
                this.processError.accept(new ErrorContext(this.partitionAgnosticContext, error));
                if (this.loadBalancingStrategy == LoadBalancingStrategy.BALANCED) {
                    this.isLoadBalancerRunning.set(false);
                }
                throw LOGGER.atError()
                    .addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionIdToClaim)
                    .addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId)
                    .log(new IllegalStateException("Error while claiming ownership", error));
            }, () -> {
                if (this.loadBalancingStrategy == LoadBalancingStrategy.BALANCED) {
                    this.isLoadBalancerRunning.set(false);
                }
            });
    }

    /**
     * Logs, at verbose level only, which partitions each owner currently holds.
     *
     * @param ownerPartitionMap active ownership records grouped by owner id.
     */
    private void logPartitionDistribution(Map<String, List<PartitionOwnership>> ownerPartitionMap) {
        if (!LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
            return;
        }
        final LoggingEventBuilder logEvent
            = LOGGER.atVerbose().addKeyValue(ClientConstants.OWNER_ID_KEY, this.ownerId);
        for (Map.Entry<String, List<PartitionOwnership>> entry : ownerPartitionMap.entrySet()) {
            final String ownedPartitionIds = entry.getValue()
                .stream()
                .map(ownership -> ownership.getPartitionId())
                .collect(Collectors.joining(","));
            logEvent.addKeyValue(entry.getKey(), ownedPartitionIds);
        }
        logEvent.log("Current partition distribution.");
    }

    /**
     * Creates an ownership request for this processor, carrying over the ETag of the existing record when
     * there is one.
     *
     * @param partitionOwnershipMap all ownership records from the checkpoint store, keyed by partition id.
     * @param partitionIdToClaim id of the partition the request is for.
     * @return the ownership request; its ETag is {@code null} when no record exists for the partition.
     */
    private PartitionOwnership createPartitionOwnershipRequest(
        Map<String, PartitionOwnership> partitionOwnershipMap, String partitionIdToClaim) {
        final PartitionOwnership previousOwnership = partitionOwnershipMap.get(partitionIdToClaim);
        final String previousETag = previousOwnership == null ? null : previousOwnership.getETag();
        return new PartitionOwnership()
            .setFullyQualifiedNamespace(this.fullyQualifiedNamespace)
            .setOwnerId(this.ownerId)
            .setPartitionId(partitionIdToClaim)
            .setConsumerGroup(this.consumerGroupName)
            .setEventHubName(this.eventHubName)
            .setETag(previousETag);
    }
}
