package akka.remote.artery;

import akka.actor.Address;
import akka.dispatch.ExecutionContexts$;
import akka.dispatch.sysmsg.SystemMessage;
import akka.event.Logging$;
import akka.event.LoggingAdapter;
import akka.remote.UniqueAddress;
import akka.remote.artery.InboundControlJunction;
import akka.remote.artery.OutboundHandshake;
import akka.remote.artery.SystemMessageDelivery;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import akka.stream.stage.TimerGraphStageLogic;
import akka.util.PrettyDuration$;
import akka.util.PrettyDuration$PrettyPrintableDuration$;
import java.util.ArrayDeque;
import java.util.Iterator;
import scala.MatchError;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: SystemMessageDelivery.scala */
/* loaded from: input_file:akka/remote/artery/SystemMessageDelivery$$anon$1.class */
/**
 * Stage logic of {@link SystemMessageDelivery}, decompiled from an anonymous Scala class.
 *
 * <p>What the code below does:
 * <ul>
 *   <li>Envelopes whose message is a {@code SystemMessage} or an {@code AckedDeliveryMessage} are
 *       re-wrapped in a {@code SystemMessageEnvelope} carrying the next sequence number, stored in
 *       {@link #unacknowledged} and pushed downstream (as a copy).</li>
 *   <li>{@code Ack}/{@code Nack} control messages coming from the remote address remove every
 *       stored envelope whose sequence number is covered by the acknowledgement.</li>
 *   <li>On every {@code ResendTick} all still-unacknowledged envelopes are queued in
 *       {@link #resending} again, unless a resend round is already in progress.</li>
 *   <li>If nothing has been acknowledged for longer than {@link #giveUpAfterNanos} while
 *       envelopes are pending, a {@code GaveUpSystemMessageException} is thrown.</li>
 * </ul>
 */
public final class SystemMessageDelivery$$anon$1 extends TimerGraphStageLogic implements InHandler, OutHandler, InboundControlJunction.ControlMessageObserver, StageLogging {
    /** Backing field for the logger accessors required by {@link StageLogging}. */
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;
    /** Set to {@code true} by the callback created in {@link #preStart()}; gates {@link #onPull()}. */
    private boolean replyObserverAttached;
    /** Sequence number given to the most recently wrapped system message; reset to 0 by {@link #clear()}. */
    private long seqNo;
    /** Association incarnation that the current sequence numbers belong to. */
    private int incarnation;
    /** Wrapped system messages that have been sent but not yet acknowledged, oldest first. */
    private final ArrayDeque<OutboundEnvelope> unacknowledged;
    /** Envelopes waiting for downstream demand (resends and messages that could not be pushed directly). */
    private ArrayDeque<OutboundEnvelope> resending;
    /** {@code true} once upstream has finished while envelopes were still unacknowledged. */
    private boolean stopping;
    /** Value of the {@code GiveUpSystemMessageAfter} setting, in nanoseconds. */
    private final long giveUpAfterNanos;
    /** {@link System#nanoTime()} of the last acknowledgement (or of the first pending message). */
    private long ackTimestamp;
    private final AsyncCallback<SystemMessageDelivery.Ack> ackCallback;
    private final AsyncCallback<SystemMessageDelivery.Nack> nackCallback;
    private final /* synthetic */ SystemMessageDelivery $outer;

