package io.vertx.amqpbridge.impl;

import io.vertx.amqpbridge.AmqpBridge;
import io.vertx.amqpbridge.AmqpBridgeOptions;
import io.vertx.amqpbridge.AmqpConstants;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.MessageProducer;
import io.vertx.core.json.JsonObject;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.proton.ProtonClient;
import io.vertx.proton.ProtonConnection;
import io.vertx.proton.ProtonDelivery;
import io.vertx.proton.ProtonReceiver;
import io.vertx.proton.impl.ProtonConnectionImpl;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.message.Message;

/* loaded from: input_file:io/vertx/amqpbridge/impl/AmqpBridgeImpl.class */
/**
 * {@link AmqpBridge} implementation that drives a single {@link ProtonConnection}.
 *
 * <p>{@code start} and {@code close} are dispatched onto the Vert.x context captured when the
 * bridge is constructed (see {@link #runOnContext(boolean, Handler)}); the connection related
 * fields are only assigned from callbacks registered on that connection or from those two entry
 * points.
 *
 * <p>When reply handling is enabled in the options and the peer supports the anonymous relay, the
 * bridge additionally opens a receiver on a dynamic source (used as the reply-to address of
 * outgoing requests) and an address-less producer (used to send implicit replies).
 */
public class AmqpBridgeImpl implements AmqpBridge {
    private static final Logger LOG = LoggerFactory.getLogger(AmqpBridgeImpl.class);

    private final Vertx vertx;
    private final Context bridgeContext;
    private final AmqpBridgeOptions options;

    /** Reply handlers waiting for a response, keyed by the message-id given to the request. */
    private final Map<String, Handler<?>> replyToMapping = new ConcurrentHashMap<>();
    private final MessageTranslatorImpl translator = new MessageTranslatorImpl();
    /** Set once the connection (and, if used, the reply-to receiver) opened successfully. */
    private final AtomicBoolean started = new AtomicBoolean();
    /** Set as soon as {@link #close(Handler)} is called; suppresses the end handler. */
    private final AtomicBoolean closed = new AtomicBoolean();

    private ProtonClient client;
    private ProtonConnection connection;
    private ProtonReceiver replyToConsumer;
    private String replyToConsumerAddress;
    private AmqpProducerImpl replySender;
    private volatile Handler<Void> endHandler;

    /**
     * Creates a bridge bound to the current (or a newly created) context of {@code vertx}.
     *
     * @param vertx the Vert.x instance used to create the Proton client and the bridge context
     * @param amqpBridgeOptions the options handed to the client on connect and consulted for
     *     vhost, container id and reply handling support
     */
    public AmqpBridgeImpl(Vertx vertx, AmqpBridgeOptions amqpBridgeOptions) {
        this.vertx = vertx;
        this.options = amqpBridgeOptions;
        this.bridgeContext = vertx.getOrCreateContext();
    }

    /**
     * Starts the bridge without credentials; equivalent to calling the five argument overload
     * with {@code null} username and password.
     */
    @Override // io.vertx.amqpbridge.AmqpBridge
    public void start(String hostname, int port, Handler<AsyncResult<AmqpBridge>> resultHandler) {
        start(hostname, port, null, null, resultHandler);
    }

    /**
     * Starts the bridge on the bridge context.
     *
     * @param hostname host passed to the Proton client
     * @param port port passed to the Proton client
     * @param username username passed to the Proton client, may be {@code null}
     * @param password password passed to the Proton client, may be {@code null}
     * @param resultHandler completed with this bridge on success, or with the failure cause
     */
    @Override // io.vertx.amqpbridge.AmqpBridge
    public void start(String hostname, int port, String username, String password,
                      Handler<AsyncResult<AmqpBridge>> resultHandler) {
        runOnContext(true, ignored -> startImpl(hostname, port, username, password, resultHandler));
    }

    /** Connects the client and, once connected, configures and opens the AMQP connection. */
    private void startImpl(String hostname, int port, String username, String password,
                           Handler<AsyncResult<AmqpBridge>> resultHandler) {
        this.client = ProtonClient.create(this.vertx);
        this.client.connect(this.options, hostname, port, username, password, connectResult -> {
            if (!connectResult.succeeded()) {
                resultHandler.handle(Future.failedFuture(connectResult.cause()));
                return;
            }
            this.connection = connectResult.result();
            configureConnection();

            this.connection.disconnectHandler(disconnected -> endHandlerImpl());
            this.connection.closeHandler(closeResult -> {
                // Remote close: tear our side down, then tell the application it ended.
                try {
                    disconnectImpl();
                } finally {
                    endHandlerImpl();
                }
            });
            this.connection.openHandler(openResult -> onConnectionOpened(openResult, resultHandler));
            this.connection.open();
        });
    }

