package karate.com.linecorp.armeria.common.stream;

import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import karate.com.linecorp.armeria.common.annotation.Nullable;
import karate.com.linecorp.armeria.common.annotation.UnstableApi;
import karate.com.linecorp.armeria.common.stream.AbstractStreamWriter;
import karate.com.linecorp.armeria.common.stream.CancellableStreamMessage;
import karate.com.linecorp.armeria.common.util.Exceptions;
import karate.com.linecorp.armeria.internal.common.stream.AbortingSubscriber;
import karate.com.linecorp.armeria.internal.common.stream.InternalStreamMessageUtil;
import karate.com.linecorp.armeria.internal.shaded.jctools.queues.MpscChunkedArrayQueue;
import karate.io.netty.util.concurrent.EventExecutor;
import karate.io.netty.util.concurrent.ImmediateEventExecutor;
import karate.org.reactivestreams.Subscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@UnstableApi
/* loaded from: input_file:karate/com/linecorp/armeria/common/stream/DefaultStreamMessage.class */
public class DefaultStreamMessage<T> extends AbstractStreamWriter<T> {
    private static final Logger logger;
    private static final AtomicReferenceFieldUpdater<DefaultStreamMessage, CancellableStreamMessage.SubscriptionImpl> subscriptionUpdater;
    private static final AtomicReferenceFieldUpdater<DefaultStreamMessage, AbstractStreamWriter.State> stateUpdater;
    private static final int INITIAL_CAPACITY = 32;

    @Nullable
    private Throwable cleanupCause;

    @Nullable
    private volatile CancellableStreamMessage.SubscriptionImpl subscription;
    private long demand;
    private volatile boolean wroteAny;
    private boolean inOnNext;
    private boolean invokedOnSubscribe;
    static final /* synthetic */ boolean $assertionsDisabled;
    private volatile AbstractStreamWriter.State state = AbstractStreamWriter.State.OPEN;
    private final Queue<Object> queue = new MpscChunkedArrayQueue(32, 1073741824);

    @Deprecated
    public DefaultStreamMessage() {
    }

    @Override // karate.com.linecorp.armeria.common.stream.StreamMessage
    public final boolean isOpen() {
        return this.state == AbstractStreamWriter.State.OPEN;
    }

    @Override // karate.com.linecorp.armeria.common.stream.StreamMessage
    public final boolean isEmpty() {
        return (isOpen() || this.wroteAny) ? false : true;
    }

    @Override // karate.com.linecorp.armeria.common.stream.CancellableStreamMessage
    CancellableStreamMessage.SubscriptionImpl subscribe(CancellableStreamMessage.SubscriptionImpl subscriptionImpl) {
        if (subscriptionUpdater.compareAndSet(this, null, subscriptionImpl)) {
            Subscriber<Object> subscriber = subscriptionImpl.subscriber();
            if (subscriptionImpl.needsDirectInvocation()) {
                subscribe(subscriptionImpl, subscriber);
            } else {
                subscriptionImpl.executor().execute(() -> {
                    subscribe(subscriptionImpl, (Subscriber<Object>) subscriber);
                });
            }
            return subscriptionImpl;
        }
        CancellableStreamMessage.SubscriptionImpl subscriptionImpl2 = this.subscription;
        if ($assertionsDisabled || subscriptionImpl2 != null) {
            return subscriptionImpl2;
        }
        throw new AssertionError();
    }

    private void subscribe(CancellableStreamMessage.SubscriptionImpl subscriptionImpl, Subscriber<Object> subscriber) {
        try {
            subscribe0(subscriptionImpl.executor(), subscriptionImpl.options());
            this.invokedOnSubscribe = true;
            subscriber.onSubscribe(subscriptionImpl);
            if (!this.queue.isEmpty()) {
                notifySubscriber0();
            }
        } catch (Throwable th) {
            if (setState(AbstractStreamWriter.State.OPEN, AbstractStreamWriter.State.CLEANUP) || setState(AbstractStreamWriter.State.CLOSED, AbstractStreamWriter.State.CLEANUP)) {
                notifySubscriberOfCloseEvent(subscriptionImpl, newCloseEvent(th));
                Exceptions.throwIfFatal(th);
            } else {
                Exceptions.throwIfFatal(th);
                logger.warn("Subscriber.onSubscribe() should not raise an exception. subscriber: {}", subscriber, th);
            }
        }
    }

