package com.azure.cosmos.implementation.changefeed.implementation;

import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.ChangeFeedOptions;
import com.azure.cosmos.implementation.changefeed.CancellationToken;
import com.azure.cosmos.implementation.changefeed.ChangeFeedContextClient;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.PartitionCheckpointer;
import com.azure.cosmos.implementation.changefeed.PartitionProcessor;
import com.azure.cosmos.implementation.changefeed.ProcessorSettings;
import com.azure.cosmos.implementation.changefeed.exceptions.LeaseLostException;
import com.azure.cosmos.implementation.changefeed.exceptions.PartitionNotFoundException;
import com.azure.cosmos.implementation.changefeed.exceptions.PartitionSplitException;
import com.azure.cosmos.implementation.changefeed.exceptions.TaskCancelledException;
import com.azure.cosmos.implementation.guava25.base.Ascii;
import com.azure.cosmos.models.FeedResponse;
import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalAmount;
import java.time.temporal.TemporalUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl.class */
class PartitionProcessorImpl implements PartitionProcessor {
    private static final Logger logger = LoggerFactory.getLogger(PartitionProcessorImpl.class);
    private static final int DefaultMaxItemCount = 100;
    private final ProcessorSettings settings;
    private final PartitionCheckpointer checkpointer;
    private final ChangeFeedObserver observer;
    private final ChangeFeedOptions options = new ChangeFeedOptions();
    private final ChangeFeedContextClient documentClient;
    private volatile RuntimeException resultException;
    private volatile String lastContinuation;
    private volatile boolean isFirstQueryForChangeFeeds;

