package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.implementation.PartitionKeyRange;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.LeaseContainer;
import com.azure.cosmos.implementation.changefeed.LeaseManager;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedStateV1;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneHandler;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneMergeHandler;
import com.azure.cosmos.implementation.changefeed.epkversion.feedRangeGoneHandler.FeedRangeGoneSplitHandler;
import com.azure.cosmos.implementation.feedranges.FeedRangeContinuation;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper;
import com.azure.cosmos.implementation.routing.Range;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/epkversion/PartitionSynchronizerImpl.class */
/**
 * {@link PartitionSynchronizer} implementation that creates EPK (effective partition key) based
 * leases for partition key ranges that do not have one yet, and that selects the handler used
 * when the feed range of a lease is reported as gone.
 */
public class PartitionSynchronizerImpl implements PartitionSynchronizer {
    private static final Comparator<Range<String>> MIN_RANGE_COMPARATOR = new Range.MinComparator<>();
    private static final Comparator<Range<String>> MAX_RANGE_COMPARATOR = new Range.MaxComparator<>();
    private final Logger logger = LoggerFactory.getLogger(PartitionSynchronizerImpl.class);
    private final ChangeFeedContextClient documentClient;
    private final String collectionSelfLink;
    private final LeaseContainer leaseContainer;
    private final LeaseManager leaseManager;
    private final int degreeOfParallelism;
    // Stored for completeness of the configuration; not read anywhere in this class.
    private final int maxBatchSize;
    private final ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private final ChangeFeedMode changeFeedMode;

    /**
     * Creates a synchronizer. Arguments are stored as-is; no validation is performed here.
     *
     * @param changeFeedContextClient client used to look up the partition key ranges overlapping an EPK range
     * @param collectionSelfLink container identifier embedded in the continuation state built for new leases
     * @param leaseContainer source of the leases that already exist
     * @param leaseManager used to create the missing leases
     * @param degreeOfParallelism maximum number of concurrent lease-creation calls
     * @param maxBatchSize retained in a field but not used by this class
     * @param changeFeedProcessorOptions supplies the max scale count and the start-from settings
     * @param changeFeedMode change feed mode recorded in the continuation state of migrated leases
     */
    public PartitionSynchronizerImpl(
        ChangeFeedContextClient changeFeedContextClient,
        String collectionSelfLink,
        LeaseContainer leaseContainer,
        LeaseManager leaseManager,
        int degreeOfParallelism,
        int maxBatchSize,
        ChangeFeedProcessorOptions changeFeedProcessorOptions,
        ChangeFeedMode changeFeedMode) {

        this.documentClient = changeFeedContextClient;
        this.collectionSelfLink = collectionSelfLink;
        this.leaseContainer = leaseContainer;
        this.leaseManager = leaseManager;
        this.degreeOfParallelism = degreeOfParallelism;
        this.maxBatchSize = maxBatchSize;
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        this.changeFeedMode = changeFeedMode;
    }

