package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverFactory;
import com.azure.cosmos.implementation.changefeed.CheckpointFrequency;
import com.azure.cosmos.implementation.changefeed.HealthMonitor;
import com.azure.cosmos.implementation.changefeed.LeaseStoreManager;
import com.azure.cosmos.implementation.changefeed.PartitionLoadBalancingStrategy;
import com.azure.cosmos.implementation.changefeed.PartitionManager;
import com.azure.cosmos.implementation.changefeed.PartitionProcessorFactory;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/implementation/ChangeFeedProcessorBuilderImpl.class */
public class ChangeFeedProcessorBuilderImpl implements ChangeFeedProcessor.BuilderDefinition, ChangeFeedProcessor, AutoCloseable {
    private static final long DefaultUnhealthinessDuration = Duration.ofMinutes(15).toMillis();
    private final Duration sleepTime;
    private final Duration lockTime;
    private static final int DefaultQueryPartitionsMaxBatchSize = 100;
    private int queryPartitionsMaxBatchSize;
    private int degreeOfParallelism;
    private String hostName;
    private ChangeFeedContextClient feedContextClient;
    private ChangeFeedProcessorOptions changeFeedProcessorOptions;
    private ChangeFeedObserverFactory observerFactory;
    private volatile String databaseResourceId;
    private volatile String collectionResourceId;
    private ChangeFeedContextClient leaseContextClient;
    private PartitionLoadBalancingStrategy loadBalancingStrategy;
    private PartitionProcessorFactory partitionProcessorFactory;
    private LeaseStoreManager leaseStoreManager;
    private HealthMonitor healthMonitor;
    private volatile PartitionManager partitionManager;
    private Scheduler scheduler;