    /**
     * Initialises the delivery state from the enclosing stage and registers this object as the
     * handler of both the inlet and the outlet.
     *
     * @param systemMessageDelivery the enclosing stage
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SystemMessageDelivery$$anon$1(SystemMessageDelivery systemMessageDelivery) {
        super(systemMessageDelivery.akka$remote$artery$SystemMessageDelivery$$_$$anon$superArg$1$1());
        // Kept from the decompiled output; the super call above already dereferences the argument.
        if (systemMessageDelivery == null) {
            throw new NullPointerException();
        }
        this.$outer = systemMessageDelivery;
        StageLogging.$init$(this);
        this.replyObserverAttached = false;
        this.seqNo = 0L;
        this.incarnation = systemMessageDelivery.akka$remote$artery$SystemMessageDelivery$$outboundContext.associationState().incarnation();
        this.unacknowledged = new ArrayDeque<>();
        this.resending = new ArrayDeque<>();
        this.stopping = false;
        this.giveUpAfterNanos = systemMessageDelivery.akka$remote$artery$SystemMessageDelivery$$outboundContext.settings().Advanced().GiveUpSystemMessageAfter().toNanos();
        this.ackTimestamp = System.nanoTime();
        this.ackCallback = getAsyncCallback(received -> {
            ack(received.seqNo());
        });
        this.nackCallback = getAsyncCallback(nack -> {
            // A Nack for a sequence number that has not been sent yet is ignored.
            if (nack.seqNo() <= this.seqNo) {
                ack(nack.seqNo());
                log().warning("Received negative acknowledgement of system message from [{}], highest acknowledged [{}]", systemMessageDelivery.akka$remote$artery$SystemMessageDelivery$$outboundContext.remoteAddress(), BoxesRunTime.boxToLong(nack.seqNo()));
            }
        });
        setHandlers(systemMessageDelivery.in(), systemMessageDelivery.out(), this);
    }

    /** Forwards to the default implementation in {@link InHandler}. */
    public /* bridge */ /* synthetic */ void onUpstreamFailure(Throwable th) throws Exception {
        InHandler.onUpstreamFailure$(this, th);
    }

    /** Forwards to the default implementation in {@link OutHandler}. */
    public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    /** Forwards to the default implementation in {@link OutHandler}. */
    public /* bridge */ /* synthetic */ void onDownstreamFinish(Throwable th) throws Exception {
        OutHandler.onDownstreamFinish$(this, th);
    }

    /** Getter for the logger backing field used by {@link StageLogging}. */
    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    /** Setter for the logger backing field used by {@link StageLogging}. */
    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    /** Forwards to the default implementation in {@link StageLogging}. */
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    /** @return the local address reported by the outbound context */
    private UniqueAddress localAddress() {
        return this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.localAddress();
    }

    /** @return the remote address reported by the outbound context */
    private Address remoteAddress() {
        return this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.remoteAddress();
    }

    /**
     * @return string form of the unique remote address from the association state, or of the
     *     plain remote address when the unique one is not defined
     */
    private String remoteAddressLogParam() {
        return this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.associationState().uniqueRemoteAddress().getOrElse(this::remoteAddressLogParam$$anonfun$1).toString();
    }

    /** @return the class used as log source, {@code SystemMessageDelivery} */
    public Class logSource() {
        return SystemMessageDelivery.class;
    }

    /**
     * Attaches this object as observer of the control subject. When the attach future completes,
     * the async callback marks the observer as attached and pulls the inlet if the outlet is
     * already available.
     */
    public void preStart() {
        AsyncCallback asyncCallback = getAsyncCallback(done -> {
            this.replyObserverAttached = true;
            if (isAvailable(this.$outer.out())) {
                pull(this.$outer.in());
            }
        });
        // The decompiler emitted an undeclared placeholder here; the callback created above is
        // what has to be handed to the forwarding helper.
        this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.controlSubject().attach(this).foreach((v1) -> {
            SystemMessageDelivery.akka$remote$artery$SystemMessageDelivery$$anon$1$$_$preStart$$anonfun$1(asyncCallback, v1);
        }, ExecutionContexts$.MODULE$.parasitic());
    }

    /**
     * Sends all unacknowledged envelopes to dead letters, quarantines the association if any
     * were pending, and detaches this observer from the control subject.
     */
    public void postStop() {
        int pending = this.unacknowledged.size();
        sendUnacknowledgedToDeadLetters();
        this.unacknowledged.clear();
        if (pending > 0) {
            this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.quarantine("SystemMessageDelivery stopped with [" + pending + "] pending system messages.");
        }
        this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.controlSubject().detach(this);
    }

