package com.azure.cosmos.implementation.changefeed.epkversion;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorContext;
import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosBridgeInternal;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.changefeed.Bootstrapper;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverFactory;
import com.azure.cosmos.implementation.changefeed.CheckpointFrequency;
import com.azure.cosmos.implementation.changefeed.HealthMonitor;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancingStrategy;
import com.azure.cosmos.implementation.changefeed.PartitionManager;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedContextClientImpl;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedMode;
import com.azure.cosmos.implementation.changefeed.common.ChangeFeedState;
import com.azure.cosmos.implementation.changefeed.common.CheckpointerObserverFactory;
import com.azure.cosmos.implementation.changefeed.common.DefaultObserverFactory;
import com.azure.cosmos.implementation.changefeed.common.EqualPartitionsBalancingStrategy;
import com.azure.cosmos.implementation.changefeed.common.PartitionedByIdCollectionRequestOptionsFactory;
import com.azure.cosmos.implementation.changefeed.common.TraceHealthMonitor;
import com.azure.cosmos.implementation.guava25.base.Preconditions;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.ChangeFeedProcessorState;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.FeedRange;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/epkversion/ChangeFeedProcessorImplBase.class */
public abstract class ChangeFeedProcessorImplBase<T> implements ChangeFeedProcessor, AutoCloseable {
    private final Logger logger = LoggerFactory.getLogger(ChangeFeedProcessorImplBase.class);
    private final Duration sleepTime = Duration.ofSeconds(15);
    private final Duration lockTime = Duration.ofSeconds(30);
    private static final int DEFAULT_QUERY_PARTITIONS_MAX_BATCH_SIZE = 100;
    private static final int DEFAULT_DEGREE_OF_PARALLELISM = 25;
    private final String hostName;
    private final ChangeFeedContextClient feedContextClient;
    private final ChangeFeedContextClient leaseContextClient;
    private final ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private final ChangeFeedObserverFactory<T> observerFactory;
    private final ChangeFeedMode changeFeedMode;
    private final Scheduler scheduler;
    private volatile String databaseResourceId;
    private volatile String databaseId;
    private volatile String collectionResourceId;
    private volatile String collectionId;
    private PartitionLoadBalancingStrategy loadBalancingStrategy;
    private LeaseStoreManager leaseStoreManager;
    private HealthMonitor healthMonitor;
    private volatile PartitionManager partitionManager;

    public ChangeFeedProcessorImplBase(String str, CosmosAsyncContainer cosmosAsyncContainer, CosmosAsyncContainer cosmosAsyncContainer2, ChangeFeedProcessorOptions changeFeedProcessorOptions, Consumer<List<T>> consumer, ChangeFeedMode changeFeedMode) {
        Preconditions.checkNotNull(str, "Argument 'hostName' can not be null");
        Preconditions.checkNotNull(cosmosAsyncContainer, "Argument 'feedContainer' can not be null");
        Preconditions.checkNotNull(consumer, "Argument 'consumer' can not be null");
        changeFeedProcessorOptions = changeFeedProcessorOptions == null ? new ChangeFeedProcessorOptions() : changeFeedProcessorOptions;
        validateChangeFeedProcessorOptions(changeFeedProcessorOptions);
        validateLeaseContainer(cosmosAsyncContainer2);
        this.hostName = str;
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        this.feedContextClient = new ChangeFeedContextClientImpl(cosmosAsyncContainer);
        this.leaseContextClient = new ChangeFeedContextClientImpl(cosmosAsyncContainer2);
        this.scheduler = this.changeFeedProcessorOptions.getScheduler();
        this.feedContextClient.setScheduler(this.scheduler);
        this.leaseContextClient.setScheduler(this.scheduler);
        this.changeFeedMode = changeFeedMode;
        this.observerFactory = new DefaultObserverFactory(consumer);
    }