    /** Applies bridge metadata properties plus the optional vhost and container id. */
    private void configureConnection() {
        // Left raw: the key/value types come from BridgeMetaDataSupportImpl, which is not
        // visible from this file.
        LinkedHashMap properties = new LinkedHashMap();
        properties.put(BridgeMetaDataSupportImpl.PRODUCT_KEY, BridgeMetaDataSupportImpl.PRODUCT);
        properties.put(BridgeMetaDataSupportImpl.VERSION_KEY, BridgeMetaDataSupportImpl.VERSION);
        this.connection.setProperties(properties);

        if (this.options.getVhost() != null) {
            this.connection.setHostname(this.options.getVhost());
        }
        if (this.options.getContainerId() != null) {
            this.connection.setContainer(this.options.getContainerId());
        }
    }

    /**
     * Handles the outcome of opening the connection: fails the start, completes it directly, or
     * first sets up the reply-to infrastructure when that is enabled and supported.
     */
    private void onConnectionOpened(AsyncResult<ProtonConnection> openResult,
                                    Handler<AsyncResult<AmqpBridge>> resultHandler) {
        LOG.trace("Bridge connection open complete");
        if (!openResult.succeeded()) {
            resultHandler.handle(Future.failedFuture(openResult.cause()));
            return;
        }
        if (!this.options.isReplyHandlingSupport() || !this.connection.isAnonymousRelaySupported()) {
            markStarted(resultHandler);
            return;
        }
        openReplyToConsumer(resultHandler);
    }

    /**
     * Creates the address-less reply sender and opens a receiver on a dynamic source whose
     * remote address becomes the reply-to address of outgoing requests.
     */
    private void openReplyToConsumer(Handler<AsyncResult<AmqpBridge>> resultHandler) {
        this.replySender = new AmqpProducerImpl(this, this.connection, null);
        this.replyToConsumer = this.connection.createReceiver((String) null);
        this.replyToConsumer.getSource().setDynamic(true);
        this.replyToConsumer.handler(this::handleIncomingMessageReply);
        this.replyToConsumer.openHandler(receiverResult -> {
            if (!receiverResult.succeeded()) {
                resultHandler.handle(Future.failedFuture(receiverResult.cause()));
                return;
            }
            Source remoteSource = this.replyToConsumer.getRemoteSource();
            if (remoteSource != null) {
                this.replyToConsumerAddress = remoteSource.getAddress();
            }
            markStarted(resultHandler);
        }).open();
    }

    /** Flags the bridge as started and completes the start handler with this bridge. */
    private void markStarted(Handler<AsyncResult<AmqpBridge>> resultHandler) {
        this.started.set(true);
        resultHandler.handle(Future.succeededFuture(this));
    }

    /**
     * Fires the registered end handler at most once; it is skipped when none is registered or
     * when the bridge was closed through {@link #close(Handler)}.
     */
    private void endHandlerImpl() {
        Handler<Void> registered = this.endHandler;
        this.endHandler = null;
        if (registered == null || this.closed.get()) {
            return;
        }
        registered.handle(null);
    }

    /**
     * Creates a consumer for the given address on the bridge connection.
     *
     * @throws IllegalStateException if the bridge has not been started successfully
     */
    @Override // io.vertx.amqpbridge.AmqpBridge
    public MessageConsumer<JsonObject> createConsumer(String amqpAddress) {
        if (!this.started.get()) {
            throw new IllegalStateException("Bridge was not successfully started");
        }
        return new AmqpConsumerImpl(this, this.connection, amqpAddress);
    }

    /**
     * Creates a producer for the given address on the bridge connection.
     *
     * @throws IllegalStateException if the bridge has not been started successfully
     */
    @Override // io.vertx.amqpbridge.AmqpBridge
    public MessageProducer<JsonObject> createProducer(String amqpAddress) {
        if (!this.started.get()) {
            throw new IllegalStateException("Bridge was not successfully started");
        }
        return new AmqpProducerImpl(this, this.connection, amqpAddress);
    }

    /**
     * Marks the bridge closed and shuts the connection down on the bridge context.
     *
     * @param resultHandler completed once the shutdown finished
     */
    @Override // io.vertx.amqpbridge.AmqpBridge
    public void close(Handler<AsyncResult<Void>> resultHandler) {
        this.closed.set(true);
        runOnContext(true, ignored -> shutdownImpl(resultHandler));
    }

    /**
     * Closes the connection gracefully when both ends are open, otherwise just tears it down.
     * The handler is always completed, including when there is no connection at all.
     */
    private void shutdownImpl(Handler<AsyncResult<Void>> resultHandler) {
        ProtonConnection current = this.connection;
        if (current == null) {
            // Never started or already disconnected: nothing to do, but the caller still
            // needs to be told that close finished.
            resultHandler.handle(Future.succeededFuture());
            return;
        }

        if (isLocalOpen(current) && isRemoteOpen(current)) {
            current.closeHandler(closeResult -> {
                try {
                    disconnectImpl();
                } finally {
                    if (closeResult.succeeded()) {
                        resultHandler.handle(Future.succeededFuture());
                    } else {
                        resultHandler.handle(Future.failedFuture(closeResult.cause()));
                    }
                }
            }).close();
            return;
        }

        try {
            disconnectImpl();
        } finally {
            resultHandler.handle(Future.succeededFuture());
        }
    }

