package com.azure.spring.integration.eventhubs.inbound;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.spring.cloud.service.eventhubs.consumer.EventHubsErrorHandler;
import com.azure.spring.cloud.service.listener.MessageListener;
import com.azure.spring.integration.core.instrumentation.Instrumentation;
import com.azure.spring.integration.core.instrumentation.InstrumentationManager;
import com.azure.spring.integration.eventhubs.implementation.health.EventHubsProcessorInstrumentation;
import com.azure.spring.messaging.ListenerMode;
import com.azure.spring.messaging.checkpoint.AzureCheckpointer;
import com.azure.spring.messaging.converter.AzureMessageConverter;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointConfig;
import com.azure.spring.messaging.eventhubs.core.checkpoint.CheckpointMode;
import com.azure.spring.messaging.eventhubs.core.listener.EventHubsMessageListenerContainer;
import com.azure.spring.messaging.eventhubs.implementation.checkpoint.CheckpointManagers;
import com.azure.spring.messaging.eventhubs.implementation.checkpoint.EventCheckpointManager;
import com.azure.spring.messaging.eventhubs.implementation.core.listener.adapter.BatchMessagingMessageListenerAdapter;
import com.azure.spring.messaging.eventhubs.implementation.core.listener.adapter.RecordMessagingMessageListenerAdapter;
import java.util.HashMap;
import java.util.Objects;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.MessageHeaders;

/* loaded from: input_file:com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapter.class */
public class EventHubsInboundChannelAdapter extends MessageProducerSupport {
    private static final Logger LOGGER = LoggerFactory.getLogger(EventHubsInboundChannelAdapter.class);
    private final EventHubsMessageListenerContainer listenerContainer;
    private final ListenerMode listenerMode;
    private final IntegrationRecordMessageListener recordListener;
    private final IntegrationBatchMessageListener batchListener;
    private final CheckpointConfig checkpointConfig;
    private EventCheckpointManager checkpointManager;
    private Class<?> payloadType;
    private InstrumentationManager instrumentationManager;
    private String instrumentationId;

    /* loaded from: input_file:com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapter$IntegrationBatchMessageListener.class */
    private class IntegrationBatchMessageListener extends BatchMessagingMessageListenerAdapter {
        private IntegrationBatchMessageListener() {
        }

        public void onMessage(EventBatchContext eventBatchContext) {
            PartitionContext partitionContext = eventBatchContext.getPartitionContext();
            HashMap hashMap = new HashMap();
            hashMap.put("azure_raw_partition_id", partitionContext.getPartitionId());
            hashMap.put("azure_eventhubs_last_enqueued_event_properties", eventBatchContext.getLastEnqueuedEventProperties());
            Objects.requireNonNull(eventBatchContext);
            AzureCheckpointer azureCheckpointer = new AzureCheckpointer(eventBatchContext::updateCheckpointAsync);
            if (CheckpointMode.MANUAL.equals(EventHubsInboundChannelAdapter.this.checkpointConfig.getMode())) {
                hashMap.put("azure_checkpointer", azureCheckpointer);
            }
            EventHubsInboundChannelAdapter.this.sendMessage(getMessageConverter().toMessage(eventBatchContext, new MessageHeaders(hashMap), this.payloadType));
            if (EventHubsInboundChannelAdapter.this.checkpointConfig.getMode().equals(CheckpointMode.BATCH)) {
                EventHubsInboundChannelAdapter.this.checkpointManager.checkpoint(eventBatchContext);
            }
        }
    }

    /* loaded from: input_file:com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapter$IntegrationErrorHandler.class */
    private class IntegrationErrorHandler implements EventHubsErrorHandler {
        private IntegrationErrorHandler() {
        }

        public void accept(ErrorContext errorContext) {
            EventHubsInboundChannelAdapter.LOGGER.error("Error occurred on partition: {}. Error: {}", errorContext.getPartitionContext().getPartitionId(), errorContext.getThrowable());
            updateInstrumentation(errorContext);
        }

        private void updateInstrumentation(ErrorContext errorContext) {
            EventHubsProcessorInstrumentation healthInstrumentation;
            if (EventHubsInboundChannelAdapter.this.instrumentationManager == null || (healthInstrumentation = EventHubsInboundChannelAdapter.this.instrumentationManager.getHealthInstrumentation(EventHubsInboundChannelAdapter.this.instrumentationId)) == null) {
                return;
            }
            if (healthInstrumentation instanceof EventHubsProcessorInstrumentation) {
                healthInstrumentation.markError(errorContext);
            } else {
                healthInstrumentation.setStatus(Instrumentation.Status.DOWN, errorContext.getThrowable());
            }
        }
    }

