/*
 * Decompiled with CFR 0.152.
 */
package io.smallrye.reactive.messaging.kafka.impl;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.reactive.messaging.kafka.i18n.KafkaExceptions;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.reactivestreams.Processor;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class KafkaSenderProcessor
implements Processor<Message<?>, Message<?>>,
Subscription {
    private final long inflights;
    private final boolean waitForCompletion;
    private final Function<Message<?>, Uni<Void>> send;
    private final AtomicReference<Subscription> subscription = new AtomicReference();
    private final AtomicReference<Subscriber<? super Message<?>>> downstream = new AtomicReference();

    public KafkaSenderProcessor(long inflights, boolean waitForCompletion, Function<Message<?>, Uni<Void>> send) {
        this.inflights = inflights;
        this.waitForCompletion = waitForCompletion;
        this.send = send;
    }

    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        if (!this.downstream.compareAndSet(null, subscriber)) {
            Subscriptions.fail(subscriber, (Throwable)KafkaExceptions.ex.illegalStateOnlyOneSubscriber());
        } else if (this.subscription.get() != null) {
            subscriber.onSubscribe((Subscription)this);
        }
    }

    public void onSubscribe(Subscription subscription) {
        if (this.subscription.compareAndSet(null, subscription)) {
            Subscriber<? super Message<?>> subscriber = this.downstream.get();
            if (subscriber != null) {
                subscriber.onSubscribe((Subscription)this);
            }
        } else {
            Subscriber<? super Message<?>> subscriber = this.downstream.get();
            if (subscriber != null) {
                subscriber.onSubscribe((Subscription)Subscriptions.CANCELLED);
            }
        }
    }

    public void onNext(Message<?> message) {
        if (this.waitForCompletion) {
            this.send.apply(message).subscribe().with(x -> this.requestNext(message), this::onError);
        } else {
            this.send.apply(message).subscribe().with(x -> {}, this::onError);
            this.requestNext(message);
        }
    }

    public void request(long l) {
        if (l != Long.MAX_VALUE) {
            throw KafkaExceptions.ex.illegalStateConsumeWithoutBackPressure();
        }
        this.subscription.get().request(this.inflights);
    }

    public void cancel() {
        Subscription s = this.subscription.getAndSet((Subscription)Subscriptions.CANCELLED);
        if (s != null) {
            s.cancel();
        }
    }

    private void requestNext(Message<?> message) {
        Subscription up;
        Subscriber<? super Message<?>> down = this.downstream.get();
        if (down != null) {
            down.onNext(message);
        }
        if ((up = this.subscription.get()) != null && this.inflights != Long.MAX_VALUE) {
            up.request(1L);
        }
    }

    public void onError(Throwable throwable) {
        Subscriber subscriber = this.downstream.getAndSet(null);
        if (subscriber != null) {
            subscriber.onError(throwable);
        }
    }

    public void onComplete() {
        Subscriber subscriber = this.downstream.getAndSet(null);
        if (subscriber != null) {
            subscriber.onComplete();
        }
    }
}