    /** Drops the connection reference, then closes and disconnects it if there was one. */
    private void disconnectImpl() {
        ProtonConnection current = this.connection;
        this.connection = null;
        if (current == null) {
            return;
        }
        try {
            current.close();
        } finally {
            current.disconnect();
        }
    }

    private boolean isLocalOpen(ProtonConnection protonConnection) {
        return ((ProtonConnectionImpl) protonConnection).getLocalState() == EndpointState.ACTIVE;
    }

    private boolean isRemoteOpen(ProtonConnection protonConnection) {
        return ((ProtonConnectionImpl) protonConnection).getRemoteState() == EndpointState.ACTIVE;
    }

    /** Registers (or replaces) the handler fired by {@link #endHandlerImpl()}. */
    @Override // io.vertx.amqpbridge.AmqpBridge
    public void endHandler(Handler<Void> handler) {
        this.endHandler = handler;
    }

    /**
     * Prepares an outgoing message for a reply: sets its reply-to to the bridge's reply address,
     * assigns a random UUID message-id and stores the handler under that id.
     *
     * @throws IllegalStateException if no reply-to address is available
     */
    <R> void registerReplyToHandler(Message message,
                                    Handler<AsyncResult<io.vertx.core.eventbus.Message<R>>> replyHandler) {
        verifyReplyToAddressAvailable();
        message.setReplyTo(this.replyToConsumerAddress);
        String messageId = UUID.randomUUID().toString();
        message.setMessageId(messageId);
        this.replyToMapping.put(messageId, replyHandler);
    }

    /**
     * @throws IllegalStateException if the bridge has no reply-to consumer address
     */
    void verifyReplyToAddressAvailable() throws IllegalStateException {
        if (this.replyToConsumerAddress == null) {
            throw new IllegalStateException("No reply-to address available, unable to send with a reply handler. Try an explicit consumer for replies.");
        }
    }

    /**
     * Dispatches a message received on the reply-to consumer to the handler registered under
     * its correlation-id, removing that registration; unmatched messages are logged.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void handleIncomingMessageReply(ProtonDelivery delivery, Message message) {
        Object correlationId = message.getCorrelationId();
        // The null check must come first: ConcurrentHashMap rejects null keys.
        Handler<?> registered = correlationId == null ? null : this.replyToMapping.remove(correlationId);
        if (registered == null) {
            LOG.error("Received message on replyTo consumer, could not match to a replyHandler: " + message);
            return;
        }

        // The map erases the reply body type, so the handler has to be cast back before use.
        Handler<AsyncResult<io.vertx.core.eventbus.Message>> replyHandler =
                (Handler<AsyncResult<io.vertx.core.eventbus.Message>>) registered;
        JsonObject body = this.translator.convertToJsonObject(message);
        replyHandler.handle(Future.succeededFuture(
                new AmqpMessageImpl(body, this, message, delivery, this.replyToConsumerAddress, message.getReplyTo())));
    }

    /**
     * Sends {@code replyBody} to the reply-to address of {@code originalMessage}, copying the
     * original message-id (when present) into the reply's correlation-id property.
     *
     * @param originalMessage the message being replied to
     * @param replyBody the reply; its properties object is created if missing
     * @param replyHandler handed to the reply sender together with the reply
     * @throws IllegalStateException if there is no reply sender or the original message carries
     *     no reply-to address
     */
    <R> void sendReply(Message originalMessage, JsonObject replyBody,
                       Handler<AsyncResult<io.vertx.core.eventbus.Message<R>>> replyHandler) {
        if (this.replySender == null) {
            throw new IllegalStateException("No reply sender available, unable to send implicit replies. Try an explicit producer for replies.");
        }
        String replyAddress = originalMessage.getReplyTo();
        if (replyAddress == null) {
            throw new IllegalStateException("Original message has no reply-to address, unable to send implicit reply");
        }

        Object originalMessageId = originalMessage.getMessageId();
        if (originalMessageId != null) {
            JsonObject properties = replyBody.getJsonObject(AmqpConstants.PROPERTIES);
            if (properties == null) {
                properties = new JsonObject();
                replyBody.put(AmqpConstants.PROPERTIES, properties);
            }
            properties.put(AmqpConstants.PROPERTIES_CORRELATION_ID, originalMessageId);
        }
        this.replySender.doSend(replyBody, replyHandler, replyAddress);
    }

    /** @return whether the calling thread is in the event loop of the bridge context */
    boolean onContextEventLoop() {
        return this.bridgeContext.nettyEventLoop().inEventLoop();
    }

    /**
     * Runs {@code action} on the bridge context.
     *
     * @param immediateIfOnContext when {@code true} and the caller is already on the bridge
     *     context's event loop, the action is invoked synchronously instead of being scheduled
     * @param action the work to run; always invoked with {@code null}
     */
    void runOnContext(boolean immediateIfOnContext, Handler<Void> action) {
        if (immediateIfOnContext && onContextEventLoop()) {
            action.handle(null);
        } else {
            this.bridgeContext.runOnContext(action);
        }
    }
}