    /* loaded from: input_file:com/azure/spring/integration/eventhubs/inbound/EventHubsInboundChannelAdapter$IntegrationRecordMessageListener.class */
    private class IntegrationRecordMessageListener extends RecordMessagingMessageListenerAdapter {
        private IntegrationRecordMessageListener() {
        }

        public void onMessage(EventContext eventContext) {
            PartitionContext partitionContext = eventContext.getPartitionContext();
            HashMap hashMap = new HashMap();
            hashMap.put("azure_raw_partition_id", partitionContext.getPartitionId());
            hashMap.put("azure_eventhubs_last_enqueued_event_properties", eventContext.getLastEnqueuedEventProperties());
            EventData eventData = eventContext.getEventData();
            Objects.requireNonNull(eventContext);
            AzureCheckpointer azureCheckpointer = new AzureCheckpointer(eventContext::updateCheckpointAsync);
            if (CheckpointMode.MANUAL.equals(EventHubsInboundChannelAdapter.this.checkpointConfig.getMode())) {
                hashMap.put("azure_checkpointer", azureCheckpointer);
            }
            EventHubsInboundChannelAdapter.this.sendMessage(getMessageConverter().toMessage(eventData, new MessageHeaders(hashMap), this.payloadType));
            EventHubsInboundChannelAdapter.this.checkpointManager.checkpoint(eventContext);
        }
    }

    public EventHubsInboundChannelAdapter(EventHubsMessageListenerContainer eventHubsMessageListenerContainer) {
        this(eventHubsMessageListenerContainer, ListenerMode.RECORD);
    }

    public EventHubsInboundChannelAdapter(EventHubsMessageListenerContainer eventHubsMessageListenerContainer, ListenerMode listenerMode) {
        this.recordListener = new IntegrationRecordMessageListener();
        this.batchListener = new IntegrationBatchMessageListener();
        this.listenerContainer = eventHubsMessageListenerContainer;
        this.listenerMode = listenerMode;
        CheckpointConfig checkpointConfig = eventHubsMessageListenerContainer.getContainerProperties().getCheckpointConfig();
        this.checkpointConfig = checkpointConfig == null ? new CheckpointConfig() : checkpointConfig;
    }

    protected void onInit() {
        MessageListener messageListener = ListenerMode.BATCH.equals(this.listenerMode) ? this.batchListener : this.recordListener;
        this.checkpointManager = CheckpointManagers.of(this.checkpointConfig, this.listenerMode);
        this.listenerContainer.setupMessageListener(messageListener);
        this.listenerContainer.setErrorHandler(new IntegrationErrorHandler());
        enhanceListenerContainer();
    }

    public void doStart() {
        this.listenerContainer.start();
    }

    protected void doStop() {
        this.listenerContainer.stop();
    }

    public void setMessageConverter(AzureMessageConverter<EventData, EventData> azureMessageConverter) {
        this.recordListener.setMessageConverter(azureMessageConverter);
    }

    public void setBatchMessageConverter(AzureMessageConverter<EventBatchContext, EventData> azureMessageConverter) {
        this.batchListener.setMessageConverter(azureMessageConverter);
    }

    public void setPayloadType(Class<?> cls) {
        if (ListenerMode.BATCH.equals(this.listenerMode)) {
            this.batchListener.setPayloadType(cls);
        } else {
            this.recordListener.setPayloadType(cls);
        }
    }

    public void setInstrumentationManager(InstrumentationManager instrumentationManager) {
        this.instrumentationManager = instrumentationManager;
    }

    public void setInstrumentationId(String str) {
        this.instrumentationId = str;
    }

    private void enhanceListenerContainer() {
        this.listenerContainer.getContainerProperties().setCloseContextConsumer(closeContext -> {
            LOGGER.info("Stopped receiving on partition: {}. Reason: {}", closeContext.getPartitionContext().getPartitionId(), closeContext.getCloseReason());
        });
        this.listenerContainer.getContainerProperties().setInitializationContextConsumer(initializationContext -> {
            LOGGER.info("Started receiving on partition: {}", initializationContext.getPartitionContext().getPartitionId());
        });
    }
}
