package akka.remote.artery.aeron;

import akka.event.LoggingAdapter;
import akka.remote.artery.EnvelopeBuffer;
import akka.remote.artery.aeron.AeronSource;
import akka.remote.artery.aeron.TaskRunner;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import io.aeron.Subscription;
import io.aeron.exceptions.DriverTimeoutException;
import org.agrona.hints.ThreadHints;
import scala.MatchError;
import scala.Option;
import scala.collection.immutable.$colon;
import scala.collection.immutable.List;
import scala.collection.immutable.Nil$;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.package$;
import scala.runtime.BoxesRunTime;
import scala.util.control.NonFatal$;

/* compiled from: AeronSource.scala */
/* loaded from: input_file:akka/remote/artery/aeron/AeronSource$$anon$1.class */
public final class AeronSource$$anon$1 extends GraphStageLogic implements OutHandler, AeronSource.AeronLifecycle, StageLogging {
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    private final Subscription subscription;
    private int backoffCount;
    private long delegateTaskStartTime;
    private long countBeforeDelegate;
    private final AeronSource.MessageHandler messageHandler;
    private final TaskRunner.Add addPollTask;
    private boolean delegatingToTaskRunner;
    private List pendingUnavailableImages;
    private final AsyncCallback onUnavailableImageCb;
    private final AsyncCallback getStatusCb;
    private final /* synthetic */ AeronSource $outer;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public AeronSource$$anon$1(AeronSource aeronSource) {
        super(aeronSource.akka$remote$artery$aeron$AeronSource$$_$_$$anon$superArg$1$1());
        if (aeronSource == null) {
            throw new NullPointerException();
        }
        this.$outer = aeronSource;
        StageLogging.$init$(this);
        this.subscription = aeronSource.akka$remote$artery$aeron$AeronSource$$aeron.addSubscription(aeronSource.akka$remote$artery$aeron$AeronSource$$channel, aeronSource.akka$remote$artery$aeron$AeronSource$$streamId);
        this.backoffCount = aeronSource.akka$remote$artery$aeron$AeronSource$$spinning;
        this.delegateTaskStartTime = 0L;
        this.countBeforeDelegate = 0L;
        this.messageHandler = new AeronSource.MessageHandler(aeronSource.akka$remote$artery$aeron$AeronSource$$pool);
        this.addPollTask = TaskRunner$Add$.MODULE$.apply(AeronSource$.MODULE$.akka$remote$artery$aeron$AeronSource$$$pollTask(this.subscription, this.messageHandler, getAsyncCallback(envelopeBuffer -> {
            taskOnMessage(envelopeBuffer);
        })));
        this.delegatingToTaskRunner = false;
        this.pendingUnavailableImages = package$.MODULE$.Nil();
        this.onUnavailableImageCb = getAsyncCallback(i -> {
            this.pendingUnavailableImages = this.pendingUnavailableImages.$colon$colon(BoxesRunTime.boxToInteger(i));
            freeSessionBuffers();
        });
        this.getStatusCb = getAsyncCallback(promise -> {
            promise.success(BoxesRunTime.boxToLong(this.subscription.channelStatus()));
        });
        setHandler(aeronSource.out(), this);
    }

