package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.InternalObjectNode;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.Lease;
import com.azure.cosmos.implementation.changefeed.ServiceItemLease;
import com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseConflictException;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.PartitionKey;
import java.time.Instant;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.util.retry.Retry;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/implementation/DocumentServiceLeaseUpdaterImpl.class */
class DocumentServiceLeaseUpdaterImpl implements ServiceItemLeaseUpdater {
    private final Logger logger = LoggerFactory.getLogger(DocumentServiceLeaseUpdaterImpl.class);
    private final int RETRY_COUNT_ON_CONFLICT = 5;
    private final ChangeFeedContextClient client;

    public DocumentServiceLeaseUpdaterImpl(ChangeFeedContextClient changeFeedContextClient) {
        if (changeFeedContextClient == null) {
            throw new IllegalArgumentException("client");
        }
        this.client = changeFeedContextClient;
    }

    @Override // com.azure.cosmos.implementation.changefeed.ServiceItemLeaseUpdater
    public Mono<Lease> updateLease(Lease lease, String str, PartitionKey partitionKey, CosmosItemRequestOptions cosmosItemRequestOptions, Function<Lease, Lease> function) {
        Lease apply = function.apply(lease);
        if (apply == null) {
            return Mono.empty();
        }
        apply.setTimestamp(Instant.now());
        lease.setServiceItemLease(apply);
        return Mono.just(this).flatMap(documentServiceLeaseUpdaterImpl -> {
            return tryReplaceLease(lease, str, partitionKey);
        }).map(internalObjectNode -> {
            lease.setServiceItemLease(ServiceItemLease.fromDocument(internalObjectNode));
            return lease;
        }).hasElement().flatMap(bool -> {
            return bool.booleanValue() ? Mono.just(lease) : this.client.readItem(str, partitionKey, cosmosItemRequestOptions, InternalObjectNode.class).onErrorResume(th -> {
                if ((th instanceof CosmosException) && ((CosmosException) th).getStatusCode() == 404) {
                    throw new LeaseLostException(lease);
                }
                return Mono.error(th);
            }).map(cosmosItemResponse -> {
                ServiceItemLease fromDocument = ServiceItemLease.fromDocument(BridgeInternal.getProperties(cosmosItemResponse));
                this.logger.info("Partition {} update failed because the lease with token '{}' was updated by owner '{}' with token '{}'.", new Object[]{lease.getLeaseToken(), lease.getConcurrencyToken(), fromDocument.getOwner(), fromDocument.getConcurrencyToken()});
                lease.setConcurrencyToken(fromDocument.getConcurrencyToken());
                lease.setOwner(fromDocument.getOwner());
                throw new LeaseConflictException(lease, "Partition update failed");
            });
        }).retryWhen(Retry.max(5L).filter(th -> {
            if (!(th instanceof LeaseConflictException)) {
                return false;
            }
            this.logger.info("Partition {} for the lease with token '{}' failed to update for owner '{}'; will retry.", new Object[]{lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner()});
            return true;
        })).onErrorResume(th2 -> {
            if (!(th2 instanceof LeaseConflictException)) {
                return Mono.error(th2);
            }
            this.logger.warn("Partition {} for the lease with token '{}' failed to update for owner '{}'; current continuation token '{}'.", new Object[]{lease.getLeaseToken(), lease.getConcurrencyToken(), lease.getOwner(), lease.getContinuationToken(), th2});
            return Mono.just(lease);
        });
    }

    private Mono<InternalObjectNode> tryReplaceLease(Lease lease, String str, PartitionKey partitionKey) throws LeaseLostException {
        return this.client.replaceItem(str, partitionKey, lease, getCreateIfMatchOptions(lease)).map(cosmosItemResponse -> {
            return BridgeInternal.getProperties(cosmosItemResponse);
        }).onErrorResume(th -> {
            if (!(th instanceof CosmosException)) {
                return Mono.error(th);
            }
            ?? r0 = (CosmosException) th;
            switch (r0.getStatusCode()) {
                case 404:
                    throw new LeaseLostException(lease, r0, true);
                case 409:
                    throw new LeaseLostException(lease, r0, false);
                case 412:
                    return Mono.empty();
                default:
                    return Mono.error(th);
            }
        });
    }

    private CosmosItemRequestOptions getCreateIfMatchOptions(Lease lease) {
        CosmosItemRequestOptions cosmosItemRequestOptions = new CosmosItemRequestOptions();
        cosmosItemRequestOptions.setIfMatchETag(lease.getConcurrencyToken());
        return cosmosItemRequestOptions;
    }
}
