package com.azure.spring.cloud.stream.binder.eventhubs.implementation;

import com.azure.messaging.eventhubs.CheckpointStore;
import com.azure.spring.cloud.core.implementation.util.AzurePropertiesUtils;
import com.azure.spring.cloud.core.properties.AzureProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProcessorFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.config.EventHubsProducerFactoryCustomizer;
import com.azure.spring.cloud.stream.binder.eventhubs.core.implementation.provisioning.EventHubsChannelProvisioner;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsConsumerProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsExtendedBindingProperties;
import com.azure.spring.cloud.stream.binder.eventhubs.core.properties.EventHubsProducerProperties;
import com.azure.spring.integration.core.handler.DefaultMessageHandler;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentation;
import com.azure.spring.integration.core.implementation.instrumentation.DefaultInstrumentationManager;
import com.azure.spring.integration.core.implementation.instrumentation.InstrumentationSendCallback;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.integration.eventhubs.inbound.EventHubsInboundChannelAdapter;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.PropertiesSupplier;
import com.azure.spring.messaging.eventhubs.core.DefaultEventHubsNamespaceProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.DefaultEventHubsNamespaceProducerFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsProcessorFactory;
import com.azure.spring.messaging.eventhubs.core.EventHubsTemplate;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.core.properties.EventHubsContainerProperties;
import com.azure.spring.messaging.eventhubs.core.properties.NamespaceProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProcessorProperties;
import com.azure.spring.messaging.eventhubs.core.properties.ProducerProperties;
import com.azure.spring.messaging.eventhubs.implementation.properties.merger.ProcessorPropertiesMerger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.BinderSpecificPropertiesProvider;
import org.springframework.cloud.stream.binder.ExtendedConsumerProperties;
import org.springframework.cloud.stream.binder.ExtendedProducerProperties;
import org.springframework.cloud.stream.binder.ExtendedPropertiesBinder;
import org.springframework.cloud.stream.provisioning.ConsumerDestination;
import org.springframework.cloud.stream.provisioning.ProducerDestination;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

/* loaded from: input_file:com/azure/spring/cloud/stream/binder/eventhubs/implementation/EventHubsMessageChannelBinder.class */
/**
 * Spring Cloud Stream message channel binder backed by Azure Event Hubs.
 *
 * <p>Producer bindings are served by a lazily created {@link EventHubsTemplate}; consumer bindings are served by
 * {@link EventHubsMessageListenerContainer}s built from a lazily created
 * {@link DefaultEventHubsNamespaceProcessorFactory}. Both lazy getters are unsynchronized, and the
 * namespace properties, checkpoint store and factory customizers must be supplied before the first binding is
 * created because they are read only when the template / factory is first built.
 */