    /**
     * Creates a lease for every partition key range of the full EPK range that does not share a
     * boundary with an existing lease.
     *
     * <p>This is best-effort: any failure is logged and swallowed, so the returned {@link Mono}
     * completes normally even when lease creation failed.
     *
     * @return a {@link Mono} that completes once the attempt has finished
     */
    @Override
    public Mono<Void> createMissingLeases() {
        // NOTE(review): the boolean argument is presumably "force refresh" - confirm against ChangeFeedContextClient.
        return this.documentClient
            .getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true)
            .flatMap(partitionKeyRanges -> createLeases(partitionKeyRanges).then())
            .onErrorResume(error -> {
                this.logger.error("Create lease failed", error);
                return Mono.empty();
            });
    }

    /**
     * Creates EPK-based leases for partition key ranges that are not yet covered, seeding each new
     * lease with the continuation token of the matching lease from {@code list}.
     *
     * <p>Unlike {@link #createMissingLeases()}, failures are logged and then propagated.
     *
     * @param list leases keyed by partition key range id whose continuation tokens are carried over
     * @return a {@link Mono} that completes when all leases were created, or errors on failure
     */
    @Override
    public Mono<Void> createMissingLeases(List<Lease> list) {
        return this.documentClient
            .getOverlappingRanges(PartitionKeyInternalHelper.FullRange, true)
            .flatMap(partitionKeyRanges -> createLeases(partitionKeyRanges, list).then())
            .doOnError(error ->
                this.logger.error("Create missing leases from pkRangeIdVersion leases failed", error));
    }

    /**
     * Chooses how to recover a lease whose feed range is gone, based on how many partition key
     * ranges currently overlap the lease's EPK range: more than one yields a split handler,
     * exactly one yields a merge handler, none is an error.
     *
     * @param lease the lease whose feed range is gone; must not be {@code null}
     * @return a {@link Mono} emitting the handler, or erroring with a {@link RuntimeException}
     *     when no overlapping range is found
     */
    @Override
    public Mono<FeedRangeGoneHandler> getFeedRangeGoneHandler(Lease lease) {
        Preconditions.checkNotNull(lease, "Argument 'lease' can not be null");
        String leaseToken = lease.getLeaseToken();
        this.logger.info(
            "Lease with token {} is gone due to split or merge; will attempt to resume using continuation token {}.",
            leaseToken,
            lease.getContinuationToken());

        Range<String> goneRange = ((FeedRangeEpkImpl) lease.getFeedRange()).getRange();
        return this.documentClient.getOverlappingRanges(goneRange, true).flatMap(overlappingRanges -> {
            if (overlappingRanges.isEmpty()) {
                this.logger.error(
                    "Lease with token {} is gone but we failed to find at least one child range", leaseToken);
                return Mono.error(new RuntimeException(
                    String.format("Lease %s is gone but we failed to find at least one child partition", leaseToken)));
            }

            if (overlappingRanges.size() > 1) {
                // Several ranges now span the lease's range: treat as a split.
                return Mono.just(new FeedRangeGoneSplitHandler(
                    lease,
                    overlappingRanges,
                    this.leaseManager,
                    this.changeFeedProcessorOptions.getMaxScaleCount()));
            }

            // Exactly one range spans the lease's range: treat as a merge.
            return Mono.just(new FeedRangeGoneMergeHandler(lease, overlappingRanges.get(0)));
        });
    }

    /**
     * Creates a lease without a continuation token for each partition key range that shares
     * neither its lower nor its upper bound with any existing lease.
     *
     * @param partitionKeyRanges the current partition key ranges of the container
     * @return the leases returned by the lease manager
     */
    private Flux<Lease> createLeases(List<PartitionKeyRange> partitionKeyRanges) {
        return this.leaseContainer.getAllLeases().collectList().flatMapMany(existingLeases ->
            Flux.fromIterable(partitionKeyRanges)
                .filter(partitionKeyRange -> !sharesBoundaryWithAnyLease(partitionKeyRange, existingLeases))
                .flatMap(
                    // The cast keeps overload resolution explicit for the null continuation token.
                    partitionKeyRange -> this.leaseManager.createLeaseIfNotExist(
                        new FeedRangeEpkImpl(partitionKeyRange.toRange()), (String) null),
                    this.degreeOfParallelism));
    }

    /**
     * Creates a lease for each partition key range not already covered by an existing lease,
     * carrying over the continuation token of the single matching lease in
     * {@code pkRangeIdVersionLeases}.
     *
     * @param partitionKeyRanges the current partition key ranges of the container
     * @param pkRangeIdVersionLeases leases keyed by partition key range id to migrate from
     * @return the leases returned by the lease manager; errors with {@link IllegalStateException}
     *     when a range has no, or more than one, matching source lease
     */
    private Flux<Lease> createLeases(List<PartitionKeyRange> partitionKeyRanges, List<Lease> pkRangeIdVersionLeases) {
        return this.leaseContainer.getAllLeases().collectList().flatMapMany(existingLeases -> {
            // Built inside the lambda so that a failure here (e.g. duplicate lease tokens) surfaces
            // as an error signal at subscription time rather than as an exception at assembly time.
            Map<String, Lease> sourceLeasesByToken = pkRangeIdVersionLeases.stream()
                .collect(Collectors.toMap(Lease::getLeaseToken, lease -> lease));

            return Flux.fromIterable(partitionKeyRanges)
                .flatMap(partitionKeyRange ->
                    findSourceLease(partitionKeyRange, existingLeases, sourceLeasesByToken))
                .flatMap(
                    rangeAndSourceLease -> {
                        FeedRangeEpkImpl feedRange = new FeedRangeEpkImpl(rangeAndSourceLease.getLeft().toRange());
                        String continuationState = getLeaseContinuationToken(
                            feedRange, rangeAndSourceLease.getRight().getContinuationToken());
                        return this.leaseManager.createLeaseIfNotExist(feedRange, continuationState);
                    },
                    this.degreeOfParallelism);
        });
    }

    /**
     * Tells whether any lease's EPK range has the same min as the partition key range's inclusive
     * min, or the same max as its exclusive max.
     */
    private static boolean sharesBoundaryWithAnyLease(PartitionKeyRange partitionKeyRange, List<Lease> leases) {
        return leases.stream().anyMatch(lease -> {
            Range<String> leasedRange = ((FeedRangeEpkImpl) lease.getFeedRange()).getRange();
            return leasedRange.getMin().equals(partitionKeyRange.getMinInclusive())
                || leasedRange.getMax().equals(partitionKeyRange.getMaxExclusive());
        });
    }

    /**
     * Pairs a partition key range with the source lease whose token equals the range id or one of
     * its parent ids.
     *
     * @return empty when an existing lease already satisfies the range check; the pair when exactly
     *     one source lease matches; an {@link IllegalStateException} error otherwise
     */
    private static Mono<Pair<PartitionKeyRange, Lease>> findSourceLease(
        PartitionKeyRange partitionKeyRange,
        List<Lease> existingLeases,
        Map<String, Lease> sourceLeasesByToken) {

        Range<String> targetRange = partitionKeyRange.toRange();
        // NOTE(review): presumably the comparators order ranges by lower / upper bound, making this
        // a "lease range contains target range" test - confirm against Range.MinComparator/MaxComparator.
        boolean alreadyCovered = existingLeases.stream().anyMatch(lease -> {
            Range<String> leasedRange = ((FeedRangeEpkImpl) lease.getFeedRange()).getRange();
            return MIN_RANGE_COMPARATOR.compare(leasedRange, targetRange) <= 0
                && MAX_RANGE_COMPARATOR.compare(leasedRange, targetRange) >= 0;
        });
        if (alreadyCovered) {
            return Mono.empty();
        }

        // Candidate lease tokens: the range's own id first, then the ids of its parents (if any).
        List<String> candidateTokens = new ArrayList<>();
        candidateTokens.add(partitionKeyRange.getId());
        List<String> parentIds = partitionKeyRange.getParents();
        if (parentIds != null) {
            candidateTokens.addAll(parentIds);
        }

        List<Lease> matchingSourceLeases = new ArrayList<>();
        for (String token : candidateTokens) {
            if (sourceLeasesByToken.containsKey(token)) {
                matchingSourceLeases.add(sourceLeasesByToken.get(token));
            }
        }

        if (matchingSourceLeases.isEmpty()) {
            return Mono.error(new IllegalStateException("Can not find pkRangeId version lease"));
        }
        if (matchingSourceLeases.size() > 1) {
            return Mono.error(new IllegalStateException(
                "A merge has happened, creating epk version lease from pkRangeId version lease is not supported"));
        }
        return Mono.just(Pair.of(partitionKeyRange, matchingSourceLeases.get(0)));
    }

    /**
     * Builds the serialized change feed state stored as the continuation token of a new lease.
     *
     * @param feedRangeEpkImpl feed range of the lease being created
     * @param str continuation value taken from the source lease, placed into the new state
     * @return the {@code toString()} form of the resulting {@link ChangeFeedStateV1}
     */
    private String getLeaseContinuationToken(FeedRangeEpkImpl feedRangeEpkImpl, String str) {
        FeedRangeContinuation continuation =
            FeedRangeContinuation.create(this.collectionSelfLink, feedRangeEpkImpl, feedRangeEpkImpl.getRange());
        continuation.replaceContinuation(str);

        ChangeFeedStateV1 state = new ChangeFeedStateV1(
            this.collectionSelfLink,
            feedRangeEpkImpl,
            this.changeFeedMode,
            PartitionProcessorHelper.getStartFromSettings(
                feedRangeEpkImpl, this.changeFeedProcessorOptions, this.changeFeedMode),
            continuation);
        return state.toString();
    }
}
