package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.amqp.AmqpSession;
import com.azure.core.amqp.implementation.AmqpReceiveLink;
import com.azure.core.amqp.implementation.AmqpSendLink;
import com.azure.core.amqp.implementation.ConnectionOptions;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.ReactorConnection;
import com.azure.core.amqp.implementation.ReactorHandlerProvider;
import com.azure.core.amqp.implementation.ReactorProvider;
import com.azure.core.amqp.implementation.RetryUtil;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.amqp.implementation.handler.SessionHandler;
import com.azure.core.credential.TokenCredential;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.util.HashMap;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Session;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;

/* loaded from: input_file:com/azure/messaging/eventhubs/implementation/EventHubReactorAmqpConnection.class */
/**
 * A {@link ReactorConnection} specialised for Event Hubs.
 *
 * <p>On top of the base connection it creates send and receive links for an entity path and exposes a
 * management node backed by a single, lazily created {@link ManagementChannel}.</p>
 */
public class EventHubReactorAmqpConnection extends ReactorConnection implements EventHubAmqpConnection {
    private static final String MANAGEMENT_SESSION_NAME = "mgmt-session";
    private static final String MANAGEMENT_LINK_NAME = "mgmt";
    private static final String MANAGEMENT_ADDRESS = "$management";
    private final ClientLogger logger;
    private final TokenCredential tokenCredential;
    private final String connectionId;
    private final ReactorProvider reactorProvider;
    private final ReactorHandlerProvider handlerProvider;
    private final TokenManagerProvider tokenManagerProvider;
    private final AmqpRetryOptions retryOptions;
    private final MessageSerializer messageSerializer;
    private final Scheduler scheduler;
    private final String eventHubName;
    // Created on first use by getOrCreateManagementChannel(); stays null until then.
    private volatile ManagementChannel managementChannel;

    /**
     * Creates the connection. The superclass is configured with {@link SenderSettleMode#SETTLED} and
     * {@link ReceiverSettleMode#SECOND}.
     *
     * @param str Identifier of this connection; stored as the connection id and attached to the logger context.
     * @param connectionOptions Options from which the retry options, token credential and scheduler are read.
     * @param str2 Name of the Event Hub; handed to the management channel when it is created.
     * @param reactorProvider Provider passed to the superclass and to every session created by this connection.
     * @param reactorHandlerProvider Handler provider passed to the superclass and to every created session.
     * @param tokenManagerProvider Token manager provider used by sessions and by the management channel.
     * @param messageSerializer Serializer used by sessions and by the management channel.
     */
    public EventHubReactorAmqpConnection(String str, ConnectionOptions connectionOptions, String str2, ReactorProvider reactorProvider, ReactorHandlerProvider reactorHandlerProvider, TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer) {
        super(str, connectionOptions, reactorProvider, reactorHandlerProvider, tokenManagerProvider, messageSerializer, SenderSettleMode.SETTLED, ReceiverSettleMode.SECOND);
        this.connectionId = str;
        this.reactorProvider = reactorProvider;
        this.handlerProvider = reactorHandlerProvider;
        this.tokenManagerProvider = tokenManagerProvider;
        this.messageSerializer = messageSerializer;
        this.eventHubName = str2;
        this.retryOptions = connectionOptions.getRetry();
        this.tokenCredential = connectionOptions.getTokenCredential();
        this.scheduler = connectionOptions.getScheduler();

        // Every log statement from this instance carries the connection id as context.
        // Parameterized (instead of a raw HashMap) so the put below is type-checked.
        final HashMap<String, Object> loggingContext = new HashMap<>(1);
        loggingContext.put(ClientConstants.CONNECTION_ID_KEY, str);
        this.logger = new ClientLogger(EventHubReactorAmqpConnection.class, loggingContext);
    }

    /**
     * Gets the management node of this connection.
     *
     * <p>The disposed check happens when this method is called, not when the returned {@link Mono} is
     * subscribed to.</p>
     *
     * @return A {@link Mono} that, once the reactor connection is available, emits the shared management
     *     channel (creating it on first use), or a {@link Mono} that errors with an
     *     {@link IllegalStateException} if the connection is already disposed.
     */
    @Override // com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection
    public Mono<EventHubManagementNode> getManagementNode() {
        if (isDisposed()) {
            final String message = String.format("connectionId[%s]: Connection is disposed. Cannot get management instance", this.connectionId);
            return Mono.error(this.logger.logExceptionAsError(new IllegalStateException(message)));
        }
        return getReactorConnection().then(Mono.fromCallable(this::getOrCreateManagementChannel));
    }