    /**
     * Completes immediately when nothing is pending; otherwise only records that the stage is
     * stopping, so that completion happens in {@link #clearUnacknowledged(long)}.
     */
    public void onUpstreamFinish() {
        if (this.unacknowledged.isEmpty()) {
            InHandler.onUpstreamFinish$(this);
        } else {
            this.stopping = true;
        }
    }

    /**
     * Handles the resend tick: checks the give-up deadline, starts a new resend round when none is
     * in progress, and re-arms the timer while envelopes remain unacknowledged.
     *
     * @param obj the timer key; must equal {@code ResendTick}
     * @throws IllegalArgumentException for any other timer key
     */
    /* JADX WARN: Unreachable blocks removed: 1, instructions: 1 */
    public void onTimer(Object obj) {
        if (!SystemMessageDelivery$ResendTick$.MODULE$.equals(obj)) {
            throw new IllegalArgumentException("Unknown timer key: " + obj);
        }
        checkGiveUp();
        if (this.resending.isEmpty() && !this.unacknowledged.isEmpty()) {
            this.resending = this.unacknowledged.clone();
            tryResend();
        }
        // Re-checked after tryResend(), which may have cleared the buffer.
        if (!this.unacknowledged.isEmpty()) {
            scheduleOnce(SystemMessageDelivery$ResendTick$.MODULE$, this.$outer.akka$remote$artery$SystemMessageDelivery$$resendInterval);
        }
    }

    /**
     * Receives inbound control messages. {@code Ack} and {@code Nack} messages whose sender
     * address equals the remote address are handed to the matching async callback; everything
     * else is ignored.
     *
     * @param inboundEnvelope the inbound control envelope
     */
    @Override // akka.remote.artery.InboundControlJunction.ControlMessageObserver
    public void notify(InboundEnvelope inboundEnvelope) {
        Object message = inboundEnvelope.message();
        if (message instanceof SystemMessageDelivery.Ack) {
            SystemMessageDelivery.Ack ack = (SystemMessageDelivery.Ack) message;
            if (isFromRemote(ack.from().address())) {
                this.ackCallback.invoke(ack);
            }
        } else if (message instanceof SystemMessageDelivery.Nack) {
            SystemMessageDelivery.Nack nack = (SystemMessageDelivery.Nack) message;
            if (isFromRemote(nack.from().address())) {
                this.nackCallback.invoke(nack);
            }
        }
    }

    /** Null-safe equality of {@code sender} with {@link #remoteAddress()}. */
    private boolean isFromRemote(Address sender) {
        Address expected = remoteAddress();
        return sender == null ? expected == null : sender.equals(expected);
    }

    /**
     * Completes the stage when the control subject finished with a {@code Success} and fails it
     * with the carried exception on {@code Failure}. The work runs through an async callback.
     *
     * @param r4 the completion result of the control subject
     */
    @Override // akka.remote.artery.InboundControlJunction.ControlMessageObserver
    public void controlSubjectCompleted(Try r4) {
        getAsyncCallback(result -> {
            if (result instanceof Success) {
                completeStage();
            } else {
                if (!(result instanceof Failure)) {
                    throw new MatchError(result);
                }
                failStage(((Failure) result).exception());
            }
        }).invoke(r4);
    }

    /**
     * Records the time of the acknowledgement and drops the covered envelopes, provided the
     * acknowledged number is not beyond what has been sent.
     *
     * @param j the acknowledged sequence number
     */
    private void ack(long j) {
        this.ackTimestamp = System.nanoTime();
        if (j <= this.seqNo) {
            clearUnacknowledged(j);
        }
    }

    /**
     * Removes, oldest first, every unacknowledged envelope whose sequence number is at most
     * {@code j}. When the buffer becomes empty the resend timer is cancelled, and the stage is
     * completed if upstream has already finished.
     *
     * @param j the highest acknowledged sequence number
     */
    private void clearUnacknowledged(long j) {
        while (!this.unacknowledged.isEmpty()
                && ((SystemMessageDelivery.SystemMessageEnvelope) this.unacknowledged.peek().message()).seqNo() <= j) {
            this.unacknowledged.removeFirst();
            if (this.unacknowledged.isEmpty()) {
                // The timer is scheduled under the ResendTick key, so it has to be cancelled under
                // that key; cancelling with the resend interval matches no timer.
                cancelTimer(SystemMessageDelivery$ResendTick$.MODULE$);
                if (this.stopping) {
                    completeStage();
                    return;
                }
            }
        }
    }