    protected void subscribe0(EventExecutor eventExecutor, SubscriptionOption[] subscriptionOptionArr) {
    }

    protected void onRequest(long j) {
    }

    @Override // karate.com.linecorp.armeria.common.stream.StreamMessage
    public final void abort() {
        abort0(null);
    }

    @Override // karate.com.linecorp.armeria.common.stream.StreamMessage
    public final void abort(Throwable th) {
        Objects.requireNonNull(th, "cause");
        abort0(th);
    }

    private void abort0(@Nullable Throwable th) {
        if (this.state == AbstractStreamWriter.State.CLEANUP) {
            return;
        }
        if (th == null) {
            th = AbortedStreamException.get();
        }
        CancellableStreamMessage.SubscriptionImpl subscriptionImpl = this.subscription;
        if (subscriptionImpl == null) {
            CancellableStreamMessage.SubscriptionImpl subscriptionImpl2 = new CancellableStreamMessage.SubscriptionImpl(this, AbortingSubscriber.get(th), ImmediateEventExecutor.INSTANCE, InternalStreamMessageUtil.EMPTY_OPTIONS);
            if (subscriptionUpdater.compareAndSet(this, null, subscriptionImpl2)) {
                this.invokedOnSubscribe = true;
                subscriptionImpl = subscriptionImpl2;
            } else {
                subscriptionImpl = this.subscription;
            }
        }
        if (!$assertionsDisabled && subscriptionImpl == null) {
            throw new AssertionError();
        }
        CancellableStreamMessage.SubscriptionImpl subscriptionImpl3 = subscriptionImpl;
        if (setState(AbstractStreamWriter.State.OPEN, AbstractStreamWriter.State.CLEANUP)) {
            notifySubscriberOfCloseEvent(subscriptionImpl3, newCloseEvent(th));
            return;
        }
        if (setState(AbstractStreamWriter.State.CLOSED, AbstractStreamWriter.State.CLEANUP)) {
            if (subscriptionImpl3.needsDirectInvocation()) {
                abort0(th, subscriptionImpl3);
            } else {
                Throwable th2 = th;
                subscriptionImpl3.executor().execute(() -> {
                    abort0(th2, subscriptionImpl3);
                });
            }
        }
    }

    private void abort0(Throwable th, CancellableStreamMessage.SubscriptionImpl subscriptionImpl) {
        Object peek = this.queue.peek();
        if (this.wroteAny || !(peek instanceof CancellableStreamMessage.CloseEvent)) {
            notifySubscriberOfCloseEvent(subscriptionImpl, newCloseEvent(th));
        } else {
            notifySubscriberOfCloseEvent(subscriptionImpl, (CancellableStreamMessage.CloseEvent) this.queue.remove());
        }
    }

    @Override // karate.com.linecorp.armeria.common.stream.AbstractStreamWriter
    final void addObject(T t) {
        this.wroteAny = true;
        addObjectOrEvent(t);
    }

    @Override // karate.com.linecorp.armeria.common.stream.StreamMessage
    public final long demand() {
        return this.demand;
    }

    @Override // karate.com.linecorp.armeria.common.stream.CancellableStreamMessage
    final void request(long j) {
        CancellableStreamMessage.SubscriptionImpl subscriptionImpl = this.subscription;
        if (!$assertionsDisabled && subscriptionImpl == null) {
            throw new AssertionError();
        }
        if (subscriptionImpl.needsDirectInvocation()) {
            doRequest(j);
        } else {
            subscriptionImpl.executor().execute(() -> {
                doRequest(j);
            });
        }
    }

    private void doRequest(long j) {
        long j2 = this.demand;
        if (j2 >= Long.MAX_VALUE - j) {
            this.demand = Long.MAX_VALUE;
        } else {
            this.demand = j2 + j;
        }
        onRequest(j);
        if (j2 != 0 || this.queue.isEmpty()) {
            return;
        }
        notifySubscriber0();
    }