public class EventHubsMessageChannelBinder
    extends AbstractMessageChannelBinder<ExtendedConsumerProperties<EventHubsConsumerProperties>,
        ExtendedProducerProperties<EventHubsProducerProperties>, EventHubsChannelProvisioner>
    implements ExtendedPropertiesBinder<MessageChannel, EventHubsConsumerProperties, EventHubsProducerProperties> {

    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsMessageChannelBinder.class);
    private static final ExpressionParser EXPRESSION_PARSER = new SpelExpressionParser();

    /** SpEL expression used to read the partition id from the message headers of partitioned producers. */
    private static final String PARTITION_ID_EXPRESSION = "headers['scst_partition']";

    /** Prefix of the generated consumer group name used when a binding declares no group. */
    private static final String ANONYMOUS_GROUP_PREFIX = "anonymous.";

    /**
     * Duration handed to every {@link EventHubsProcessorInstrumentation} created by this binder.
     * NOTE(review): presumably the window used by the health instrumentation - confirm against that class.
     */
    private static final Duration PROCESSOR_INSTRUMENTATION_WINDOW = Duration.ofMinutes(2L);

    private NamespaceProperties namespaceProperties;
    private EventHubsTemplate eventHubsTemplate;
    private CheckpointStore checkpointStore;
    private DefaultEventHubsNamespaceProcessorFactory processorFactory;
    private final List<EventHubsMessageListenerContainer> eventHubsMessageListenerContainers = new ArrayList<>();
    private final InstrumentationManager instrumentationManager = new DefaultInstrumentationManager();
    private EventHubsExtendedBindingProperties bindingProperties = new EventHubsExtendedBindingProperties();
    /** Producer binding properties keyed by destination name, consulted by the producer properties supplier. */
    private final Map<String, ExtendedProducerProperties<EventHubsProducerProperties>> extendedProducerPropertiesMap =
        new ConcurrentHashMap<>();
    private final List<EventHubsProducerFactoryCustomizer> producerFactoryCustomizers = new ArrayList<>();
    private final List<EventHubsProcessorFactoryCustomizer> processorFactoryCustomizers = new ArrayList<>();

    /**
     * Creates the binder.
     *
     * @param strArr forwarded unchanged to the {@link AbstractMessageChannelBinder} constructor
     * @param eventHubsChannelProvisioner provisioner forwarded to the {@link AbstractMessageChannelBinder} constructor
     */
    public EventHubsMessageChannelBinder(String[] strArr, EventHubsChannelProvisioner eventHubsChannelProvisioner) {
        super(strArr, eventHubsChannelProvisioner);
    }

    /**
     * Builds the outbound handler that sends messages to the Event Hub named by the destination.
     *
     * <p>The producer properties are recorded under the destination name first, so that the supplier given to the
     * producer factory can resolve them later.
     *
     * @param producerDestination destination whose name is used as the Event Hub name
     * @param extendedProducerProperties binding properties (sync flag, send timeout, partitioning)
     * @param messageChannel channel registered on the handler as its send-failure channel
     * @return the configured message handler
     */
    @Override
    public MessageHandler createProducerMessageHandler(ProducerDestination producerDestination, ExtendedProducerProperties<EventHubsProducerProperties> extendedProducerProperties, MessageChannel messageChannel) {
        final String eventHubName = producerDestination.getName();
        this.extendedProducerPropertiesMap.put(eventHubName, extendedProducerProperties);

        final EventHubsTemplate template = getEventHubTemplate();
        Assert.notNull(template, "eventHubsTemplate can't be null when create a producer");

        final EventHubsProducerProperties extension = extendedProducerProperties.getExtension();
        final DefaultMessageHandler handler = new DefaultMessageHandler(eventHubName, template);
        handler.setBeanFactory(getBeanFactory());
        handler.setSync(extension.isSync());
        handler.setSendTimeout(extension.getSendTimeout().toMillis());
        handler.setSendFailureChannel(messageChannel);

        final String instrumentationId = Instrumentation.buildId(Instrumentation.Type.PRODUCER, eventHubName);
        handler.setSendCallback(new InstrumentationSendCallback(instrumentationId, this.instrumentationManager));

        if (extendedProducerProperties.isPartitioned()) {
            handler.setPartitionIdExpression(EXPRESSION_PARSER.parseExpression(PARTITION_ID_EXPRESSION));
        }
        return handler;
    }

    /**
     * Builds the inbound adapter that receives events from the destination's Event Hub.
     *
     * <p>When no consumer group is supplied, a unique {@code anonymous.<uuid>} group name is generated.
     *
     * @param consumerDestination destination whose name is used as the Event Hub name
     * @param str consumer group; may be {@code null} or blank
     * @param extendedConsumerProperties binding properties (batch mode, checkpoint and processor settings)
     * @return the configured inbound channel adapter
     */
    @Override
    public MessageProducer createConsumerEndpoint(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<EventHubsConsumerProperties> extendedConsumerProperties) {
        final EventHubsProcessorFactory factory = getProcessorFactory();
        Assert.notNull(factory, "processor factory can't be null when create a consumer");

        final String group = StringUtils.hasText(str) ? str : ANONYMOUS_GROUP_PREFIX + UUID.randomUUID();

        final EventHubsMessageListenerContainer container = new EventHubsMessageListenerContainer(
            factory, createContainerProperties(consumerDestination, group, extendedConsumerProperties));
        this.eventHubsMessageListenerContainers.add(container);

        final EventHubsInboundChannelAdapter adapter;
        if (extendedConsumerProperties.isBatchMode()) {
            adapter = new EventHubsInboundChannelAdapter(container, ListenerMode.BATCH);
        } else {
            adapter = new EventHubsInboundChannelAdapter(container);
        }
        adapter.setBeanFactory(getBeanFactory());

        final String instrumentationId =
            Instrumentation.buildId(Instrumentation.Type.CONSUMER, consumerDestination.getName() + "/" + group);
        adapter.setInstrumentationManager(this.instrumentationManager);
        adapter.setInstrumentationId(instrumentationId);
        adapter.setErrorChannel(
            registerErrorInfrastructure(consumerDestination, group, extendedConsumerProperties).getErrorChannel());
        return adapter;
    }

    /**
     * Assembles the listener container properties for one consumer binding: the common Azure and processor
     * settings are copied from the binding extension, then the Event Hub name, consumer group and checkpoint
     * configuration are set explicitly.
     */
    private EventHubsContainerProperties createContainerProperties(ConsumerDestination consumerDestination, String str, ExtendedConsumerProperties<EventHubsConsumerProperties> extendedConsumerProperties) {
        final EventHubsConsumerProperties extension = extendedConsumerProperties.getExtension();
        final EventHubsContainerProperties containerProperties = new EventHubsContainerProperties();
        AzurePropertiesUtils.copyAzureCommonProperties(extension, containerProperties);
        ProcessorPropertiesMerger.copyProcessorPropertiesIfNotNull(extension, containerProperties);
        containerProperties.setEventHubName(consumerDestination.getName());
        containerProperties.setConsumerGroup(str);
        containerProperties.setCheckpointConfig(extension.getCheckpoint());
        return containerProperties;
    }

    /**
     * Returns the Event Hubs specific consumer properties of the given binding.
     *
     * @param str binding name
     * @return the extended consumer properties resolved by the binding properties holder
     */
    @Override
    public EventHubsConsumerProperties getExtendedConsumerProperties(String str) {
        return (EventHubsConsumerProperties) this.bindingProperties.getExtendedConsumerProperties(str);
    }

    /**
     * Returns the Event Hubs specific producer properties of the given binding.
     *
     * @param str binding name
     * @return the extended producer properties resolved by the binding properties holder
     */
    @Override
    public EventHubsProducerProperties getExtendedProducerProperties(String str) {
        return (EventHubsProducerProperties) this.bindingProperties.getExtendedProducerProperties(str);
    }

    /**
     * Decompiler-generated alias of {@link #getExtendedConsumerProperties(String)}, retained so that code already
     * referring to this name keeps working.
     *
     * @param str binding name
     * @return the extended consumer properties of the binding
     */
    public EventHubsConsumerProperties m2getExtendedConsumerProperties(String str) {
        return getExtendedConsumerProperties(str);
    }

    /**
     * Decompiler-generated alias of {@link #getExtendedProducerProperties(String)}, retained so that code already
     * referring to this name keeps working.
     *
     * @param str binding name
     * @return the extended producer properties of the binding
     */
    public EventHubsProducerProperties m1getExtendedProducerProperties(String str) {
        return getExtendedProducerProperties(str);
    }

    /**
     * @return the defaults prefix reported by the current binding properties holder
     */
    @Override
    public String getDefaultsPrefix() {
        return this.bindingProperties.getDefaultsPrefix();
    }

    /**
     * @return the extended properties entry class reported by the current binding properties holder
     */
    @Override
    public Class<? extends BinderSpecificPropertiesProvider> getExtendedPropertiesEntryClass() {
        return this.bindingProperties.getExtendedPropertiesEntryClass();
    }

    /**
     * Replaces the binding properties holder used to resolve extended consumer/producer properties.
     *
     * @param eventHubsExtendedBindingProperties the new binding properties
     */
    public void setBindingProperties(EventHubsExtendedBindingProperties eventHubsExtendedBindingProperties) {
        this.bindingProperties = eventHubsExtendedBindingProperties;
    }

    /**
     * Creates the supplier that resolves producer properties by Event Hub name from the bindings registered in
     * {@link #createProducerMessageHandler}. Unknown names yield {@code null}; for known names the Event Hub name
     * is written onto the binding's own extension object before it is returned.
     */
    private PropertiesSupplier<String, ProducerProperties> getProducerPropertiesSupplier() {
        return eventHubName -> {
            // Single lookup instead of containsKey + get, so the check and the read see the same entry.
            final ExtendedProducerProperties<EventHubsProducerProperties> registered =
                this.extendedProducerPropertiesMap.get(eventHubName);
            if (registered == null) {
                LOGGER.debug("Can't find extended properties for {}", eventHubName);
                return null;
            }
            final EventHubsProducerProperties producerProperties = registered.getExtension();
            producerProperties.setEventHubName(eventHubName);
            return producerProperties;
        };
    }

    /**
     * Returns the shared {@link EventHubsTemplate}, creating it on first use. On creation the registered producer
     * factory customizers are applied and a listener is added that records an {@code UP} producer health
     * instrumentation for each name reported by the factory.
     */
    private EventHubsTemplate getEventHubTemplate() {
        if (this.eventHubsTemplate == null) {
            final DefaultEventHubsNamespaceProducerFactory producerFactory =
                new DefaultEventHubsNamespaceProducerFactory(this.namespaceProperties, getProducerPropertiesSupplier());
            this.producerFactoryCustomizers.forEach(customizer -> customizer.customize(producerFactory));
            producerFactory.addListener((name, producerClient) -> {
                final DefaultInstrumentation instrumentation =
                    new DefaultInstrumentation(name, Instrumentation.Type.PRODUCER);
                instrumentation.setStatus(Instrumentation.Status.UP);
                this.instrumentationManager.addHealthInstrumentation(instrumentation);
            });
            this.eventHubsTemplate = new EventHubsTemplate(producerFactory);
        }
        return this.eventHubsTemplate;
    }

    /**
     * Returns the shared processor factory, creating it on first use. On creation the registered processor factory
     * customizers are applied and a listener is added that records an {@code UP} consumer health instrumentation
     * named {@code <first>/<second>} from the two names reported by the factory.
     */
    private EventHubsProcessorFactory getProcessorFactory() {
        if (this.processorFactory == null) {
            // Configure a local instance and publish it last, so a failing customizer cannot leave a
            // half-configured factory cached in the field.
            final DefaultEventHubsNamespaceProcessorFactory factory =
                new DefaultEventHubsNamespaceProcessorFactory(this.checkpointStore, this.namespaceProperties);
            this.processorFactoryCustomizers.forEach(customizer -> customizer.customize(factory));
            factory.addListener((eventHubName, consumerGroup, processorClient) -> {
                final EventHubsProcessorInstrumentation instrumentation = new EventHubsProcessorInstrumentation(
                    eventHubName + "/" + consumerGroup, Instrumentation.Type.CONSUMER, PROCESSOR_INSTRUMENTATION_WINDOW);
                instrumentation.setStatus(Instrumentation.Status.UP);
                this.instrumentationManager.addHealthInstrumentation(instrumentation);
            });
            this.processorFactory = factory;
        }
        return this.processorFactory;
    }

    /**
     * Sets the namespace properties passed to the producer and processor factories when they are first created.
     *
     * @param namespaceProperties the namespace level properties
     */
    public void setNamespaceProperties(NamespaceProperties namespaceProperties) {
        this.namespaceProperties = namespaceProperties;
    }

    /**
     * Sets the checkpoint store passed to the processor factory when it is first created.
     *
     * @param checkpointStore the checkpoint store
     */
    public void setCheckpointStore(CheckpointStore checkpointStore) {
        this.checkpointStore = checkpointStore;
    }

    /**
     * @return the instrumentation manager shared by all producers and consumers of this binder
     */
    public InstrumentationManager getInstrumentationManager() {
        return this.instrumentationManager;
    }

    /**
     * Registers a customizer applied to the producer factory when it is created; {@code null} is ignored.
     *
     * @param eventHubsProducerFactoryCustomizer the customizer to add
     */
    public void addProducerFactoryCustomizer(EventHubsProducerFactoryCustomizer eventHubsProducerFactoryCustomizer) {
        if (eventHubsProducerFactoryCustomizer != null) {
            this.producerFactoryCustomizers.add(eventHubsProducerFactoryCustomizer);
        }
    }

    /**
     * Registers a customizer applied to the processor factory when it is created; {@code null} is ignored.
     *
     * @param eventHubsProcessorFactoryCustomizer the customizer to add
     */
    public void addProcessorFactoryCustomizer(EventHubsProcessorFactoryCustomizer eventHubsProcessorFactoryCustomizer) {
        if (eventHubsProcessorFactoryCustomizer != null) {
            this.processorFactoryCustomizers.add(eventHubsProcessorFactoryCustomizer);
        }
    }
}