    /**
     * Creates a session for {@code str2} and asks it for a producer, exposed as an {@link AmqpSendLink}.
     *
     * @param str First argument forwarded to {@code createProducer} (presumably the link name - confirm).
     * @param str2 Entity path; used as the session name and forwarded to {@code createProducer}.
     * @param amqpRetryOptions Options supplying the try timeout and the retry policy for the producer.
     * @return A {@link Mono} emitting the send link.
     */
    @Override // com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection
    public Mono<AmqpSendLink> createSendLink(String str, String str2, AmqpRetryOptions amqpRetryOptions) {
        return createSession(str2).flatMap(session -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.ENTITY_PATH_KEY, str2).log("Get or create producer.");
            return session.createProducer(str, str2, amqpRetryOptions.getTryTimeout(), RetryUtil.getRetryPolicy(amqpRetryOptions)).cast(AmqpSendLink.class);
        });
    }

    /**
     * Creates a session for {@code str2} and asks it for a consumer. The timeout and retry policy come
     * from the retry options this connection was constructed with.
     *
     * @param str First argument forwarded to {@code createConsumer} (presumably the link name - confirm).
     * @param str2 Entity path; used as the session name and forwarded to {@code createConsumer}.
     * @param eventPosition Position forwarded to the consumer.
     * @param receiveOptions Receive options forwarded to the consumer.
     * @return A {@link Mono} emitting the receive link.
     */
    @Override // com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection
    public Mono<AmqpReceiveLink> createReceiveLink(String str, String str2, EventPosition eventPosition, ReceiveOptions receiveOptions) {
        return createSession(str2).cast(EventHubSession.class).flatMap(session -> {
            this.logger.atVerbose().addKeyValue(ClientConstants.ENTITY_PATH_KEY, str2).log("Get or create consumer.");
            return session.createConsumer(str, str2, this.retryOptions.getTryTimeout(), RetryUtil.getRetryPolicy(this.retryOptions), eventPosition, receiveOptions);
        });
    }

    /**
     * Closes the management channel, if one was created, and then disposes the underlying connection.
     * Does nothing when the connection is already disposed.
     */
    // NOTE(review): delegates to super.dispose(), so this looks like an override - confirm and add @Override.
    public void dispose() {
        if (isDisposed()) {
            return;
        }
        // Read the volatile field once so the null check and the close act on the same instance.
        final ManagementChannel channel = this.managementChannel;
        if (channel != null) {
            channel.close();
        }
        super.dispose();
    }

    /**
     * Builds the Event Hubs specific session for this connection.
     *
     * @param str Name given to the new session.
     * @param session Underlying proton-j session.
     * @param sessionHandler Handler associated with the session.
     * @return A new {@link EventHubReactorSession} wired with this connection's providers, retry options,
     *     serializer and claims-based-security node.
     */
    // NOTE(review): presumably the session factory hook of ReactorConnection - confirm and add @Override.
    protected AmqpSession createSession(String str, Session session, SessionHandler sessionHandler) {
        return new EventHubReactorSession(this, session, sessionHandler, str, this.reactorProvider, this.handlerProvider, getClaimsBasedSecurityNode(), this.tokenManagerProvider, this.retryOptions, this.messageSerializer);
    }

    /**
     * Returns the management channel, creating it on the first call. Synchronized so that concurrent
     * callers on the same connection cannot create more than one channel.
     */
    private synchronized ManagementChannel getOrCreateManagementChannel() {
        if (this.managementChannel == null) {
            this.managementChannel = new ManagementChannel(createRequestResponseChannel(MANAGEMENT_SESSION_NAME, MANAGEMENT_LINK_NAME, MANAGEMENT_ADDRESS), this.eventHubName, this.tokenCredential, this.tokenManagerProvider, this.messageSerializer, this.scheduler);
        }
        return this.managementChannel;
    }
}