    public ChangeFeedProcessorImplBase(String str, CosmosAsyncContainer cosmosAsyncContainer, CosmosAsyncContainer cosmosAsyncContainer2, ChangeFeedProcessorOptions changeFeedProcessorOptions, BiConsumer<List<T>, ChangeFeedProcessorContext> biConsumer, ChangeFeedMode changeFeedMode) {
        Preconditions.checkNotNull(str, "Argument 'hostName' can not be null");
        Preconditions.checkNotNull(cosmosAsyncContainer, "Argument 'feedContainer' can not be null");
        Preconditions.checkNotNull(biConsumer, "Argument 'biConsumer' can not be null");
        changeFeedProcessorOptions = changeFeedProcessorOptions == null ? new ChangeFeedProcessorOptions() : changeFeedProcessorOptions;
        validateChangeFeedProcessorOptions(changeFeedProcessorOptions);
        validateLeaseContainer(cosmosAsyncContainer2);
        this.hostName = str;
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        this.feedContextClient = new ChangeFeedContextClientImpl(cosmosAsyncContainer);
        this.leaseContextClient = new ChangeFeedContextClientImpl(cosmosAsyncContainer2);
        this.scheduler = this.changeFeedProcessorOptions.getScheduler();
        this.feedContextClient.setScheduler(this.scheduler);
        this.leaseContextClient.setScheduler(this.scheduler);
        this.changeFeedMode = changeFeedMode;
        this.observerFactory = new DefaultObserverFactory(biConsumer);
    }

    abstract CosmosChangeFeedRequestOptions createRequestOptionsForProcessingFromNow(FeedRange feedRange);

    private void validateChangeFeedProcessorOptions(ChangeFeedProcessorOptions changeFeedProcessorOptions) {
        Preconditions.checkNotNull(changeFeedProcessorOptions, "Argument 'changeFeedProcessorOptions' can not be null");
        if (changeFeedProcessorOptions.getLeaseAcquireInterval().compareTo(ChangeFeedProcessorOptions.DEFAULT_ACQUIRE_INTERVAL) < 0) {
            this.logger.warn("Found lower than expected setting for leaseAcquireInterval");
        }
    }

