package com.azure.cosmos.implementation.changefeed.common;

import com.azure.cosmos.implementation.changefeed.ChangeFeedObserver;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverCloseReason;
import com.azure.cosmos.implementation.changefeed.ChangeFeedObserverContext;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/azure/cosmos/implementation/changefeed/common/DefaultObserver.class */
public class DefaultObserver<T> implements ChangeFeedObserver<T> {
    private static final Logger log = LoggerFactory.getLogger(DefaultObserver.class);
    private final Consumer<List<T>> consumer;

    public DefaultObserver(Consumer<List<T>> consumer) {
        this.consumer = consumer;
    }

    @Override // com.azure.cosmos.implementation.changefeed.ChangeFeedObserver
    public void open(ChangeFeedObserverContext<T> changeFeedObserverContext) {
        log.info("Open processing from thread {}", Long.valueOf(Thread.currentThread().getId()));
    }

    @Override // com.azure.cosmos.implementation.changefeed.ChangeFeedObserver
    public void close(ChangeFeedObserverContext<T> changeFeedObserverContext, ChangeFeedObserverCloseReason changeFeedObserverCloseReason) {
        log.info("Close processing from thread {}", Long.valueOf(Thread.currentThread().getId()));
    }

    @Override // com.azure.cosmos.implementation.changefeed.ChangeFeedObserver
    public Mono<Void> processChanges(ChangeFeedObserverContext<T> changeFeedObserverContext, List<T> list) {
        log.info("Start processing from thread {}", Long.valueOf(Thread.currentThread().getId()));
        try {
            this.consumer.accept(list);
            log.info("Done processing from thread {}", Long.valueOf(Thread.currentThread().getId()));
            return Mono.empty();
        } catch (Exception e) {
            log.warn("Unexpected exception thrown from thread {}", Long.valueOf(Thread.currentThread().getId()), e);
            return Mono.error(e);
        }
    }
}