    /**
     * Pushes the next queued envelope downstream if the outlet is available. If the association
     * incarnation changed since the sequence was started, all delivery state is cleared first;
     * the envelope that was already taken from the queue is still pushed.
     */
    private void tryResend() {
        if (!isAvailable(this.$outer.out()) || this.resending.isEmpty()) {
            return;
        }
        OutboundEnvelope next = this.resending.poll();
        if (log().isDebugEnabled()) {
            Object message = next.message();
            if (message instanceof SystemMessageDelivery.SystemMessageEnvelope) {
                SystemMessageDelivery.SystemMessageEnvelope wrapped = SystemMessageDelivery$SystemMessageEnvelope$.MODULE$.unapply((SystemMessageDelivery.SystemMessageEnvelope) message);
                Object payload = wrapped._1();
                long number = wrapped._2();
                log().debug("Resending system message [{}] [{}]", Logging$.MODULE$.simpleName(payload), BoxesRunTime.boxToLong(number));
            } else {
                log().debug("Resending control message [{}]", Logging$.MODULE$.simpleName(next.message()));
            }
        }
        if (this.incarnation != this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.associationState().incarnation()) {
            log().debug("Noticed new incarnation of [{}] from tryResend, clear state", remoteAddressLogParam());
            clear();
        }
        pushCopy(next);
    }

    /** Pushes a copy of the envelope, so the stored instance is not the one sent downstream. */
    private void pushCopy(OutboundEnvelope outboundEnvelope) {
        push(this.$outer.out(), outboundEnvelope.copy());
    }

    /**
     * Dispatches an envelope grabbed from the inlet according to its message type:
     * system / acked-delivery messages get a sequence number and are tracked; a handshake request
     * is pushed only if the outlet is available; a clear request resets the state when its
     * incarnation is not newer than the current one; anything else is passed through or queued.
     */
    public void onPush() {
        OutboundEnvelope envelope = (OutboundEnvelope) grab(this.$outer.in());
        Object message = envelope.message();
        if (message instanceof SystemMessage || message instanceof SystemMessageDelivery.AckedDeliveryMessage) {
            sendSystemMessage(envelope, message);
        } else if (message instanceof OutboundHandshake.HandshakeReq) {
            // Without downstream demand the handshake request is simply not forwarded.
            if (isAvailable(this.$outer.out())) {
                pushCopy(envelope);
            }
        } else if (message instanceof SystemMessageDelivery.ClearSystemMessageDelivery) {
            if (SystemMessageDelivery$ClearSystemMessageDelivery$.MODULE$.unapply((SystemMessageDelivery.ClearSystemMessageDelivery) message)._1() <= this.incarnation) {
                log().debug("Clear system message delivery of [{}]", remoteAddressLogParam());
                clear();
            }
            pull(this.$outer.in());
        } else if (this.resending.isEmpty() && isAvailable(this.$outer.out())) {
            push(this.$outer.out(), envelope);
        } else {
            this.resending.offer(envelope);
            tryResend();
        }
    }

