/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.fabric.stream;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.neo4j.fabric.executor.Exceptions;
import org.neo4j.fabric.stream.Record;
import org.neo4j.kernel.api.exceptions.Status;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;

public class Rx2SyncStream {
    private static final RecordOrError END = new RecordOrError(null, null);
    private final RecordSubscriber recordSubscriber;
    private final BlockingQueue<RecordOrError> buffer;
    private final int batchSize;

    public Rx2SyncStream(Flux<Record> records, int batchSize) {
        this.batchSize = batchSize;
        this.buffer = new ArrayBlockingQueue<RecordOrError>(batchSize + 2);
        this.recordSubscriber = new RecordSubscriber();
        records.subscribeWith((Subscriber)this.recordSubscriber);
    }

    public Record readRecord() {
        RecordOrError recordOrError;
        this.maybeRequest();
        try {
            recordOrError = this.buffer.take();
        }
        catch (InterruptedException e) {
            this.recordSubscriber.close();
            throw new IllegalStateException(e);
        }
        if (recordOrError == END) {
            return null;
        }
        if (recordOrError.error != null) {
            throw Exceptions.transform((Status)Status.Statement.ExecutionFailed, recordOrError.error);
        }
        return recordOrError.record;
    }

    public boolean completed() {
        return this.buffer.peek() == END;
    }

    public void close() {
        this.recordSubscriber.close();
    }

    private void maybeRequest() {
        int buffered = this.buffer.size();
        long pendingRequested = this.recordSubscriber.pendingRequested.get();
        if (pendingRequested + (long)buffered == 0L) {
            this.recordSubscriber.request(this.batchSize);
        }
    }

    private class RecordSubscriber
    implements Subscriber<Record> {
        private volatile Subscription subscription;
        private AtomicLong pendingRequested = new AtomicLong(0L);

        private RecordSubscriber() {
        }

        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
        }

        public void onNext(Record record) {
            this.pendingRequested.decrementAndGet();
            Rx2SyncStream.this.buffer.add(new RecordOrError(record, null));
        }

        public void onError(Throwable throwable) {
            Rx2SyncStream.this.buffer.add(new RecordOrError(null, throwable));
        }

        public void onComplete() {
            Rx2SyncStream.this.buffer.add(END);
        }

        void request(long numberOfRecords) {
            this.pendingRequested.addAndGet(numberOfRecords);
            this.subscription.request(numberOfRecords);
        }

        void close() {
            this.subscription.cancel();
        }
    }

    private record RecordOrError(Record record, Throwable error) {
    }
}