    private void validateLeaseContainer(CosmosAsyncContainer cosmosAsyncContainer) {
        Preconditions.checkNotNull(cosmosAsyncContainer, "Argument 'leaseContainer' can not be null");
        if (!CosmosBridgeInternal.getContextClient(cosmosAsyncContainer).isContentResponseOnWriteEnabled()) {
            throw new IllegalArgumentException("leaseClient: content response on write setting must be enabled");
        }
        ConsistencyLevel consistencyLevel = CosmosBridgeInternal.getContextClient(cosmosAsyncContainer).getConsistencyLevel();
        if (consistencyLevel == ConsistencyLevel.CONSISTENT_PREFIX || consistencyLevel == ConsistencyLevel.EVENTUAL) {
            this.logger.warn("leaseClient consistency level setting are less then expected which is SESSION");
        }
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor
    public Mono<Void> start() {
        return this.partitionManager == null ? initializeCollectionPropertiesForBuild().flatMap(changeFeedProcessor -> {
            return getLeaseStoreManager().flatMap(this::buildPartitionManager);
        }).flatMap(partitionManager -> {
            this.partitionManager = partitionManager;
            return this.partitionManager.start();
        }) : this.partitionManager.start();
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor
    public Mono<Void> stop() {
        if (this.partitionManager == null || !this.partitionManager.isRunning()) {
            throw new IllegalStateException("The ChangeFeedProcessor instance has not fully started");
        }
        return this.partitionManager.stop();
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor
    public boolean isStarted() {
        return this.partitionManager != null && this.partitionManager.isRunning();
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor
    public Mono<Map<String, Integer>> getEstimatedLag() {
        throw new UnsupportedOperationException("getEstimatedLag() API is not supported. Use getCurrentState() instead");
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor
    public Mono<List<ChangeFeedProcessorState>> getCurrentState() {
        return (this.leaseContextClient == null || this.feedContextClient == null) ? Mono.just(Collections.unmodifiableList(new ArrayList())) : initializeCollectionPropertiesForBuild().flatMap(changeFeedProcessor -> {
            return getLeaseStoreManager();
        }).flatMap(leaseStoreManager -> {
            return leaseStoreManager.getAllLeases().flatMap(lease -> {
                return this.feedContextClient.createDocumentChangeFeedQuery(this.feedContextClient.getContainerClient(), createRequestOptionsForProcessingFromNow(lease.getFeedRange()), ChangeFeedProcessorItem.class, false).take(1L).map(feedResponse -> {
                    ChangeFeedProcessorState leaseToken = new ChangeFeedProcessorState().setHostName(lease.getOwner()).setLeaseToken(lease.getLeaseToken());
                    long j = 0;
                    try {
                        long lsnFromEncodedContinuationToken = getLsnFromEncodedContinuationToken(feedResponse.m683getContinuationToken());
                        leaseToken.setContinuationToken(feedResponse.m683getContinuationToken());
                        j = Strings.isNullOrWhiteSpace(lease.getContinuationToken()) ? lsnFromEncodedContinuationToken - 1 : lsnFromEncodedContinuationToken - getLsnFromEncodedContinuationToken(lease.getContinuationToken());
                    } catch (NumberFormatException e) {
                        this.logger.warn("Unexpected Cosmos LSN found", e);
                        leaseToken.setEstimatedLag(-1);
                    }
                    leaseToken.setEstimatedLag((int) Math.min(j, 2147483647L));
                    return leaseToken;
                });
            }).collectList().map(Collections::unmodifiableList);
        });
    }

    private long getLsnFromEncodedContinuationToken(String str) {
        return Long.parseLong(ChangeFeedState.fromString(str).getContinuation().getCurrentContinuationToken().getToken().replace("\"", ""));
    }

    private Mono<ChangeFeedProcessor> initializeCollectionPropertiesForBuild() {
        return this.feedContextClient.readDatabase(this.feedContextClient.getDatabaseClient(), null).map(cosmosDatabaseResponse -> {
            this.databaseResourceId = cosmosDatabaseResponse.getProperties().getResourceId();
            this.databaseId = cosmosDatabaseResponse.getProperties().getId();
            return this.databaseResourceId;
        }).flatMap(str -> {
            return this.feedContextClient.readContainer(this.feedContextClient.getContainerClient(), null).map(cosmosContainerResponse -> {
                this.collectionResourceId = cosmosContainerResponse.getProperties().getResourceId();
                this.collectionId = cosmosContainerResponse.getProperties().getId();
                return this;
            });
        });
    }

    private Mono<LeaseStoreManager> getLeaseStoreManager() {
        return this.leaseStoreManager == null ? this.leaseContextClient.readContainerSettings(this.leaseContextClient.getContainerClient(), null).flatMap(cosmosContainerProperties -> {
            if (!isContainerPartitionedById(cosmosContainerProperties)) {
                return Mono.error(new IllegalArgumentException("The lease collection must have partition key equal to id."));
            }
            this.leaseStoreManager = LeaseStoreManagerImpl.builder().leasePrefix(getLeasePrefix()).leaseCollectionLink(this.leaseContextClient.getContainerClient()).leaseContextClient(this.leaseContextClient).requestOptionsFactory(new PartitionedByIdCollectionRequestOptionsFactory()).hostName(this.hostName).build();
            return Mono.just(this.leaseStoreManager);
        }) : Mono.just(this.leaseStoreManager);
    }

    private boolean isContainerPartitionedById(CosmosContainerProperties cosmosContainerProperties) {
        return (cosmosContainerProperties == null || cosmosContainerProperties.getPartitionKeyDefinition() == null || cosmosContainerProperties.getPartitionKeyDefinition().getPaths().isEmpty() || cosmosContainerProperties.getPartitionKeyDefinition().getPaths().size() != 1 || !cosmosContainerProperties.getPartitionKeyDefinition().getPaths().get(0).equals("/id")) ? false : true;
    }

    private String getLeasePrefix() {
        String leasePrefix = this.changeFeedProcessorOptions.getLeasePrefix();
        if (leasePrefix == null) {
            leasePrefix = "";
        }
        return String.format("%s%s_%s_%s", leasePrefix, this.feedContextClient.getServiceEndpoint().getHost(), this.databaseResourceId, this.collectionResourceId);
    }

    private String getPkRangeIdVersionLeasePrefix() {
        String leasePrefix = this.changeFeedProcessorOptions.getLeasePrefix();
        if (leasePrefix == null) {
            leasePrefix = "";
        }
        return String.format("%s%s_%s_%s", leasePrefix, this.feedContextClient.getServiceEndpoint().getHost(), this.databaseId, this.collectionId);
    }

    abstract Class<T> getPartitionProcessorItemType();

    abstract boolean canBootstrapFromPkRangeIdVersionLeaseStore();

    private Mono<PartitionManager> buildPartitionManager(LeaseStoreManager leaseStoreManager) {
        Bootstrapper bootstrapperImpl;
        CheckpointerObserverFactory checkpointerObserverFactory = new CheckpointerObserverFactory(this.observerFactory, new CheckpointFrequency());
        PartitionSynchronizerImpl partitionSynchronizerImpl = new PartitionSynchronizerImpl(this.feedContextClient, BridgeInternal.extractContainerSelfLink(this.feedContextClient.getContainerClient()), leaseStoreManager, leaseStoreManager, 25, 100, this.changeFeedProcessorOptions, this.changeFeedMode);
        if (canBootstrapFromPkRangeIdVersionLeaseStore()) {
            String pkRangeIdVersionLeasePrefix = getPkRangeIdVersionLeasePrefix();
            bootstrapperImpl = new PkRangeIdVersionLeaseStoreBootstrapperImpl(partitionSynchronizerImpl, leaseStoreManager, this.lockTime, this.sleepTime, com.azure.cosmos.implementation.changefeed.pkversion.LeaseStoreManagerImpl.builder().leasePrefix(pkRangeIdVersionLeasePrefix).leaseCollectionLink(this.leaseContextClient.getContainerClient()).leaseContextClient(this.leaseContextClient).requestOptionsFactory(new PartitionedByIdCollectionRequestOptionsFactory()).hostName(this.hostName).build(), leaseStoreManager, this.changeFeedMode);
        } else {
            bootstrapperImpl = new BootstrapperImpl(partitionSynchronizerImpl, leaseStoreManager, this.lockTime, this.sleepTime, leaseStoreManager, this.changeFeedMode);
        }
        FeedRangeThroughputControlConfigManager feedRangeThroughputControlConfigManager = getFeedRangeThroughputControlConfigManager();
        PartitionSupervisorFactoryImpl partitionSupervisorFactoryImpl = new PartitionSupervisorFactoryImpl(checkpointerObserverFactory, leaseStoreManager, new PartitionProcessorFactoryImpl(this.feedContextClient, this.changeFeedProcessorOptions, leaseStoreManager, this.feedContextClient.getContainerClient(), this.collectionResourceId, this.changeFeedMode, feedRangeThroughputControlConfigManager), this.changeFeedProcessorOptions, this.scheduler, getPartitionProcessorItemType());
        if (this.loadBalancingStrategy == null) {
            this.loadBalancingStrategy = new EqualPartitionsBalancingStrategy(this.hostName, this.changeFeedProcessorOptions.getMinScaleCount(), this.changeFeedProcessorOptions.getMaxScaleCount(), this.changeFeedProcessorOptions.getLeaseExpirationInterval());
        }
        PartitionControllerImpl partitionControllerImpl = new PartitionControllerImpl(leaseStoreManager, leaseStoreManager, partitionSupervisorFactoryImpl, partitionSynchronizerImpl, this.scheduler);
        if (this.healthMonitor == null) {
            this.healthMonitor = new TraceHealthMonitor();
        }
        return Mono.just(new PartitionManagerImpl(bootstrapperImpl, partitionControllerImpl, new PartitionLoadBalancerImpl(new HealthMonitoringPartitionControllerDecorator(partitionControllerImpl, this.healthMonitor), leaseStoreManager, this.loadBalancingStrategy, this.changeFeedProcessorOptions.getLeaseAcquireInterval(), this.scheduler, feedRangeThroughputControlConfigManager)));
    }

    private FeedRangeThroughputControlConfigManager getFeedRangeThroughputControlConfigManager() {
        if (this.changeFeedProcessorOptions == null || this.changeFeedProcessorOptions.getFeedPollThroughputControlGroupConfig() == null) {
            return null;
        }
        return new FeedRangeThroughputControlConfigManager(this.changeFeedProcessorOptions.getFeedPollThroughputControlGroupConfig(), this.feedContextClient);
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        stop().subscribeOn(Schedulers.boundedElastic()).subscribe();
    }
}