    /**
     * Assigns the next sequence number to a system message, stores the wrapped envelope as
     * unacknowledged, arms the resend timer and sends the envelope (directly, or through the
     * resend queue when that queue is not empty or the outlet is not available).
     *
     * <p>When the unacknowledged buffer is already full the association is quarantined, the
     * envelope goes to dead letters and the next element is pulled instead.
     */
    private void sendSystemMessage(OutboundEnvelope envelope, Object message) {
        if (this.unacknowledged.size() >= this.$outer.akka$remote$artery$SystemMessageDelivery$$maxBufferSize) {
            this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.quarantine("System message delivery buffer overflow, size [" + this.$outer.akka$remote$artery$SystemMessageDelivery$$maxBufferSize + "]");
            this.$outer.akka$remote$artery$SystemMessageDelivery$$deadLetters.$bang(envelope, this.$outer.akka$remote$artery$SystemMessageDelivery$$deadLetters.$bang$default$2(envelope));
            pull(this.$outer.in());
            return;
        }
        if (this.seqNo == 0) {
            // Start of a new sequence: remember which incarnation it belongs to.
            this.incarnation = this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.associationState().incarnation();
        } else if (this.incarnation != this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.associationState().incarnation()) {
            log().debug("Noticed new incarnation of [{}] from onPush, clear state", remoteAddressLogParam());
            clear();
        }
        this.seqNo++;
        if (this.unacknowledged.isEmpty()) {
            // First pending message: the give-up deadline is measured from now.
            this.ackTimestamp = System.nanoTime();
        } else {
            checkGiveUp();
        }
        OutboundEnvelope wrapped = envelope.withMessage(SystemMessageDelivery$SystemMessageEnvelope$.MODULE$.apply(message, this.seqNo, localAddress()));
        this.unacknowledged.offer(wrapped);
        scheduleOnce(SystemMessageDelivery$ResendTick$.MODULE$, this.$outer.akka$remote$artery$SystemMessageDelivery$$resendInterval);
        if (this.resending.isEmpty() && isAvailable(this.$outer.out())) {
            pushCopy(wrapped);
        } else {
            this.resending.offer(wrapped);
            tryResend();
        }
    }

    /**
     * Throws {@code GaveUpSystemMessageException} when envelopes are pending and more than
     * {@link #giveUpAfterNanos} nanoseconds have passed since {@link #ackTimestamp}.
     */
    private void checkGiveUp() {
        if (!this.unacknowledged.isEmpty() && System.nanoTime() - this.ackTimestamp > this.giveUpAfterNanos) {
            throw new SystemMessageDelivery.GaveUpSystemMessageException("Gave up sending system message to [" + this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.remoteAddress() + "] after " + PrettyDuration$PrettyPrintableDuration$.MODULE$.pretty$extension(PrettyDuration$.MODULE$.PrettyPrintableDuration(this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.settings().Advanced().GiveUpSystemMessageAfter())) + ".");
        }
    }

    /**
     * Resets all delivery state: pending envelopes go to dead letters, the sequence number
     * restarts from 0, the current incarnation is re-read, both queues are emptied and the resend
     * timer is cancelled.
     */
    private void clear() {
        sendUnacknowledgedToDeadLetters();
        this.seqNo = 0L;
        this.incarnation = this.$outer.akka$remote$artery$SystemMessageDelivery$$outboundContext.associationState().incarnation();
        this.unacknowledged.clear();
        this.resending.clear();
        // Cancel under the key the timer was scheduled with (ResendTick), not the interval.
        cancelTimer(SystemMessageDelivery$ResendTick$.MODULE$);
    }

    /** Sends every unacknowledged envelope to dead letters without removing it from the buffer. */
    private void sendUnacknowledgedToDeadLetters() {
        for (OutboundEnvelope pending : this.unacknowledged) {
            this.$outer.akka$remote$artery$SystemMessageDelivery$$deadLetters.$bang(pending, this.$outer.akka$remote$artery$SystemMessageDelivery$$deadLetters.$bang$default$2(pending));
        }
    }

    /**
     * Reacts to downstream demand once the reply observer is attached: continues the resend queue
     * when it has content, the inlet has already been pulled, or the stage is stopping; otherwise
     * pulls the inlet.
     */
    public void onPull() {
        if (this.replyObserverAttached) {
            if (!this.resending.isEmpty() || hasBeenPulled(this.$outer.in()) || this.stopping) {
                tryResend();
            } else {
                pull(this.$outer.in());
            }
        }
    }

    /** Fallback supplier used by {@link #remoteAddressLogParam()}. */
    private final Address remoteAddressLogParam$$anonfun$1() {
        return remoteAddress();
    }
}