    public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
        OutHandler.onDownstreamFinish$(this, th);
    }

    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public Class logSource() {
        return AeronSource.class;
    }

    public void preStart() {
        this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceStarted(this.$outer.akka$remote$artery$aeron$AeronSource$$channel, this.$outer.akka$remote$artery$aeron$AeronSource$$streamId);
    }

    public void postStop() {
        this.$outer.akka$remote$artery$aeron$AeronSource$$taskRunner.command(TaskRunner$Remove$.MODULE$.apply(this.addPollTask.task()));
        try {
            try {
                this.subscription.close();
            } catch (DriverTimeoutException e) {
                log().debug("DriverTimeout when closing subscription. {}", e);
            }
        } finally {
            this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceStopped(this.$outer.akka$remote$artery$aeron$AeronSource$$channel, this.$outer.akka$remote$artery$aeron$AeronSource$$streamId);
        }
    }

    public void onPull() {
        this.backoffCount = this.$outer.akka$remote$artery$aeron$AeronSource$$spinning;
        subscriberLoop();
    }

    /* JADX WARN: Unreachable blocks removed: 3, instructions: 3 */
    private void subscriberLoop() {
        AeronSource$$anon$1 aeronSource$$anon$1 = this;
        while (true) {
            AeronSource$$anon$1 aeronSource$$anon$12 = aeronSource$$anon$1;
            aeronSource$$anon$12.messageHandler.reset();
            int poll = aeronSource$$anon$12.subscription.poll(aeronSource$$anon$12.messageHandler.fragmentsHandler(), 1);
            EnvelopeBuffer messageReceived = aeronSource$$anon$12.messageHandler.messageReceived();
            aeronSource$$anon$12.messageHandler.reset();
            if (poll > 0) {
                aeronSource$$anon$12.countBeforeDelegate++;
                if (messageReceived != null) {
                    aeronSource$$anon$12.onMessage(messageReceived);
                    return;
                }
                aeronSource$$anon$1 = aeronSource$$anon$12;
            } else {
                aeronSource$$anon$12.backoffCount--;
                if (aeronSource$$anon$12.backoffCount <= 0) {
                    aeronSource$$anon$12.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceDelegateToTaskRunner(aeronSource$$anon$12.countBeforeDelegate);
                    aeronSource$$anon$12.delegatingToTaskRunner = true;
                    aeronSource$$anon$12.delegateTaskStartTime = System.nanoTime();
                    aeronSource$$anon$12.$outer.akka$remote$artery$aeron$AeronSource$$taskRunner.command(aeronSource$$anon$12.addPollTask);
                    return;
                }
                ThreadHints.onSpinWait();
                aeronSource$$anon$1 = aeronSource$$anon$12;
            }
        }
    }

    @Override // akka.remote.artery.aeron.AeronSource.AeronLifecycle
    public Future channelEndpointStatus() {
        Promise apply = Promise$.MODULE$.apply();
        this.getStatusCb.invoke(apply);
        return apply.future();
    }

    private void taskOnMessage(EnvelopeBuffer envelopeBuffer) {
        this.countBeforeDelegate = 0L;
        this.delegatingToTaskRunner = false;
        this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceReturnFromTaskRunner(System.nanoTime() - this.delegateTaskStartTime);
        freeSessionBuffers();
        onMessage(envelopeBuffer);
    }

    private void onMessage(EnvelopeBuffer envelopeBuffer) {
        this.$outer.akka$remote$artery$aeron$AeronSource$$flightRecorder.aeronSourceReceived(envelopeBuffer.byteBuffer().limit());
        push(this.$outer.out(), envelopeBuffer);
    }

    private void freeSessionBuffers() {
        if (this.delegatingToTaskRunner) {
            return;
        }
        loop$1(this.pendingUnavailableImages);
        this.pendingUnavailableImages = package$.MODULE$.Nil();
    }

    @Override // akka.remote.artery.aeron.AeronSource.AeronLifecycle
    public void onUnavailableImage(int i) {
        try {
            this.onUnavailableImageCb.invoke(BoxesRunTime.boxToInteger(i));
        } catch (Throwable th) {
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    return;
                }
            }
            throw th;
        }
    }

    /* JADX WARN: Unreachable blocks removed: 3, instructions: 3 */
    private final void loop$1(List list) {
        List list2 = list;
        while (true) {
            List list3 = list2;
            Nil$ Nil = package$.MODULE$.Nil();
            if (Nil == null) {
                if (list3 == null) {
                    return;
                }
            } else if (Nil.equals(list3)) {
                return;
            }
            if (!(list3 instanceof $colon.colon)) {
                throw new MatchError(list3);
            }
            $colon.colon colonVar = ($colon.colon) list3;
            List next$access$1 = colonVar.next$access$1();
            this.messageHandler.fragmentsHandler().freeSessionBuffer(BoxesRunTime.unboxToInt(colonVar.head()));
            list2 = next$access$1;
        }
    }
}
