package com.microsoft.azure.eventprocessorhost;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
import com.microsoft.azure.servicebus.ServiceBusException;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;

/* loaded from: input_file:com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.class */
class EventHubPartitionPump extends PartitionPump {
    private CompletableFuture<?> internalOperationFuture;
    private EventHubClient eventHubClient;
    private PartitionReceiver partitionReceiver;
    private InternalReceiveHandler internalReceiveHandler;

    /* loaded from: input_file:com/microsoft/azure/eventprocessorhost/EventHubPartitionPump$InternalReceiveHandler.class */
    private class InternalReceiveHandler extends PartitionReceiveHandler {
        InternalReceiveHandler() {
            super(EventHubPartitionPump.this.host.getEventProcessorOptions().getMaxBatchSize());
        }

        public void onReceive(Iterable<EventData> iterable) {
            if (EventHubPartitionPump.this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled()) {
                EventHubPartitionPump.this.partitionContext.setRuntimeInformation(EventHubPartitionPump.this.partitionReceiver.getRuntimeInformation());
            }
            Iterable<EventData> iterable2 = iterable;
            if (iterable2 == null) {
                iterable2 = new ArrayList();
            }
            EventHubPartitionPump.this.onEvents(iterable2);
        }

        public void onError(Throwable th) {
            EventHubPartitionPump.this.pumpStatus = PartitionPumpStatus.PP_ERRORED;
            if (th == null) {
                th = new Throwable("No error info supplied by EventHub client");
            }
            if (th instanceof ReceiverDisconnectedException) {
                EventHubPartitionPump.this.host.logWithHostAndPartition(Level.WARNING, EventHubPartitionPump.this.partitionContext, "EventHub client disconnected, probably another host took the partition");
            } else {
                EventHubPartitionPump.this.host.logWithHostAndPartition(Level.SEVERE, EventHubPartitionPump.this.partitionContext, "EventHub client error: " + th.toString());
                if (th instanceof Exception) {
                    EventHubPartitionPump.this.host.logWithHostAndPartition(Level.SEVERE, EventHubPartitionPump.this.partitionContext, "EventHub client error continued", (Exception) th);
                }
            }
            EventHubPartitionPump.this.onError(th);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventHubPartitionPump(EventProcessorHost eventProcessorHost, Pump pump, Lease lease) {
        super(eventProcessorHost, pump, lease);
        this.internalOperationFuture = null;
        this.eventHubClient = null;
        this.partitionReceiver = null;
        this.internalReceiveHandler = null;
    }

    @Override // com.microsoft.azure.eventprocessorhost.PartitionPump
    void specializedStartPump() {
        boolean z = false;
        int i = 0;
        Exception exc = null;
        while (true) {
            try {
                openClients();
                z = true;
            } catch (Exception e) {
                exc = e;
                if ((e instanceof ExecutionException) && (e.getCause() instanceof ReceiverDisconnectedException)) {
                    this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Receiver disconnected on create, bad epoch?", e);
                    break;
                } else {
                    this.host.logWithHostAndPartition(Level.WARNING, this.partitionContext, "Failure creating client or receiver, retrying", e);
                    i++;
                }
            }
            if (z || i >= 5) {
                break;
            }
        }
        if (!z) {
            this.processor.onError(this.partitionContext, exc);
            this.pumpStatus = PartitionPumpStatus.PP_OPENFAILED;
        }
        if (this.pumpStatus == PartitionPumpStatus.PP_OPENING) {
            this.internalReceiveHandler = new InternalReceiveHandler();
            this.pumpStatus = PartitionPumpStatus.PP_RUNNING;
            this.partitionReceiver.setReceiveHandler(this.internalReceiveHandler, this.host.getEventProcessorOptions().getInvokeProcessorAfterReceiveTimeout().booleanValue());
        }
        if (this.pumpStatus == PartitionPumpStatus.PP_OPENFAILED) {
            this.pumpStatus = PartitionPumpStatus.PP_CLOSING;
            cleanUpClients();
            this.pumpStatus = PartitionPumpStatus.PP_CLOSED;
        }
    }

    private void openClients() throws ServiceBusException, IOException, InterruptedException, ExecutionException {
        this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH client");
        this.internalOperationFuture = EventHubClient.createFromConnectionString(this.host.getEventHubConnectionString());
        this.eventHubClient = (EventHubClient) this.internalOperationFuture.get();
        this.internalOperationFuture = null;
        ReceiverOptions receiverOptions = new ReceiverOptions();
        receiverOptions.setReceiverRuntimeMetricEnabled(this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled());
        Object initialOffset = this.partitionContext.getInitialOffset();
        long epoch = this.lease.getEpoch();
        this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + initialOffset);
        if (initialOffset instanceof String) {
            this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), (String) initialOffset, epoch, receiverOptions);
        } else {
            if (!(initialOffset instanceof Instant)) {
                String str = "Starting offset is not String or Instant, is " + (initialOffset != null ? initialOffset.getClass().toString() : "null");
                this.host.logWithHostAndPartition(Level.SEVERE, this.partitionContext, str);
                throw new RuntimeException(str);
            }
            this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), (Instant) initialOffset, epoch, receiverOptions);
        }
        this.lease.setEpoch(epoch);
        if (this.internalOperationFuture == null) {
            this.host.logWithHostAndPartition(Level.SEVERE, this.partitionContext, "createEpochReceiver failed with null");
            throw new RuntimeException("createEpochReceiver failed with null");
        }
        this.partitionReceiver = (PartitionReceiver) this.internalOperationFuture.get();
        this.partitionReceiver.setPrefetchCount(this.host.getEventProcessorOptions().getPrefetchCount());
        this.partitionReceiver.setReceiveTimeout(this.host.getEventProcessorOptions().getReceiveTimeOut());
        this.internalOperationFuture = null;
        this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "EH client and receiver creation finished");
    }

    private void cleanUpClients() {
        if (this.partitionReceiver != null) {
            synchronized (this.processingSynchronizer) {
                this.partitionReceiver.setReceiveHandler((PartitionReceiveHandler) null);
            }
            this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH receiver");
            this.partitionReceiver.close();
            this.partitionReceiver = null;
        }
        if (this.eventHubClient != null) {
            this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Closing EH client");
            this.eventHubClient.close();
            this.eventHubClient = null;
        }
    }

    @Override // com.microsoft.azure.eventprocessorhost.PartitionPump
    void specializedShutdown(CloseReason closeReason) {
        CompletableFuture<?> completableFuture = this.internalOperationFuture;
        if (completableFuture != null) {
            completableFuture.cancel(true);
        }
        if (this.partitionReceiver != null) {
            this.partitionReceiver.setReceiveHandler((PartitionReceiveHandler) null);
            cleanUpClients();
        }
    }
}