    @Override // com.azure.cosmos.ChangeFeedProcessor
    public Mono<Void> start() {
        return this.partitionManager == null ? initializeCollectionPropertiesForBuild().flatMap(changeFeedProcessor -> {
            return getLeaseStoreManager().flatMap(leaseStoreManager -> {
                return buildPartitionManager(leaseStoreManager);
            });
        }).flatMap(partitionManager -> {
            this.partitionManager = partitionManager;
            return this.partitionManager.start();
        }) : this.partitionManager.start();
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor
    public Mono<Void> stop() {
        if (this.partitionManager == null || !this.partitionManager.isRunning()) {
            throw new IllegalStateException("The ChangeFeedProcessor instance has not fully started");
        }
        return this.partitionManager.stop();
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor
    public boolean isStarted() {
        return this.partitionManager != null && this.partitionManager.isRunning();
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl hostName(String str) {
        this.hostName = str;
        return this;
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl feedContainer(CosmosAsyncContainer cosmosAsyncContainer) {
        if (cosmosAsyncContainer == null) {
            throw new IllegalArgumentException("feedContextClient");
        }
        this.feedContextClient = new ChangeFeedContextClientImpl(cosmosAsyncContainer);
        return this;
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl options(ChangeFeedProcessorOptions changeFeedProcessorOptions) {
        if (changeFeedProcessorOptions == null) {
            throw new IllegalArgumentException("changeFeedProcessorOptions");
        }
        this.changeFeedProcessorOptions = changeFeedProcessorOptions;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl observerFactory(ChangeFeedObserverFactory changeFeedObserverFactory) {
        if (changeFeedObserverFactory == null) {
            throw new IllegalArgumentException("observerFactory");
        }
        this.observerFactory = changeFeedObserverFactory;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl observer(Class<? extends ChangeFeedObserver> cls) {
        if (cls == null) {
            throw new IllegalArgumentException("type");
        }
        this.observerFactory = new ChangeFeedObserverFactoryImpl(cls);
        return this;
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl handleChanges(Consumer<List<JsonNode>> consumer) {
        return observerFactory(new DefaultObserverFactory(consumer));
    }

    public ChangeFeedProcessorBuilderImpl withDatabaseResourceId(String str) {
        this.databaseResourceId = str;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl withCollectionResourceId(String str) {
        this.collectionResourceId = str;
        return this;
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessorBuilderImpl leaseContainer(CosmosAsyncContainer cosmosAsyncContainer) {
        if (cosmosAsyncContainer == null) {
            throw new IllegalArgumentException("leaseContextClient");
        }
        this.leaseContextClient = new ChangeFeedContextClientImpl(cosmosAsyncContainer);
        return this;
    }

    public ChangeFeedProcessorBuilderImpl withPartitionLoadBalancingStrategy(PartitionLoadBalancingStrategy partitionLoadBalancingStrategy) {
        if (partitionLoadBalancingStrategy == null) {
            throw new IllegalArgumentException("loadBalancingStrategy");
        }
        this.loadBalancingStrategy = partitionLoadBalancingStrategy;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl withPartitionProcessorFactory(PartitionProcessorFactory partitionProcessorFactory) {
        if (partitionProcessorFactory == null) {
            throw new IllegalArgumentException("partitionProcessorFactory");
        }
        this.partitionProcessorFactory = partitionProcessorFactory;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl withLeaseStoreManager(LeaseStoreManager leaseStoreManager) {
        if (leaseStoreManager == null) {
            throw new IllegalArgumentException("leaseStoreManager");
        }
        this.leaseStoreManager = leaseStoreManager;
        return this;
    }

    public ChangeFeedProcessorBuilderImpl withHealthMonitor(HealthMonitor healthMonitor) {
        if (healthMonitor == null) {
            throw new IllegalArgumentException("healthMonitor");
        }
        this.healthMonitor = healthMonitor;
        return this;
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor.BuilderDefinition
    public ChangeFeedProcessor build() {
        if (this.hostName == null) {
            throw new IllegalArgumentException("Host name was not specified");
        }
        if (this.observerFactory == null) {
            throw new IllegalArgumentException("Observer was not specified");
        }
        if (this.scheduler == null) {
            this.scheduler = Schedulers.elastic();
        }
        return this;
    }

    public ChangeFeedProcessorBuilderImpl() {
        this.sleepTime = Duration.ofSeconds(15L);
        this.lockTime = Duration.ofSeconds(30L);
        this.queryPartitionsMaxBatchSize = 100;
        this.degreeOfParallelism = 25;
        this.queryPartitionsMaxBatchSize = 100;
        this.degreeOfParallelism = 25;
    }

    public ChangeFeedProcessorBuilderImpl(PartitionManager partitionManager) {
        this.sleepTime = Duration.ofSeconds(15L);
        this.lockTime = Duration.ofSeconds(30L);
        this.queryPartitionsMaxBatchSize = 100;
        this.degreeOfParallelism = 25;
        this.partitionManager = partitionManager;
    }

    private Mono<ChangeFeedProcessor> initializeCollectionPropertiesForBuild() {
        if (this.changeFeedProcessorOptions == null) {
            this.changeFeedProcessorOptions = new ChangeFeedProcessorOptions();
        }
        return this.feedContextClient.readDatabase(this.feedContextClient.getDatabaseClient(), null).map(cosmosAsyncDatabaseResponse -> {
            this.databaseResourceId = cosmosAsyncDatabaseResponse.getDatabase().getId();
            return this.databaseResourceId;
        }).flatMap(str -> {
            return this.feedContextClient.readContainer(this.feedContextClient.getContainerClient(), null).map(cosmosAsyncContainerResponse -> {
                this.collectionResourceId = cosmosAsyncContainerResponse.getContainer().getId();
                return this;
            });
        });
    }

    private Mono<LeaseStoreManager> getLeaseStoreManager() {
        return this.leaseStoreManager == null ? this.leaseContextClient.readContainerSettings(this.leaseContextClient.getContainerClient(), null).flatMap(cosmosContainerProperties -> {
            if (((cosmosContainerProperties.getPartitionKeyDefinition() == null || cosmosContainerProperties.getPartitionKeyDefinition().getPaths() == null || cosmosContainerProperties.getPartitionKeyDefinition().getPaths().size() <= 0) ? false : true) && cosmosContainerProperties.getPartitionKeyDefinition().getPaths().size() == 1 && cosmosContainerProperties.getPartitionKeyDefinition().getPaths().get(0).equals("/id")) {
                return LeaseStoreManager.builder().leasePrefix(getLeasePrefix()).leaseCollectionLink(this.leaseContextClient.getContainerClient()).leaseContextClient(this.leaseContextClient).requestOptionsFactory(new PartitionedByIdCollectionRequestOptionsFactory()).hostName(this.hostName).build().map(leaseStoreManager -> {
                    this.leaseStoreManager = leaseStoreManager;
                    return this.leaseStoreManager;
                });
            }
            return Mono.error(new IllegalArgumentException("The lease collection must have partition key equal to id."));
        }) : Mono.just(this.leaseStoreManager);
    }

    private String getLeasePrefix() {
        String leasePrefix = this.changeFeedProcessorOptions.getLeasePrefix();
        if (leasePrefix == null) {
            leasePrefix = "";
        }
        return String.format("%s%s_%s_%s", leasePrefix, this.feedContextClient.getServiceEndpoint().getHost(), this.databaseResourceId, this.collectionResourceId);
    }

    private Mono<PartitionManager> buildPartitionManager(LeaseStoreManager leaseStoreManager) {
        CheckpointerObserverFactory checkpointerObserverFactory = new CheckpointerObserverFactory(this.observerFactory, new CheckpointFrequency());
        PartitionSynchronizerImpl partitionSynchronizerImpl = new PartitionSynchronizerImpl(this.feedContextClient, this.feedContextClient.getContainerClient(), leaseStoreManager, leaseStoreManager, this.degreeOfParallelism, this.queryPartitionsMaxBatchSize);
        BootstrapperImpl bootstrapperImpl = new BootstrapperImpl(partitionSynchronizerImpl, leaseStoreManager, this.lockTime, this.sleepTime);
        PartitionSupervisorFactoryImpl partitionSupervisorFactoryImpl = new PartitionSupervisorFactoryImpl(checkpointerObserverFactory, leaseStoreManager, this.partitionProcessorFactory != null ? this.partitionProcessorFactory : new PartitionProcessorFactoryImpl(this.feedContextClient, this.changeFeedProcessorOptions, leaseStoreManager, this.feedContextClient.getContainerClient()), this.changeFeedProcessorOptions, this.scheduler);
        if (this.loadBalancingStrategy == null) {
            this.loadBalancingStrategy = new EqualPartitionsBalancingStrategy(this.hostName, this.changeFeedProcessorOptions.getMinScaleCount(), this.changeFeedProcessorOptions.getMaxScaleCount(), this.changeFeedProcessorOptions.getLeaseExpirationInterval());
        }
        PartitionControllerImpl partitionControllerImpl = new PartitionControllerImpl(leaseStoreManager, leaseStoreManager, partitionSupervisorFactoryImpl, partitionSynchronizerImpl, this.scheduler);
        if (this.healthMonitor == null) {
            this.healthMonitor = new TraceHealthMonitor();
        }
        return Mono.just(new PartitionManagerImpl(bootstrapperImpl, partitionControllerImpl, new PartitionLoadBalancerImpl(new HealthMonitoringPartitionControllerDecorator(partitionControllerImpl, this.healthMonitor), leaseStoreManager, this.loadBalancingStrategy, this.changeFeedProcessorOptions.getLeaseAcquireInterval(), this.scheduler)));
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        stop().subscribeOn(Schedulers.elastic()).subscribe();
    }

    @Override // com.azure.cosmos.ChangeFeedProcessor.BuilderDefinition
    public /* bridge */ /* synthetic */ ChangeFeedProcessor.BuilderDefinition handleChanges(Consumer consumer) {
        return handleChanges((Consumer<List<JsonNode>>) consumer);
    }
}