    @Override // karate.com.linecorp.armeria.common.stream.CancellableStreamMessage
    final void cancel() {
        if (setState(AbstractStreamWriter.State.OPEN, AbstractStreamWriter.State.CLEANUP) || setState(AbstractStreamWriter.State.CLOSED, AbstractStreamWriter.State.CLEANUP)) {
            CancellableStreamMessage.SubscriptionImpl subscriptionImpl = this.subscription;
            if (!$assertionsDisabled && subscriptionImpl == null) {
                throw new AssertionError();
            }
            notifySubscriberOfCloseEvent(subscriptionImpl, CANCELLED_CLOSE);
        }
    }

    private void notifySubscriberOfCloseEvent(CancellableStreamMessage.SubscriptionImpl subscriptionImpl, CancellableStreamMessage.CloseEvent closeEvent) {
        if (subscriptionImpl.needsDirectInvocation()) {
            notifySubscriberOfCloseEvent0(subscriptionImpl, closeEvent);
        } else {
            subscriptionImpl.executor().execute(() -> {
                notifySubscriberOfCloseEvent0(subscriptionImpl, closeEvent);
            });
        }
    }

    private void notifySubscriberOfCloseEvent0(CancellableStreamMessage.SubscriptionImpl subscriptionImpl, CancellableStreamMessage.CloseEvent closeEvent) {
        try {
            closeEvent.notifySubscriber(subscriptionImpl, whenComplete());
            subscriptionImpl.clearSubscriber();
            Throwable th = closeEvent.cause;
            if (this.state == AbstractStreamWriter.State.CLEANUP) {
                this.cleanupCause = th;
            }
            cleanupObjects(th);
        } catch (Throwable th2) {
            subscriptionImpl.clearSubscriber();
            Throwable th3 = closeEvent.cause;
            if (this.state == AbstractStreamWriter.State.CLEANUP) {
                this.cleanupCause = th3;
            }
            cleanupObjects(th3);
            throw th2;
        }
    }

    @Override // karate.com.linecorp.armeria.common.stream.AbstractStreamWriter
    final void addObjectOrEvent(Object obj) {
        this.queue.add(obj);
        notifySubscriber();
    }

    final void notifySubscriber() {
        CancellableStreamMessage.SubscriptionImpl subscriptionImpl = this.subscription;
        if (subscriptionImpl == null || this.queue.isEmpty()) {
            return;
        }
        if (subscriptionImpl.needsDirectInvocation()) {
            notifySubscriber0();
        } else {
            subscriptionImpl.executor().execute(this::notifySubscriber0);
        }
    }

