package com.azure.messaging.eventhubs;

import com.azure.core.amqp.AmqpRetryOptions;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.models.SendBatchFailedContext;
import com.azure.messaging.eventhubs.models.SendBatchSucceededContext;
import com.azure.messaging.eventhubs.models.SendOptions;
import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.concurrent.Queues;

@ServiceClient(builder = EventHubBufferedProducerClientBuilder.class, isAsync = true)
/* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient.class */
public final class EventHubBufferedProducerAsyncClient implements Closeable {
    // Default-constructed options shared by the single-argument enqueueEvent/enqueueEvents overloads.
    private static final SendOptions ROUND_ROBIN_SEND_OPTIONS = new SendOptions();
    // Underlying producer client; built in the constructor and closed by close().
    private final EventHubProducerAsyncClient client;
    // Settings passed to every partition producer; also supplies the per-partition buffer length.
    private final BufferedProducerClientOptions clientOptions;
    // Chooses a partition id for a partition key, or the next id for round-robin sends.
    private final PartitionResolver partitionResolver;
    // Cached Mono that creates one partition producer per partition id reported by the Event Hub.
    private final Mono<Void> initialisationMono;
    // Cached snapshot of the partition ids (the keys of partitionProducers) taken after initialisation.
    private final Mono<String[]> partitionIdsMono;
    // Retry options forwarded to each partition producer.
    private final AmqpRetryOptions retryOptions;
    // Tracer forwarded to each partition producer.
    private final Tracer tracer;
    private final ClientLogger logger = new ClientLogger(EventHubBufferedProducerAsyncClient.class);
    // Flipped to true by the first close() call so later calls return immediately.
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    // Partition id -> buffered producer for that partition.
    private final ConcurrentHashMap<String, EventHubBufferedPartitionProducer> partitionProducers = new ConcurrentHashMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/azure/messaging/eventhubs/EventHubBufferedProducerAsyncClient$BufferedProducerClientOptions.class */
    /**
     * Mutable holder for the settings of a buffered producer client.
     *
     * <p>Defaults: idempotent retries disabled, one concurrent send overall and per partition, a
     * per-partition buffer length of 1500 events and a maximum wait time of 30 seconds. Both
     * callbacks are {@code null} until set.</p>
     */
    public static class BufferedProducerClientOptions {
        private Consumer<SendBatchFailedContext> sendFailedContext;
        private Consumer<SendBatchSucceededContext> sendSucceededContext;
        // A boolean field is false unless assigned, so no explicit initialiser is needed.
        private boolean enableIdempotentRetries;
        private int maxConcurrentSendsPerPartition = 1;
        private int maxEventBufferLengthPerPartition = 1500;
        private Duration maxWaitTime = Duration.ofSeconds(30);
        private int maxConcurrentSends = 1;

        /**
         * @return whether idempotent retries are enabled.
         */
        boolean enableIdempotentRetries() {
            return enableIdempotentRetries;
        }

        /**
         * @param z {@code true} to enable idempotent retries, {@code false} to disable them.
         */
        public void setEnableIdempotentRetries(boolean z) {
            enableIdempotentRetries = z;
        }

        /**
         * @return the configured number of concurrent sends.
         */
        public int getMaxConcurrentSends() {
            return maxConcurrentSends;
        }

        /**
         * @param i the number of concurrent sends to configure.
         */
        public void setMaxConcurrentSends(int i) {
            maxConcurrentSends = i;
        }

        /**
         * @return the configured number of concurrent sends per partition.
         */
        public int getMaxConcurrentSendsPerPartition() {
            return maxConcurrentSendsPerPartition;
        }

        /**
         * @param i the number of concurrent sends per partition to configure.
         */
        public void setMaxConcurrentSendsPerPartition(int i) {
            maxConcurrentSendsPerPartition = i;
        }

        /**
         * @return the configured event buffer length for each partition.
         */
        public int getMaxEventBufferLengthPerPartition() {
            return maxEventBufferLengthPerPartition;
        }

        /**
         * Sets the event buffer length for each partition. Note that, unlike the other mutators on
         * this class, this one carries no {@code set} prefix.
         *
         * @param i the buffer length to configure.
         */
        public void maxEventBufferLengthPerPartition(int i) {
            maxEventBufferLengthPerPartition = i;
        }

        /**
         * @return the configured maximum wait time.
         */
        public Duration getMaxWaitTime() {
            return maxWaitTime;
        }

        /**
         * @param duration the maximum wait time to configure.
         */
        public void setMaxWaitTime(Duration duration) {
            maxWaitTime = duration;
        }

