package com.azure.messaging.eventhubs;

import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.logging.LogLevel;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.PartitionProcessor;
import com.azure.messaging.eventhubs.implementation.PartitionProcessorException;
import com.azure.messaging.eventhubs.implementation.ReactorShim;
import com.azure.messaging.eventhubs.implementation.instrumentation.EventHubsTracer;
import com.azure.messaging.eventhubs.models.Checkpoint;
import com.azure.messaging.eventhubs.models.CloseContext;
import com.azure.messaging.eventhubs.models.CloseReason;
import com.azure.messaging.eventhubs.models.ErrorContext;
import com.azure.messaging.eventhubs.models.EventBatchContext;
import com.azure.messaging.eventhubs.models.EventContext;
import com.azure.messaging.eventhubs.models.EventPosition;
import com.azure.messaging.eventhubs.models.InitializationContext;
import com.azure.messaging.eventhubs.models.LastEnqueuedEventProperties;
import com.azure.messaging.eventhubs.models.PartitionContext;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import com.azure.messaging.eventhubs.models.PartitionOwnership;
import com.azure.messaging.eventhubs.models.ReceiveOptions;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/azure/messaging/eventhubs/PartitionPumpManager.class */
public class PartitionPumpManager {
    private static final int MAXIMUM_QUEUE_SIZE = 10000;
    private static final ClientLogger LOGGER = new ClientLogger(PartitionPumpManager.class);
    private final CheckpointStore checkpointStore;
    private final Supplier<PartitionProcessor> partitionProcessorFactory;
    private final EventHubClientBuilder eventHubClientBuilder;
    private final int prefetch;
    private final EventHubsTracer tracer;
    private final EventProcessorClientOptions options;
    private final int schedulerSize = Runtime.getRuntime().availableProcessors() * 4;
    private final Map<String, PartitionPump> partitionPumps = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    public PartitionPumpManager(CheckpointStore checkpointStore, Supplier<PartitionProcessor> supplier, EventHubClientBuilder eventHubClientBuilder, EventHubsTracer eventHubsTracer, EventProcessorClientOptions eventProcessorClientOptions) {
        this.checkpointStore = checkpointStore;
        this.partitionProcessorFactory = supplier;
        this.eventHubClientBuilder = eventHubClientBuilder;
        this.options = eventProcessorClientOptions;
        this.prefetch = eventHubClientBuilder.getPrefetchCount() == null ? 500 : eventHubClientBuilder.getPrefetchCount().intValue();
        this.tracer = eventHubsTracer;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void stopAllPartitionPumps() {
        this.partitionPumps.forEach((str, partitionPump) -> {
            try {
                try {
                    partitionPump.close();
                    this.partitionPumps.remove(str);
                } catch (Exception e) {
                    LOGGER.atWarning().addKeyValue(ClientConstants.PARTITION_ID_KEY, str).log(Messages.FAILED_CLOSE_CONSUMER_PARTITION, new Object[]{e});
                    this.partitionPumps.remove(str);
                }
            } catch (Throwable th) {
                this.partitionPumps.remove(str);
                throw th;
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void verifyPartitionConnection(PartitionOwnership partitionOwnership) {
        String partitionId = partitionOwnership.getPartitionId();
        PartitionPump partitionPump = this.partitionPumps.get(partitionId);
        if (partitionPump == null) {
            LOGGER.atInfo().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionId).addKeyValue("entity-path", partitionOwnership.getEventHubName()).log("No partition pump found for ownership record.");
            return;
        }
        if (partitionPump.getClient().isConnectionClosed()) {
            LOGGER.atInfo().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionId).addKeyValue("entity-path", partitionOwnership.getEventHubName()).log("Connection closed for partition. Removing the consumer.");
            try {
                try {
                    partitionPump.close();
                    this.partitionPumps.remove(partitionId);
                } catch (Exception e) {
                    LOGGER.atWarning().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionId).log(Messages.FAILED_CLOSE_CONSUMER_PARTITION, new Object[]{e});
                    this.partitionPumps.remove(partitionId);
                }
            } catch (Throwable th) {
                this.partitionPumps.remove(partitionId);
                throw th;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startPartitionPump(PartitionOwnership partitionOwnership, Checkpoint checkpoint) {
        String partitionId = partitionOwnership.getPartitionId();
        if (this.partitionPumps.containsKey(partitionId)) {
            LOGGER.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionId).log("Consumer is already running.");
            return;
        }
        try {
            PartitionContext partitionContext = new PartitionContext(partitionOwnership.getFullyQualifiedNamespace(), partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId);
            PartitionProcessor partitionProcessor = this.partitionProcessorFactory.get();
            partitionProcessor.initialize(new InitializationContext(partitionContext));
            EventPosition initialEventPosition = getInitialEventPosition(partitionId, checkpoint);
            LOGGER.atInfo().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionId).addKeyValue("eventPosition", initialEventPosition).log("Starting event processing.");
            ReceiveOptions trackLastEnqueuedEventProperties = new ReceiveOptions().setOwnerLevel(0L).setTrackLastEnqueuedEventProperties(this.options.isTrackLastEnqueuedEventProperties());
            Scheduler newBoundedElastic = Schedulers.newBoundedElastic(this.schedulerSize, MAXIMUM_QUEUE_SIZE, "partition-pump-" + partitionId);
            EventHubConsumerAsyncClient createConsumer = this.eventHubClientBuilder.buildAsyncClient().createConsumer(partitionOwnership.getConsumerGroup(), this.prefetch, true);
            PartitionPump partitionPump = new PartitionPump(partitionId, createConsumer, newBoundedElastic);
            this.partitionPumps.put(partitionId, partitionPump);
            Flux doOnNext = createConsumer.receiveFromPartition(partitionId, initialEventPosition, trackLastEnqueuedEventProperties).doOnNext(partitionEvent -> {
                if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                    LOGGER.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).addKeyValue(ClientConstants.SEQUENCE_NUMBER_KEY, partitionEvent.getData().getSequenceNumber()).log("On next.");
                }
            });
            (this.options.getMaxWaitTime() != null ? ReactorShim.windowTimeout(doOnNext, this.options.getMaxBatchSize(), this.options.getMaxWaitTime()) : doOnNext.window(this.options.getMaxBatchSize())).concatMap((v0) -> {
                return v0.collectList();
            }, 0).publishOn(newBoundedElastic, false, Math.max(this.prefetch / this.options.getMaxBatchSize(), 1)).subscribe(list -> {
                processEvents(partitionContext, partitionProcessor, partitionPump, list);
            }, th -> {
                handleError(partitionOwnership, partitionPump, partitionProcessor, th, partitionContext);
            }, () -> {
                try {
                    try {
                        partitionProcessor.close(new CloseContext(partitionContext, CloseReason.EVENT_PROCESSOR_SHUTDOWN));
                        cleanup(partitionOwnership, partitionPump);
                    } catch (Throwable th2) {
                        LOGGER.atError().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).log("Error occurred calling partitionProcessor.close when closing partition pump.", new Object[]{th2});
                        cleanup(partitionOwnership, partitionPump);
                    }
                } catch (Throwable th3) {
                    cleanup(partitionOwnership, partitionPump);
                    throw th3;
                }
            });
        } catch (Exception e) {
            if (this.partitionPumps.containsKey(partitionId)) {
                cleanup(partitionOwnership, this.partitionPumps.get(partitionId));
            }
            throw LOGGER.atError().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionId).log(new PartitionProcessorException("Error occurred while starting partition pump for partition " + partitionId, e));
        }
    }

    private void processEvent(PartitionContext partitionContext, PartitionProcessor partitionProcessor, EventContext eventContext) {
        EventData eventData = eventContext.getEventData();
        try {
            if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                LOGGER.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).log("Processing event.");
            }
            partitionProcessor.processEvent(new EventContext(partitionContext, eventData, this.checkpointStore, eventContext.getLastEnqueuedEventProperties()));
            if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                LOGGER.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).log("Completed processing event.");
            }
        } catch (Throwable th) {
            throw LOGGER.logExceptionAsError(new PartitionProcessorException("Error in event processing callback", th));
        }
    }

    private void processEvents(PartitionContext partitionContext, PartitionProcessor partitionProcessor, PartitionPump partitionPump, List<PartitionEvent> list) {
        Context startProcessSpan;
        AutoCloseable makeSpanCurrent;
        try {
            try {
                if (this.options.isBatchReceiveMode()) {
                    LastEnqueuedEventProperties[] lastEnqueuedEventPropertiesArr = new LastEnqueuedEventProperties[1];
                    List<EventData> list2 = (List) list.stream().map(partitionEvent -> {
                        lastEnqueuedEventPropertiesArr[0] = partitionEvent.getLastEnqueuedEventProperties();
                        return partitionEvent.getData();
                    }).collect(Collectors.toList());
                    EventBatchContext eventBatchContext = new EventBatchContext(partitionContext, list2, this.checkpointStore, updateOrGetLastEnqueuedEventProperties(partitionPump, lastEnqueuedEventPropertiesArr[0]));
                    startProcessSpan = this.tracer.startProcessSpan("EventHubs.process", list2, Context.NONE);
                    makeSpanCurrent = this.tracer.makeSpanCurrent(startProcessSpan);
                    if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                        LOGGER.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).log("Processing event batch.");
                    }
                    partitionProcessor.processEventBatch(eventBatchContext);
                    if (LOGGER.canLogAtLevel(LogLevel.VERBOSE)) {
                        LOGGER.atVerbose().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).addKeyValue("entity-path", partitionContext.getEventHubName()).log("Completed processing event batch.");
                    }
                } else {
                    EventData data = list.size() == 1 ? list.get(0).getData() : null;
                    EventContext eventContext = new EventContext(partitionContext, data, this.checkpointStore, updateOrGetLastEnqueuedEventProperties(partitionPump, list.size() == 1 ? list.get(0).getLastEnqueuedEventProperties() : null));
                    startProcessSpan = this.tracer.startProcessSpan("EventHubs.process", data, Context.NONE);
                    makeSpanCurrent = this.tracer.makeSpanCurrent(startProcessSpan);
                    processEvent(partitionContext, partitionProcessor, eventContext);
                }
                this.tracer.endSpan(null, startProcessSpan, makeSpanCurrent);
            } catch (Throwable th) {
                throw LOGGER.logExceptionAsError(new PartitionProcessorException("Error in event processing callback", th));
            }
        } catch (Throwable th2) {
            this.tracer.endSpan(null, null, null);
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<String, PartitionPump> getPartitionPumps() {
        return this.partitionPumps;
    }

    EventPosition getInitialEventPosition(String str, Checkpoint checkpoint) {
        EventPosition apply;
        return (checkpoint == null || checkpoint.getOffset() == null) ? (checkpoint == null || checkpoint.getSequenceNumber() == null) ? (this.options.getInitialEventPositionProvider() == null || (apply = this.options.getInitialEventPositionProvider().apply(str)) == null) ? EventPosition.latest() : apply : EventPosition.fromSequenceNumber(checkpoint.getSequenceNumber().longValue()) : EventPosition.fromOffset(checkpoint.getOffset().longValue());
    }

    private void handleError(PartitionOwnership partitionOwnership, PartitionPump partitionPump, PartitionProcessor partitionProcessor, Throwable th, PartitionContext partitionContext) {
        boolean z = true;
        if (!(th instanceof PartitionProcessorException)) {
            z = false;
            LOGGER.atWarning().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).log("Error receiving events from partition.", new Object[]{th});
            try {
                partitionProcessor.processError(new ErrorContext(partitionContext, th));
            } catch (Throwable th2) {
                LOGGER.atError().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).log("Error occurred calling partitionProcessor.processError.", new Object[]{th2});
            }
        }
        try {
            partitionProcessor.close(new CloseContext(partitionContext, CloseReason.LOST_PARTITION_OWNERSHIP));
        } catch (Throwable th3) {
            LOGGER.atError().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionContext.getPartitionId()).log("Error occurred calling partitionProcessor.close.", new Object[]{th3});
        }
        cleanup(partitionOwnership, partitionPump);
        if (z) {
            throw LOGGER.logExceptionAsError((PartitionProcessorException) th);
        }
    }

    private void cleanup(PartitionOwnership partitionOwnership, PartitionPump partitionPump) {
        try {
            LOGGER.atInfo().addKeyValue(ClientConstants.PARTITION_ID_KEY, partitionOwnership.getPartitionId()).log("Closing consumer.");
            partitionPump.close();
        } finally {
            this.partitionPumps.remove(partitionOwnership.getPartitionId());
        }
    }

    private LastEnqueuedEventProperties updateOrGetLastEnqueuedEventProperties(PartitionPump partitionPump, LastEnqueuedEventProperties lastEnqueuedEventProperties) {
        if (lastEnqueuedEventProperties != null) {
            partitionPump.setLastEnqueuedEventProperties(lastEnqueuedEventProperties);
        }
        return partitionPump.getLastEnqueuedEventProperties();
    }
}
