package io.axoniq.axonserver.connector;

import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/axoniq/axonserver/connector/ResultStreamPublisher.class */
public class ResultStreamPublisher<M> implements Publisher<M> {
    private static final Logger logger = LoggerFactory.getLogger(ResultStreamPublisher.class);
    private final Supplier<ResultStream<M>> resultStreamSupplier;

    /* loaded from: input_file:io/axoniq/axonserver/connector/ResultStreamPublisher$ResultStreamSubscription.class */
    private class ResultStreamSubscription implements Subscription {
        private final Subscriber<? super M> subscriber;
        private final ResultStream<M> resultStream;
        private final AtomicLong requested;
        private final AtomicBoolean signalGate;
        private final AtomicBoolean cancelled;
        private final AtomicBoolean completed;

        private ResultStreamSubscription(Subscriber<? super M> subscriber, ResultStream<M> resultStream) {
            this.requested = new AtomicLong(0L);
            this.signalGate = new AtomicBoolean(false);
            this.cancelled = new AtomicBoolean(false);
            this.completed = new AtomicBoolean(false);
            this.subscriber = subscriber;
            this.resultStream = resultStream;
        }

        public void request(long j) {
            if (j <= 0) {
                this.subscriber.onError(new IllegalArgumentException("negative subscription request"));
            } else {
                this.requested.updateAndGet(j2 -> {
                    return j2 + Math.min(Long.MAX_VALUE - j2, j);
                });
                signal();
            }
        }

        public void cancel() {
            ResultStreamPublisher.logger.debug("The call has been cancelled.");
            this.cancelled.set(true);
            this.resultStream.close();
        }

        private void signal() {
            while (canConsume() && this.signalGate.compareAndSet(false, true)) {
                try {
                    try {
                        long j = this.requested.get();
                        long j2 = 0;
                        for (int i = 0; i < j && !this.resultStream.isClosed() && this.resultStream.peek() != null; i++) {
                            this.subscriber.onNext(this.resultStream.next());
                            j2--;
                        }
                        this.requested.getAndAccumulate(j2, Long::sum);
                        if (this.resultStream.isClosed()) {
                            Optional<Throwable> error = this.resultStream.getError();
                            if (error.isPresent()) {
                                onError(error.get());
                            } else {
                                this.subscriber.onComplete();
                                this.completed.set(true);
                            }
                        }
                        this.signalGate.set(false);
                    } catch (InterruptedException e) {
                        this.signalGate.set(false);
                        Thread.currentThread().interrupt();
                        this.signalGate.set(false);
                    } catch (Exception e2) {
                        onError(e2);
                        this.signalGate.set(false);
                    }
                } catch (Throwable th) {
                    this.signalGate.set(false);
                    throw th;
                }
            }
        }

        private void onError(Throwable th) {
            ResultStreamPublisher.logger.debug("An error occurred accessing the ResultStream.", th);
            this.subscriber.onError(th);
            this.completed.set(true);
        }

        /* JADX WARN: Code restructure failed: missing block: B:11:0x0035, code lost:
        
            if (r5.requested.get() > 0) goto L12;
         */
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        private boolean canConsume() {
            /*
                r5 = this;
                r0 = r5
                java.util.concurrent.atomic.AtomicBoolean r0 = r0.cancelled     // Catch: java.lang.Exception -> L3e
                boolean r0 = r0.get()     // Catch: java.lang.Exception -> L3e
                if (r0 != 0) goto L3c
                r0 = r5
                java.util.concurrent.atomic.AtomicBoolean r0 = r0.completed     // Catch: java.lang.Exception -> L3e
                boolean r0 = r0.get()     // Catch: java.lang.Exception -> L3e
                if (r0 != 0) goto L3c
                r0 = r5
                io.axoniq.axonserver.connector.ResultStream<M> r0 = r0.resultStream     // Catch: java.lang.Exception -> L3e
                boolean r0 = r0.isClosed()     // Catch: java.lang.Exception -> L3e
                if (r0 != 0) goto L38
                r0 = r5
                io.axoniq.axonserver.connector.ResultStream<M> r0 = r0.resultStream     // Catch: java.lang.Exception -> L3e
                java.lang.Object r0 = r0.peek()     // Catch: java.lang.Exception -> L3e
                if (r0 == 0) goto L3c
                r0 = r5
                java.util.concurrent.atomic.AtomicLong r0 = r0.requested     // Catch: java.lang.Exception -> L3e
                long r0 = r0.get()     // Catch: java.lang.Exception -> L3e
                r1 = 0
                int r0 = (r0 > r1 ? 1 : (r0 == r1 ? 0 : -1))
                if (r0 <= 0) goto L3c
            L38:
                r0 = 1
                goto L3d
            L3c:
                r0 = 0
            L3d:
                return r0
            L3e:
                r6 = move-exception
                r0 = r5
                r1 = r6
                r0.onError(r1)
                r0 = 0
                return r0
            */
            throw new UnsupportedOperationException("Method not decompiled: io.axoniq.axonserver.connector.ResultStreamPublisher.ResultStreamSubscription.canConsume():boolean");
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void afterSubscribe() {
            this.resultStream.onAvailable(this::signal);
            signal();
        }
    }

    public ResultStreamPublisher(Supplier<ResultStream<M>> supplier) {
        this.resultStreamSupplier = supplier;
    }

    public void subscribe(Subscriber<? super M> subscriber) {
        ResultStreamSubscription resultStreamSubscription = new ResultStreamSubscription(subscriber, this.resultStreamSupplier.get());
        subscriber.onSubscribe(resultStreamSubscription);
        resultStreamSubscription.afterSubscribe();
    }
}