    private void notifySubscriber0() {
        if (this.inOnNext) {
            return;
        }
        CancellableStreamMessage.SubscriptionImpl subscriptionImpl = this.subscription;
        if (this.invokedOnSubscribe) {
            while (this.state != AbstractStreamWriter.State.CLEANUP) {
                Object peek = this.queue.peek();
                if (peek == null) {
                    return;
                }
                if (peek instanceof CancellableStreamMessage.CloseEvent) {
                    handleCloseEvent(subscriptionImpl, (CancellableStreamMessage.CloseEvent) this.queue.remove());
                    return;
                } else if (peek instanceof AbstractStreamWriter.AwaitDemandFuture) {
                    notifyAwaitDemandFuture();
                } else if (!notifySubscriberWithElements(subscriptionImpl)) {
                    return;
                }
            }
            cleanupObjects(null);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    private boolean notifySubscriberWithElements(CancellableStreamMessage.SubscriptionImpl subscriptionImpl) {
        Subscriber<Object> subscriber = subscriptionImpl.subscriber();
        if (this.demand == 0) {
            return false;
        }
        if (this.demand != Long.MAX_VALUE) {
            this.demand--;
        }
        T remove = this.queue.remove();
        this.inOnNext = true;
        try {
            try {
                remove = prepareObjectForNotification(remove, subscriptionImpl.withPooledObjects());
                subscriber.onNext(remove);
                this.inOnNext = false;
                return true;
            } catch (Throwable th) {
                if (setState(AbstractStreamWriter.State.OPEN, AbstractStreamWriter.State.CLEANUP) || setState(AbstractStreamWriter.State.CLOSED, AbstractStreamWriter.State.CLEANUP)) {
                    notifySubscriberOfCloseEvent(subscriptionImpl, newCloseEvent(th));
                    Exceptions.throwIfFatal(th);
                } else {
                    Exceptions.throwIfFatal(th);
                    logger.warn("Subscriber.onNext({}) should not raise an exception. subscriber: {}", new Object[]{remove, subscriber, th});
                }
                this.inOnNext = false;
                return false;
            }
        } catch (Throwable th2) {
            this.inOnNext = false;
            throw th2;
        }
    }

    private void notifyAwaitDemandFuture() {
        ((CompletableFuture) this.queue.remove()).complete(null);
    }

    private void handleCloseEvent(CancellableStreamMessage.SubscriptionImpl subscriptionImpl, CancellableStreamMessage.CloseEvent closeEvent) {
        if (setState(AbstractStreamWriter.State.CLOSED, AbstractStreamWriter.State.CLEANUP)) {
            notifySubscriberOfCloseEvent(subscriptionImpl, closeEvent);
        }
    }

    @Override // karate.com.linecorp.armeria.common.stream.StreamWriter
    public void close() {
        if (setState(AbstractStreamWriter.State.OPEN, AbstractStreamWriter.State.CLOSED)) {
            addObjectOrEvent(SUCCESSFUL_CLOSE);
        }
    }

    @Override // karate.com.linecorp.armeria.common.stream.StreamWriter
    public final void close(Throwable th) {
        Objects.requireNonNull(th, "cause");
        if (th instanceof CancelledSubscriptionException) {
            throw new IllegalArgumentException("cause: " + th + " (must use Subscription.cancel())");
        }
        tryClose(th);
    }

    public final boolean tryClose(Throwable th) {
        if (!setState(AbstractStreamWriter.State.OPEN, AbstractStreamWriter.State.CLOSED)) {
            return false;
        }
        addObjectOrEvent(new CancellableStreamMessage.CloseEvent(th));
        return true;
    }

    private boolean setState(AbstractStreamWriter.State state, AbstractStreamWriter.State state2) {
        if ($assertionsDisabled || state2 != AbstractStreamWriter.State.OPEN) {
            return stateUpdater.compareAndSet(this, state, state2);
        }
        throw new AssertionError("oldState: " + state + ", newState: " + state2);
    }

    /* JADX WARN: Multi-variable type inference failed */
    private void cleanupObjects(@Nullable Throwable th) {
        while (true) {
            Object poll = this.queue.poll();
            if (poll == null) {
                return;
            }
            if (!(poll instanceof CancellableStreamMessage.CloseEvent)) {
                if (poll instanceof CompletableFuture) {
                    if (th == null) {
                        th = ClosedStreamException.get();
                    }
                    ((CompletableFuture) poll).completeExceptionally(th);
                } else {
                    try {
                        onRemoval(poll);
                        karate.com.linecorp.armeria.internal.common.stream.StreamMessageUtil.closeOrAbort(poll, this.cleanupCause);
                    } catch (Throwable th2) {
                        karate.com.linecorp.armeria.internal.common.stream.StreamMessageUtil.closeOrAbort(poll, this.cleanupCause);
                        throw th2;
                    }
                }
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // karate.com.linecorp.armeria.common.stream.AbstractStreamWriter, karate.com.linecorp.armeria.common.stream.StreamWriter
    public /* bridge */ /* synthetic */ boolean tryWrite(Object obj) {
        return super.tryWrite((DefaultStreamMessage<T>) obj);
    }

    static {
        $assertionsDisabled = !DefaultStreamMessage.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(DefaultStreamMessage.class);
        subscriptionUpdater = AtomicReferenceFieldUpdater.newUpdater(DefaultStreamMessage.class, CancellableStreamMessage.SubscriptionImpl.class, "subscription");
        stateUpdater = AtomicReferenceFieldUpdater.newUpdater(DefaultStreamMessage.class, AbstractStreamWriter.State.class, "state");
    }
}
