package com.azure.messaging.eventhubs.implementation;

import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.Messages;
import com.azure.messaging.eventhubs.models.PartitionEvent;
import java.util.Collections;
import java.util.Objects;
import java.util.Timer;
import java.util.TimerTask;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.util.context.Context;

/**
 * A {@link BaseSubscriber} that forwards received {@link PartitionEvent}s to a single
 * {@link SynchronousReceiveWork} item and disposes itself when that work becomes terminal, when the
 * upstream completes or errors, or when the work's timeout elapses.
 *
 * <p>{@link #dispose()} may be invoked both from the signalling thread and from the internal
 * {@link Timer} thread.</p>
 */
public class SynchronousEventSubscriber extends BaseSubscriber<PartitionEvent> {
    /** Runs the receive-timeout task; cancelled in {@link #dispose()}. */
    private final Timer timer = new Timer();
    private final ClientLogger logger;
    private final SynchronousReceiveWork work;
    /** Upstream subscription; stays {@code null} until {@link #hookOnSubscribe(Subscription)} runs. */
    private volatile Subscription subscription;
    private final Context context;
    private final String subscriberId;

    /**
     * Timer task that logs the timeout and then runs the supplied dispose callback.
     */
    private static class ReceiveTimeoutTask extends TimerTask {
        private final ClientLogger logger;
        private final Runnable onDispose;

        /**
         * @param runnable callback to run when the timeout fires.
         * @param clientLogger logger used to record that the timeout was reached.
         */
        ReceiveTimeoutTask(Runnable runnable, ClientLogger clientLogger) {
            this.onDispose = runnable;
            this.logger = clientLogger;
        }

        @Override
        public void run() {
            this.logger.info("Timeout encountered, disposing of subscriber.");
            this.onDispose.run();
        }
    }

    /**
     * Creates a subscriber bound to the given work item. The work's id (as a string) is stored in
     * this subscriber's context and attached to its logger under
     * {@code ClientConstants.SUBSCRIBER_ID_KEY}.
     *
     * @param synchronousReceiveWork the work item that receives events, errors and completion.
     * @throws NullPointerException if {@code synchronousReceiveWork} is {@code null}.
     */
    public SynchronousEventSubscriber(SynchronousReceiveWork synchronousReceiveWork) {
        this.work = (SynchronousReceiveWork) Objects.requireNonNull(synchronousReceiveWork, "'work' cannot be null.");
        this.subscriberId = String.valueOf(synchronousReceiveWork.getId());
        this.context = super.currentContext().put(ClientConstants.SUBSCRIBER_ID_KEY, this.subscriberId);
        this.logger = new ClientLogger(SynchronousEventSubscriber.class,
            Collections.singletonMap(ClientConstants.SUBSCRIBER_ID_KEY, this.subscriberId));
    }

    /**
     * Returns the context built in the constructor, i.e. the parent context plus the subscriber id.
     *
     * @return this subscriber's context.
     */
    public Context currentContext() {
        return this.context;
    }

    /**
     * Records the upstream subscription, arms the receive timeout and requests as many events as
     * the work item asks for.
     *
     * @param subscription the upstream subscription.
     */
    protected void hookOnSubscribe(Subscription subscription) {
        if (this.subscription == null) {
            this.subscription = subscription;
        }
        this.logger.atInfo()
            .addKeyValue("pendingEvents", this.work.getNumberOfEvents())
            .log("Scheduling receive timeout task.");

        // Arm the timeout BEFORE requesting: a publisher may deliver onNext synchronously from
        // inside request(), which can make the work terminal and dispose this subscriber (thereby
        // cancelling the timer). Scheduling on a cancelled Timer throws IllegalStateException.
        this.timer.schedule(new ReceiveTimeoutTask(this::dispose, this.logger), this.work.getTimeout().toMillis());
        subscription.request(this.work.getNumberOfEvents());
    }

    /**
     * Hands the event to the work item and disposes this subscriber once the work reports that it
     * is terminal.
     *
     * @param partitionEvent the received event.
     */
    public void hookOnNext(PartitionEvent partitionEvent) {
        this.work.next(partitionEvent);
        if (this.work.isTerminal()) {
            this.logger.info("Work completed. Closing Flux and cancelling subscription.");
            dispose();
        }
    }

    /**
     * Disposes this subscriber when the upstream completes.
     */
    protected void hookOnComplete() {
        this.logger.info("Completed. No events to listen to.");
        dispose();
    }

    /**
     * Logs the failure, reports it to the work item and disposes this subscriber.
     *
     * @param th the error signalled by the upstream.
     */
    protected void hookOnError(Throwable th) {
        // Explicit Object[] keeps the call bound to the (String, Object...) logger overload.
        this.logger.error(Messages.ERROR_OCCURRED_IN_SUBSCRIBER_ERROR, new Object[]{th});
        this.work.error(th);
        dispose();
    }

    /**
     * Completes the work item, cancels the upstream subscription (if one was ever received), stops
     * the timeout timer and disposes the base subscriber.
     */
    public void dispose() {
        try {
            this.work.complete();
            // Snapshot the volatile field; it is still null if dispose() runs before onSubscribe.
            final Subscription upstream = this.subscription;
            if (upstream != null) {
                upstream.cancel();
            }
        } finally {
            // Always stop the timer thread and release the base subscriber, even if the steps
            // above throw; otherwise the Timer's non-daemon thread would be left running.
            this.timer.cancel();
            super.dispose();
        }
    }
}
