/*
 * Decompiled with CFR 0.152.
 */
package com.azure.storage.blob.changefeed;

import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.paging.ContinuablePagedFlux;
import com.azure.storage.blob.changefeed.BlobChangefeedPagedResponse;
import com.azure.storage.blob.changefeed.Changefeed;
import com.azure.storage.blob.changefeed.ChangefeedFactory;
import com.azure.storage.blob.changefeed.implementation.models.BlobChangefeedEventWrapper;
import com.azure.storage.blob.changefeed.implementation.models.ChangefeedCursor;
import com.azure.storage.blob.changefeed.models.BlobChangefeedEvent;
import com.azure.storage.common.implementation.StorageImplUtils;
import java.time.OffsetDateTime;
import java.util.List;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.context.ContextView;

public final class BlobChangefeedPagedFlux
extends ContinuablePagedFlux<String, BlobChangefeedEvent, BlobChangefeedPagedResponse> {
    private static final ClientLogger LOGGER = new ClientLogger(BlobChangefeedPagedFlux.class);
    private final Changefeed changefeed;
    private static final Integer DEFAULT_PAGE_SIZE = 5000;
    private Context context;

    BlobChangefeedPagedFlux(ChangefeedFactory changefeedFactory, OffsetDateTime startTime, OffsetDateTime endTime) {
        StorageImplUtils.assertNotNull((String)"changefeedFactory", (Object)changefeedFactory);
        this.changefeed = changefeedFactory.getChangefeed(startTime, endTime);
    }

    BlobChangefeedPagedFlux(ChangefeedFactory changefeedFactory, String cursor) {
        StorageImplUtils.assertNotNull((String)"changefeedFactory", (Object)changefeedFactory);
        this.changefeed = changefeedFactory.getChangefeed(cursor);
    }

    BlobChangefeedPagedFlux setSubscriberContext(Context context) {
        this.context = context;
        return this;
    }

    public Flux<BlobChangefeedPagedResponse> byPage() {
        return this.byPage(null, (int)DEFAULT_PAGE_SIZE);
    }

    public Flux<BlobChangefeedPagedResponse> byPage(String continuationToken) {
        return this.byPage(continuationToken, (int)DEFAULT_PAGE_SIZE);
    }

    public Flux<BlobChangefeedPagedResponse> byPage(int preferredPageSize) {
        return this.byPage(null, preferredPageSize);
    }

    public Flux<BlobChangefeedPagedResponse> byPage(String continuationToken, int preferredPageSize) {
        if (continuationToken != null) {
            return FluxUtil.pagedFluxError((ClientLogger)LOGGER, (RuntimeException)new UnsupportedOperationException("continuationToken not supported. Use client.getEvents(String) to pass in a cursor."));
        }
        if (preferredPageSize <= 0) {
            return FluxUtil.pagedFluxError((ClientLogger)LOGGER, (RuntimeException)new IllegalArgumentException("preferredPageSize > 0 required but provided: " + preferredPageSize));
        }
        preferredPageSize = Integer.min(preferredPageSize, DEFAULT_PAGE_SIZE);
        return this.changefeed.getEvents().window(preferredPageSize).concatMap(eventWrappers -> {
            Flux cachedEventWrappers = eventWrappers.cache();
            Mono c = cachedEventWrappers.last().map(BlobChangefeedEventWrapper::getCursor);
            Mono e = cachedEventWrappers.map(BlobChangefeedEventWrapper::getEvent).collectList();
            return Mono.zip((Mono)e, (Mono)c);
        }).map(tuple2 -> new BlobChangefeedPagedResponse((List)tuple2.getT1(), (ChangefeedCursor)tuple2.getT2())).contextWrite((ContextView)FluxUtil.toReactorContext((Context)this.context));
    }

    public void subscribe(CoreSubscriber<? super BlobChangefeedEvent> coreSubscriber) {
        this.changefeed.getEvents().map(BlobChangefeedEventWrapper::getEvent).subscribe(coreSubscriber);
    }
}

