package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.implementation.AmqpLoggingUtils;
import com.azure.core.amqp.implementation.ClientConstants;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LoggingEventBuilder;
import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.message.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/core/amqp/implementation/handler/ReceiverDeliveryHandler.class */
public final class ReceiverDeliveryHandler {
    static final UUID DELIVERY_EMPTY_TAG = new UUID(0, 0);
    private static final int DELIVERY_TAG_SIZE = 16;
    private final AtomicBoolean isTerminated = new AtomicBoolean();
    private final AtomicBoolean isLinkTerminatedWithError = new AtomicBoolean();
    private final Sinks.Many<Message> messages = Sinks.many().multicast().onBackpressureBuffer();
    private final String entityPath;
    private final String receiveLinkName;
    private final DeliverySettleMode settlingMode;
    private final boolean includeDeliveryTagInMessage;
    private final ClientLogger logger;
    private final ReceiverUnsettledDeliveries unsettledDeliveries;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReceiverDeliveryHandler(String str, String str2, DeliverySettleMode deliverySettleMode, ReceiverUnsettledDeliveries receiverUnsettledDeliveries, boolean z, ClientLogger clientLogger) {
        this.entityPath = str;
        this.receiveLinkName = str2;
        this.settlingMode = deliverySettleMode;
        this.unsettledDeliveries = receiverUnsettledDeliveries;
        this.includeDeliveryTagInMessage = z;
        this.logger = clientLogger;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onDelivery(Delivery delivery) {
        if (isPartialOrSettledDelivery(delivery) || isDeliverySettledOnClosedLink(delivery)) {
            return;
        }
        switch (this.settlingMode) {
            case SETTLE_ON_DELIVERY:
                handleSettleOnDelivery(delivery);
                return;
            case ACCEPT_AND_SETTLE_ON_DELIVERY:
                handleAcceptAndSettleOnDelivery(delivery);
                return;
            case SETTLE_VIA_DISPOSITION:
                handleSettleViaDisposition(delivery);
                return;
            default:
                throw this.logger.logExceptionAsError(new RuntimeException("settlingMode is not supported: " + this.settlingMode));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onLinkError() {
        this.isLinkTerminatedWithError.set(true);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Flux<Message> getMessages() {
        return this.messages.asFlux();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void preClose() {
        this.isTerminated.set(true);
    }

    public void close(String str) {
        this.isTerminated.set(true);
        this.messages.emitComplete((signalType, emitResult) -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.receiveLinkName).addKeyValue(ClientConstants.EMIT_RESULT_KEY, emitResult).log(str);
            return false;
        });
    }

    private boolean isPartialOrSettledDelivery(Delivery delivery) {
        if (delivery.isPartial()) {
            Link link = delivery.getLink();
            if (link == null) {
                this.logger.atWarning().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.IS_SETTLED_DELIVERY_KEY, true).log("Partial delivery with no link.");
                return true;
            }
            AmqpLoggingUtils.addErrorCondition(this.logger.atVerbose(), link.getRemoteCondition()).addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.receiveLinkName).addKeyValue(ClientConstants.UPDATED_LINK_CREDIT_KEY, link.getCredit()).addKeyValue(ClientConstants.REMOTE_CREDIT_KEY, link.getRemoteCredit()).addKeyValue(ClientConstants.IS_PARTIAL_DELIVERY_KEY, true).addKeyValue(ClientConstants.IS_SETTLED_DELIVERY_KEY, delivery.isSettled()).log("onDelivery.");
            return true;
        }
        if (!delivery.isSettled()) {
            return false;
        }
        Link link2 = delivery.getLink();
        if (link2 != null) {
            AmqpLoggingUtils.addErrorCondition(this.logger.atInfo(), link2.getRemoteCondition()).addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.receiveLinkName).addKeyValue(ClientConstants.UPDATED_LINK_CREDIT_KEY, link2.getCredit()).addKeyValue(ClientConstants.REMOTE_CREDIT_KEY, link2.getRemoteCredit()).addKeyValue(ClientConstants.IS_SETTLED_DELIVERY_KEY, true).log("onDelivery. Was already settled.");
            return true;
        }
        this.logger.atWarning().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.IS_SETTLED_DELIVERY_KEY, true).log("Settled delivery with no link.");
        return true;
    }

    private boolean isDeliverySettledOnClosedLink(Delivery delivery) {
        Link link = delivery.getLink();
        if (link == null || link.getLocalState() != EndpointState.CLOSED) {
            return false;
        }
        delivery.disposition(new Modified());
        delivery.settle();
        return true;
    }

    private void handleSettleOnDelivery(Delivery delivery) {
        boolean isSettled = delivery.isSettled();
        try {
            Message readAndDecodeTransferDeliveryMessage = readAndDecodeTransferDeliveryMessage(delivery, null);
            delivery.settle();
            logOnDelivery(delivery, null, isSettled);
            emitMessage(readAndDecodeTransferDeliveryMessage, delivery);
        } catch (RuntimeException e) {
            handleDeliveryDecodeError(e);
        }
    }

    private void handleAcceptAndSettleOnDelivery(Delivery delivery) {
        boolean isSettled = delivery.isSettled();
        try {
            Message readAndDecodeTransferDeliveryMessage = readAndDecodeTransferDeliveryMessage(delivery, null);
            delivery.disposition(Accepted.getInstance());
            delivery.settle();
            logOnDelivery(delivery, null, isSettled);
            emitMessage(readAndDecodeTransferDeliveryMessage, delivery);
        } catch (RuntimeException e) {
            handleDeliveryDecodeError(e);
        }
    }

    private void handleSettleViaDisposition(Delivery delivery) {
        boolean isSettled = delivery.isSettled();
        UUID decodeDeliveryTag = decodeDeliveryTag(delivery);
        if (this.unsettledDeliveries.containsDelivery(decodeDeliveryTag)) {
            this.unsettledDeliveries.onDispositionAck(decodeDeliveryTag, delivery);
            return;
        }
        try {
            Message readAndDecodeTransferDeliveryMessage = readAndDecodeTransferDeliveryMessage(delivery, decodeDeliveryTag);
            delivery.getLink().advance();
            if (this.unsettledDeliveries.onDelivery(decodeDeliveryTag, delivery)) {
                logOnDelivery(delivery, decodeDeliveryTag, isSettled);
                emitMessage(readAndDecodeTransferDeliveryMessage, delivery);
            } else {
                delivery.disposition(new Modified());
                delivery.settle();
            }
        } catch (RuntimeException e) {
            handleDeliveryDecodeError(e);
        }
    }

    private Message readAndDecodeTransferDeliveryMessage(Delivery delivery, UUID uuid) {
        int pending = delivery.pending();
        byte[] bArr = new byte[pending];
        int recv = delivery.getLink().recv(bArr, 0, pending);
        Message message = Proton.message();
        message.decode(bArr, 0, recv);
        return this.includeDeliveryTagInMessage ? uuid == null ? new MessageWithDeliveryTag(message, decodeDeliveryTag(delivery)) : new MessageWithDeliveryTag(message, uuid) : message;
    }

    private void handleDeliveryDecodeError(RuntimeException runtimeException) {
        if ((runtimeException instanceof IllegalStateException) && (this.isLinkTerminatedWithError.get() || this.isTerminated.get())) {
            emitError(new IllegalStateException("Cannot decode Delivery when ReactorReceiver instance is closed.", runtimeException));
        } else {
            emitError(new IllegalStateException("Unexpected error when decoding Delivery.", runtimeException));
            throw runtimeException;
        }
    }

    private void emitMessage(Message message, Delivery delivery) {
        this.messages.emitNext(message, (signalType, emitResult) -> {
            this.logger.atWarning().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.receiveLinkName).addKeyValue(ClientConstants.EMIT_RESULT_KEY, emitResult).addKeyValue(ClientConstants.DELIVERY_KEY, delivery).log("Could not emit delivery.");
            Link link = delivery.getLink();
            if (emitResult != Sinks.EmitResult.FAIL_OVERFLOW || link.getLocalState() == EndpointState.CLOSED) {
                return false;
            }
            link.setCondition(new ErrorCondition(Symbol.getSymbol("delivery-buffer-overflow"), "Deliveries are not processed fast enough. Closing local link."));
            link.close();
            return true;
        });
    }

    private void emitError(IllegalStateException illegalStateException) {
        this.messages.emitError(illegalStateException, (signalType, emitResult) -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.receiveLinkName).addKeyValue(ClientConstants.EMIT_RESULT_KEY, emitResult).log("Could not emit messages.error.", new Object[]{illegalStateException});
            return false;
        });
    }

    private void logOnDelivery(Delivery delivery, UUID uuid, boolean z) {
        Link link = delivery.getLink();
        if (link == null) {
            return;
        }
        LoggingEventBuilder addKeyValue = AmqpLoggingUtils.addErrorCondition(this.logger.atVerbose(), link.getRemoteCondition()).addKeyValue(ClientConstants.ENTITY_PATH_KEY, this.entityPath).addKeyValue(ClientConstants.LINK_NAME_KEY, this.receiveLinkName);
        if (uuid != null) {
            addKeyValue.addKeyValue(ClientConstants.DELIVERY_TAG_KEY, uuid);
        }
        addKeyValue.addKeyValue(ClientConstants.UPDATED_LINK_CREDIT_KEY, link.getCredit()).addKeyValue(ClientConstants.REMOTE_CREDIT_KEY, link.getRemoteCredit()).addKeyValue(ClientConstants.IS_SETTLED_DELIVERY_KEY, z).log("onDelivery.");
    }

    private static UUID decodeDeliveryTag(Delivery delivery) {
        int i;
        byte[] tag = delivery.getTag();
        if (tag == null || tag.length != DELIVERY_TAG_SIZE) {
            return DELIVERY_EMPTY_TAG;
        }
        byte[] bArr = new byte[DELIVERY_TAG_SIZE];
        for (int i2 = 0; i2 < DELIVERY_TAG_SIZE; i2++) {
            switch (i2) {
                case 0:
                    i = 3;
                    break;
                case 1:
                    i = 2;
                    break;
                case 2:
                    i = 1;
                    break;
                case 3:
                    i = 0;
                    break;
                case 4:
                    i = 5;
                    break;
                case 5:
                    i = 4;
                    break;
                case 6:
                    i = 7;
                    break;
                case 7:
                    i = 6;
                    break;
                default:
                    i = i2;
                    break;
            }
            bArr[i] = tag[i2];
        }
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        return new UUID(wrap.getLong(), wrap.getLong());
    }
}