    /* renamed from: com.azure.cosmos.implementation.changefeed.implementation.PartitionProcessorImpl$1, reason: invalid class name */
    /* loaded from: input_file:com/azure/cosmos/implementation/changefeed/implementation/PartitionProcessorImpl$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$azure$cosmos$implementation$changefeed$implementation$StatusCodeErrorType = new int[StatusCodeErrorType.values().length];

        static {
            try {
                $SwitchMap$com$azure$cosmos$implementation$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.PARTITION_NOT_FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$azure$cosmos$implementation$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.PARTITION_SPLIT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$azure$cosmos$implementation$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.UNDEFINED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$azure$cosmos$implementation$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.MAX_ITEM_COUNT_TOO_LARGE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$com$azure$cosmos$implementation$changefeed$implementation$StatusCodeErrorType[StatusCodeErrorType.TRANSIENT_ERROR.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    public PartitionProcessorImpl(ChangeFeedObserver changeFeedObserver, ChangeFeedContextClient changeFeedContextClient, ProcessorSettings processorSettings, PartitionCheckpointer partitionCheckpointer) {
        this.observer = changeFeedObserver;
        this.documentClient = changeFeedContextClient;
        this.settings = processorSettings;
        this.checkpointer = partitionCheckpointer;
        this.options.setMaxItemCount(Integer.valueOf(processorSettings.getMaxItemCount()));
        this.options.setPartitionKeyRangeId(processorSettings.getPartitionKeyRangeId());
        this.options.setStartFromBeginning(processorSettings.isStartFromBeginning());
        this.options.setRequestContinuation(processorSettings.getStartContinuation());
        this.options.setStartDateTime(processorSettings.getStartTime());
    }

    @Override // com.azure.cosmos.implementation.changefeed.PartitionProcessor
    public Mono<Void> run(CancellationToken cancellationToken) {
        this.lastContinuation = this.settings.getStartContinuation();
        this.isFirstQueryForChangeFeeds = true;
        this.options.setRequestContinuation(this.lastContinuation);
        return Flux.just(this).flatMap(partitionProcessorImpl -> {
            if (cancellationToken.isCancellationRequested()) {
                return Flux.empty();
            }
            if (this.isFirstQueryForChangeFeeds) {
                this.isFirstQueryForChangeFeeds = false;
                return Flux.just(partitionProcessorImpl);
            }
            Instant plus = Instant.now().plus((TemporalAmount) this.settings.getFeedPollDelay());
            return Mono.just(partitionProcessorImpl).delayElement(Duration.ofMillis(100L)).repeat(() -> {
                return !cancellationToken.isCancellationRequested() && Instant.now().isBefore(plus);
            }).last();
        }).flatMap(partitionProcessorImpl2 -> {
            return this.documentClient.createDocumentChangeFeedQuery(this.settings.getCollectionSelfLink(), this.options).limitRequest(1L);
        }).flatMap(feedResponse -> {
            if (cancellationToken.isCancellationRequested()) {
                return Flux.error(new TaskCancelledException());
            }
            this.lastContinuation = feedResponse.m403getContinuationToken();
            if (feedResponse.getResults() != null && feedResponse.getResults().size() > 0) {
                return dispatchChanges(feedResponse).doOnError(th -> {
                    logger.debug("Exception was thrown from thread {}", Long.valueOf(Thread.currentThread().getId()), th);
                }).doOnSuccess(r5 -> {
                    this.options.setRequestContinuation(this.lastContinuation);
                    if (cancellationToken.isCancellationRequested()) {
                        throw new TaskCancelledException();
                    }
                });
            }
            this.options.setRequestContinuation(this.lastContinuation);
            return cancellationToken.isCancellationRequested() ? Flux.error(new TaskCancelledException()) : Flux.empty();
        }).doOnComplete(() -> {
            if (this.options.getMaxItemCount().compareTo(Integer.valueOf(this.settings.getMaxItemCount())) != 0) {
                this.options.setMaxItemCount(Integer.valueOf(this.settings.getMaxItemCount()));
            }
        }).onErrorResume(th -> {
            if (th instanceof CosmosException) {
                ?? r0 = (CosmosException) th;
                logger.warn("CosmosException: partition {} from thread {}", new Object[]{this.settings.getPartitionKeyRangeId(), Long.valueOf(Thread.currentThread().getId()), r0});
                StatusCodeErrorType classifyClientException = ExceptionClassifier.classifyClientException(r0);
                switch (AnonymousClass1.$SwitchMap$com$azure$cosmos$implementation$changefeed$implementation$StatusCodeErrorType[classifyClientException.ordinal()]) {
                    case 1:
                        this.resultException = new PartitionNotFoundException("Partition not found.", this.lastContinuation);
                        break;
                    case Ascii.STX /* 2 */:
                        this.resultException = new PartitionSplitException("Partition split.", this.lastContinuation);
                        break;
                    case 3:
                        this.resultException = new RuntimeException((Throwable) r0);
                        break;
                    case 4:
                        if (this.options.getMaxItemCount() == null) {
                            this.options.setMaxItemCount(100);
                        } else if (this.options.getMaxItemCount().intValue() <= 1) {
                            logger.error("Cannot reduce maxItemCount further as it's already at {}", this.options.getMaxItemCount(), (Object) r0);
                            this.resultException = new RuntimeException((Throwable) r0);
                        }
                        this.options.setMaxItemCount(Integer.valueOf(this.options.getMaxItemCount().intValue() / 2));
                        logger.warn("Reducing maxItemCount, new value: {}", this.options.getMaxItemCount());
                        return Flux.empty();
                    case 5:
                        if (r0.getRetryAfterDuration().toMillis() > 0) {
                            Instant plus = Instant.now().plus(r0.getRetryAfterDuration().toMillis(), (TemporalUnit) ChronoUnit.MILLIS);
                            return Mono.just(Long.valueOf(r0.getRetryAfterDuration().toMillis())).delayElement(Duration.ofMillis(100L)).repeat(() -> {
                                return !cancellationToken.isCancellationRequested() && Instant.now().isBefore(plus);
                            }).flatMap(l -> {
                                return Flux.empty();
                            });
                        }
                        break;
                    default:
                        logger.error("Unrecognized Cosmos exception returned error code {}", classifyClientException, (Object) r0);
                        this.resultException = new RuntimeException((Throwable) r0);
                        break;
                }
            } else if (th instanceof LeaseLostException) {
                logger.info("LeaseLoseException with partition {} from thread {}", this.settings.getPartitionKeyRangeId(), Long.valueOf(Thread.currentThread().getId()));
                this.resultException = (LeaseLostException) th;
            } else if (th instanceof TaskCancelledException) {
                logger.debug("Task cancelled exception: partition {} from {}", new Object[]{this.settings.getPartitionKeyRangeId(), Long.valueOf(Thread.currentThread().getId()), th});
                this.resultException = (TaskCancelledException) th;
            } else {
                logger.warn("Unexpected exception from thread {}", Long.valueOf(Thread.currentThread().getId()), th);
                this.resultException = new RuntimeException(th);
            }
            return Flux.error(th);
        }).repeat(() -> {
            if (!cancellationToken.isCancellationRequested()) {
                return true;
            }
            this.resultException = new TaskCancelledException();
            return false;
        }).onErrorResume(th2 -> {
            if (this.resultException == null) {
                this.resultException = new RuntimeException(th2);
            }
            return Flux.empty();
        }).then();
    }

    @Override // com.azure.cosmos.implementation.changefeed.PartitionProcessor
    public RuntimeException getResultException() {
        return this.resultException;
    }

    private Mono<Void> dispatchChanges(FeedResponse<JsonNode> feedResponse) {
        return this.observer.processChanges(new ChangeFeedObserverContextImpl(this.settings.getPartitionKeyRangeId(), feedResponse, this.checkpointer), feedResponse.getResults());
    }
}