        /**
         * @return the consumer of {@link SendBatchFailedContext}, or {@code null} if none was set.
         */
        public Consumer<SendBatchFailedContext> getSendFailedContext() {
            return sendFailedContext;
        }

        /**
         * @param consumer the consumer of {@link SendBatchFailedContext} to store.
         */
        public void setSendFailedContext(Consumer<SendBatchFailedContext> consumer) {
            sendFailedContext = consumer;
        }

        /**
         * @return the consumer of {@link SendBatchSucceededContext}, or {@code null} if none was set.
         */
        public Consumer<SendBatchSucceededContext> getSendSucceededContext() {
            return sendSucceededContext;
        }

        /**
         * @param consumer the consumer of {@link SendBatchSucceededContext} to store.
         */
        public void setSendSucceededContext(Consumer<SendBatchSucceededContext> consumer) {
            sendSucceededContext = consumer;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates the buffered producer and prepares (but does not yet run) its initialisation.
     *
     * <p>The underlying producer client is built immediately. Nothing is fetched from the service
     * here: the partition producers are created when {@code initialisationMono} is first
     * subscribed, and the result is cached for later subscribers.</p>
     *
     * @param eventHubClientBuilder builder used to create the underlying async producer client.
     * @param bufferedProducerClientOptions settings shared by all partition producers.
     * @param partitionResolver resolver used to pick a partition for keyed and round-robin sends.
     * @param amqpRetryOptions retry options forwarded to each partition producer.
     * @param tracer tracer forwarded to each partition producer.
     */
    public EventHubBufferedProducerAsyncClient(EventHubClientBuilder eventHubClientBuilder, BufferedProducerClientOptions bufferedProducerClientOptions, PartitionResolver partitionResolver, AmqpRetryOptions amqpRetryOptions, Tracer tracer) {
        this.client = eventHubClientBuilder.buildAsyncProducerClient();
        this.clientOptions = bufferedProducerClientOptions;
        this.partitionResolver = partitionResolver;
        this.retryOptions = amqpRetryOptions;
        this.tracer = tracer;

        // One producer per partition id reported by the Event Hub. The nested lambda parameters
        // need distinct names: a lambda parameter may not redeclare one from an enclosing lambda.
        this.initialisationMono = this.client.getEventHubProperties()
            .flatMapMany(properties -> Flux.fromArray(properties.getPartitionIds().stream().toArray(String[]::new)))
            .map(partitionId -> this.partitionProducers.computeIfAbsent(partitionId, this::createPartitionProducer))
            .then()
            .cache();

        // Snapshot the known partition ids only after the producers have been created.
        this.partitionIdsMono = this.initialisationMono
            .then(Mono.fromCallable(() -> this.partitionProducers.keySet().toArray(new String[0])))
            .cache();
    }

    /**
     * Gets the fully qualified namespace reported by the underlying producer client.
     *
     * @return the value of the underlying client's {@code getFullyQualifiedNamespace()}.
     */
    public String getFullyQualifiedNamespace() {
        return this.client.getFullyQualifiedNamespace();
    }

    /**
     * Gets the Event Hub name reported by the underlying producer client.
     *
     * @return the value of the underlying client's {@code getEventHubName()}.
     */
    public String getEventHubName() {
        return this.client.getEventHubName();
    }

    /**
     * Retrieves the Event Hub properties from the underlying client once initialisation has
     * completed.
     *
     * @return a Mono that first waits for the partition producers to be initialised and then
     *     emits the result of a fresh {@code getEventHubProperties()} call on the underlying client.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<EventHubProperties> getEventHubProperties() {
        // Deferred so the underlying request is only issued after initialisation finishes.
        final Mono<EventHubProperties> fetchProperties = Mono.defer(client::getEventHubProperties);
        return initialisationMono.then(fetchProperties);
    }

    /**
     * Streams the partition ids known to this client.
     *
     * @return a Flux emitting each element of the cached partition id snapshot.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public Flux<String> getPartitionIds() {
        return partitionIdsMono.flatMapMany(Flux::fromArray);
    }

    /**
     * Retrieves the properties of a single partition from the underlying client.
     *
     * @param str the partition id to look up.
     * @return a Mono from the underlying client's {@code getPartitionProperties}; or a Mono that
     *     fails with {@link NullPointerException} when {@code str} is null, or with
     *     {@link IllegalArgumentException} when it is empty.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<PartitionProperties> getPartitionProperties(String str) {
        if (str == null) {
            return FluxUtil.monoError(logger, new NullPointerException("'partitionId' cannot be null."));
        }
        if (CoreUtils.isNullOrEmpty(str)) {
            return FluxUtil.monoError(logger, new IllegalArgumentException("'partitionId' cannot be empty."));
        }
        return client.getPartitionProperties(str);
    }

    /**
     * Gets the number of events currently buffered across all partition producers.
     *
     * <p>The partition producers are summed sequentially on the calling thread; there are only as
     * many entries as partitions, so a parallel stream would add fork/join overhead without any
     * benefit.</p>
     *
     * @return the sum of {@code getBufferedEventCount()} over every partition producer, or 0 when
     *     no producer has been created yet.
     */
    public int getBufferedEventCount() {
        int total = 0;
        for (EventHubBufferedPartitionProducer producer : this.partitionProducers.values()) {
            total += producer.getBufferedEventCount();
        }
        return total;
    }

    /**
     * Gets the number of events currently buffered for one partition.
     *
     * @param str the partition id to look up.
     * @return the buffered event count of that partition's producer, or 0 when no producer is
     *     registered under {@code str}.
     */
    public int getBufferedEventCount(String str) {
        final EventHubBufferedPartitionProducer producer = partitionProducers.get(str);
        return producer == null ? 0 : producer.getBufferedEventCount();
    }

    /**
     * Enqueues a single event using the shared default {@link SendOptions}.
     *
     * <p>Delegates to {@link #enqueueEvent(EventData, SendOptions)}. Assuming a default-constructed
     * {@code SendOptions} carries neither a partition id nor a partition key, the event takes the
     * round-robin path there.</p>
     *
     * @param eventData the event to enqueue.
     * @return the Mono returned by the two-argument overload.
     */
    public Mono<Integer> enqueueEvent(EventData eventData) {
        return enqueueEvent(eventData, ROUND_ROBIN_SEND_OPTIONS);
    }

    /**
     * Enqueues a single event into the buffer of the partition selected by {@code sendOptions}.
     *
     * <p>Partition selection, in order of precedence:</p>
     * <ol>
     *   <li>a non-empty partition id selects that partition's producer directly;</li>
     *   <li>a non-null partition key is mapped to a partition by the partition resolver;</li>
     *   <li>otherwise the partition resolver assigns the next partition round-robin.</li>
     * </ol>
     *
     * <p>Partition selection always happens after initialisation has completed, so an explicit
     * partition id is validated against the partitions that were actually discovered rather than
     * against a possibly still-empty producer map. The partition id and key are read from
     * {@code sendOptions} when this method is called.</p>
     *
     * @param eventData the event to enqueue.
     * @param sendOptions options carrying the optional partition id or partition key.
     * @return a Mono emitting the total number of buffered events, read after the event has been
     *     enqueued. It fails with {@link NullPointerException} when an argument is null, or with
     *     {@link IllegalArgumentException} when no producer exists for the requested or resolved
     *     partition.
     */
    public Mono<Integer> enqueueEvent(EventData eventData, SendOptions sendOptions) {
        if (eventData == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'eventData' cannot be null."));
        }
        if (sendOptions == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null."));
        }

        final String requestedPartitionId = sendOptions.getPartitionId();
        final String partitionKey = sendOptions.getPartitionKey();

        // Evaluated on subscription, i.e. only after the enqueue below has completed. Passing
        // getBufferedEventCount() to thenReturn(...) would capture the count before the enqueue.
        final Mono<Integer> bufferedCount = Mono.fromCallable(() -> getBufferedEventCount());

        return this.partitionIdsMono.flatMap(partitionIds -> {
            final EventHubBufferedPartitionProducer producer;

            if (!CoreUtils.isNullOrEmpty(requestedPartitionId)) {
                producer = this.partitionProducers.get(requestedPartitionId);
                if (producer == null) {
                    return FluxUtil.monoError(this.logger, new IllegalArgumentException(
                        "partitionId is not valid. Available ones: " + String.join(",", this.partitionProducers.keySet())));
                }
            } else if (partitionKey != null) {
                final String resolvedPartitionId = this.partitionResolver.assignForPartitionKey(partitionKey, partitionIds);
                producer = this.partitionProducers.get(resolvedPartitionId);
                if (producer == null) {
                    return FluxUtil.monoError(this.logger, new IllegalArgumentException(String.format(
                        "Unable to find EventHubBufferedPartitionProducer for partitionId: %s when mapping partitionKey: %s to available partitions.",
                        resolvedPartitionId, partitionKey)));
                }
            } else {
                producer = this.partitionProducers.computeIfAbsent(
                    this.partitionResolver.assignRoundRobin(partitionIds), this::createPartitionProducer);
            }

            return producer.enqueueEvent(eventData).then(bufferedCount);
        });
    }

    /**
     * Enqueues several events using the shared default {@link SendOptions}.
     *
     * <p>Delegates to {@link #enqueueEvents(Iterable, SendOptions)}.</p>
     *
     * @param iterable the events to enqueue.
     * @return the Mono returned by the two-argument overload.
     */
    public Mono<Integer> enqueueEvents(Iterable<EventData> iterable) {
        return enqueueEvents(iterable, ROUND_ROBIN_SEND_OPTIONS);
    }

    /**
     * Enqueues several events, one after another, using the same {@code sendOptions} for each.
     *
     * <p>One enqueue operation is created per event up front; the operations are then subscribed
     * to sequentially and the value of the last one is emitted. When {@code iterable} yields no
     * events, nothing is enqueued and the current buffered event count is emitted instead of
     * failing with {@link java.util.NoSuchElementException}.</p>
     *
     * @param iterable the events to enqueue.
     * @param sendOptions options applied to every event.
     * @return a Mono emitting the buffered event count reported by the final enqueue operation
     *     (or the current count for an empty {@code iterable}). It fails with
     *     {@link NullPointerException} when an argument is null.
     */
    public Mono<Integer> enqueueEvents(Iterable<EventData> iterable, SendOptions sendOptions) {
        if (iterable == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'eventData' cannot be null."));
        }
        if (sendOptions == null) {
            return FluxUtil.monoError(this.logger, new NullPointerException("'options' cannot be null."));
        }

        final List<Mono<Integer>> enqueueOperations = new ArrayList<>();
        for (EventData event : iterable) {
            enqueueOperations.add(enqueueEvent(event, sendOptions));
        }

        // Flux.last() signals an error on an empty source, so handle "no events" explicitly.
        if (enqueueOperations.isEmpty()) {
            return Mono.fromCallable(() -> getBufferedEventCount());
        }

        // concat subscribes to each operation in turn, so the last value is the most recent count.
        return Flux.concat(enqueueOperations).last();
    }

    /**
     * Flushes every partition producer.
     *
     * <p>{@code flush()} is invoked on each producer when this method is called; the resulting
     * operations are merged, so they are subscribed to together rather than one after another.</p>
     *
     * @return a Mono that completes once all of the merged flush operations have completed.
     */
    public Mono<Void> flush() {
        final List<Mono<Void>> flushOperations = new ArrayList<>();
        for (EventHubBufferedPartitionProducer producer : partitionProducers.values()) {
            flushOperations.add(producer.flush());
        }
        return Flux.merge(flushOperations).then();
    }

    /**
     * Closes every partition producer and then the underlying producer client.
     *
     * <p>Only the first call does any work; later calls return immediately. A failure while
     * closing one partition producer does not stop the remaining producers or the underlying
     * client from being closed: the first {@link RuntimeException} is rethrown once everything has
     * been attempted, with any later ones attached as suppressed exceptions.</p>
     */
    @Override
    public void close() {
        if (this.isClosed.getAndSet(true)) {
            return;
        }

        RuntimeException failure = null;
        for (EventHubBufferedPartitionProducer producer : this.partitionProducers.values()) {
            try {
                producer.close();
            } catch (RuntimeException e) {
                if (failure == null) {
                    failure = e;
                } else if (failure != e) {
                    // An exception may not suppress itself.
                    failure.addSuppressed(e);
                }
            }
        }

        try {
            this.client.close();
        } catch (RuntimeException e) {
            if (failure == null) {
                failure = e;
            } else if (failure != e) {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Builds the buffered producer for one partition.
     *
     * <p>The producer receives a queue sized from the configured per-partition buffer length and a
     * unicast sink that buffers into that same queue.</p>
     *
     * @param partitionId the partition id the producer is created for.
     * @return a new partition producer sharing this client's underlying producer, options, retry
     *     options and tracer.
     */
    private EventHubBufferedPartitionProducer createPartitionProducer(String partitionId) {
        final int bufferLength = clientOptions.getMaxEventBufferLengthPerPartition();
        final Queue<EventData> eventQueue = Queues.<EventData>get(bufferLength).get();
        final Sinks.Many<EventData> eventSink = Sinks.many().unicast().onBackpressureBuffer(eventQueue);

        return new EventHubBufferedPartitionProducer(client, partitionId, clientOptions, retryOptions,
            eventSink, eventQueue, tracer);
    }
}
