package io.vertx.kafka.client.producer.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.WriteStream;
import io.vertx.kafka.client.common.KafkaClientOptions;
import io.vertx.kafka.client.common.PartitionInfo;
import io.vertx.kafka.client.common.impl.CloseHandler;
import io.vertx.kafka.client.common.impl.Helper;
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
import io.vertx.kafka.client.producer.KafkaWriteStream;
import io.vertx.kafka.client.producer.RecordMetadata;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.Serializer;

/* loaded from: input_file:io/vertx/kafka/client/producer/impl/KafkaProducerImpl.class */
public class KafkaProducerImpl<K, V> implements KafkaProducer<K, V> {
    private static final Map<String, SharedProducer> sharedProducers = new HashMap();
    private final Vertx vertx;
    private final KafkaWriteStream<K, V> stream;
    private final CloseHandler closeHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:io/vertx/kafka/client/producer/impl/KafkaProducerImpl$SharedProducer.class */
    public static class SharedProducer extends HashMap<Object, KafkaProducer> {
        final Producer producer;
        final CloseHandler closeHandler;

        public SharedProducer(KafkaWriteStream kafkaWriteStream) {
            this.producer = kafkaWriteStream.unwrap();
            kafkaWriteStream.getClass();
            this.closeHandler = new CloseHandler((v1, v2) -> {
                r3.close(v1, v2);
            });
        }
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, Properties properties) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, properties);
        });
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, Map<String, String> map) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, new HashMap(map));
        });
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, KafkaClientOptions kafkaClientOptions) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, kafkaClientOptions);
        });
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, Properties properties, Class<K> cls, Class<V> cls2) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, properties, cls, cls2);
        });
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, Properties properties, Serializer<K> serializer, Serializer<V> serializer2) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, properties, serializer, serializer2);
        });
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, Map<String, String> map, Class<K> cls, Class<V> cls2) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, new HashMap(map), cls, cls2);
        });
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, Map<String, String> map, Serializer<K> serializer, Serializer<V> serializer2) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, new HashMap(map), serializer, serializer2);
        });
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, KafkaClientOptions kafkaClientOptions, Class<K> cls, Class<V> cls2) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, kafkaClientOptions, cls, cls2);
        });
    }

    public static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, KafkaClientOptions kafkaClientOptions, Serializer<K> serializer, Serializer<V> serializer2) {
        return createShared(vertx, str, (Supplier<KafkaWriteStream>) () -> {
            return KafkaWriteStream.create(vertx, kafkaClientOptions, serializer, serializer2);
        });
    }

    /* JADX WARN: Multi-variable type inference failed */
    private static <K, V> KafkaProducer<K, V> createShared(Vertx vertx, String str, Supplier<KafkaWriteStream> supplier) {
        KafkaProducerImpl<K, V> registerCloseHook;
        synchronized (sharedProducers) {
            SharedProducer computeIfAbsent = sharedProducers.computeIfAbsent(str, str2 -> {
                SharedProducer sharedProducer = new SharedProducer((KafkaWriteStream) supplier.get());
                sharedProducer.closeHandler.registerCloseHook((VertxInternal) vertx);
                return sharedProducer;
            });
            Object obj = new Object();
            KafkaProducerImpl kafkaProducerImpl = new KafkaProducerImpl(vertx, KafkaWriteStream.create(vertx, computeIfAbsent.producer), new CloseHandler((l, handler) -> {
                synchronized (sharedProducers) {
                    computeIfAbsent.remove(obj);
                    if (!computeIfAbsent.isEmpty()) {
                        handler.handle(Future.succeededFuture());
                    } else {
                        sharedProducers.remove(str);
                        computeIfAbsent.closeHandler.close(l.longValue(), handler);
                    }
                }
            }));
            computeIfAbsent.put(obj, kafkaProducerImpl);
            registerCloseHook = kafkaProducerImpl.registerCloseHook();
        }
        return registerCloseHook;
    }

    public KafkaProducerImpl(Vertx vertx, KafkaWriteStream<K, V> kafkaWriteStream, CloseHandler closeHandler) {
        this.vertx = vertx;
        this.stream = kafkaWriteStream;
        this.closeHandler = closeHandler;
    }

    /* JADX WARN: 'this' call moved to the top of the method (can break code semantics) */
    public KafkaProducerImpl(Vertx vertx, KafkaWriteStream<K, V> kafkaWriteStream) {
        this(vertx, kafkaWriteStream, new CloseHandler((v1, v2) -> {
            r5.close(v1, v2);
        }));
        kafkaWriteStream.getClass();
    }

    public KafkaProducerImpl<K, V> registerCloseHook() {
        ContextInternal currentContext = Vertx.currentContext();
        if (currentContext == null) {
            return this;
        }
        this.closeHandler.registerCloseHook(currentContext);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> initTransactions(Handler<AsyncResult<Void>> handler) {
        this.stream.initTransactions(handler);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<Void> initTransactions() {
        return this.stream.initTransactions();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> beginTransaction(Handler<AsyncResult<Void>> handler) {
        this.stream.beginTransaction(handler);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<Void> beginTransaction() {
        return this.stream.beginTransaction();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> commitTransaction(Handler<AsyncResult<Void>> handler) {
        this.stream.commitTransaction(handler);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<Void> commitTransaction() {
        return this.stream.commitTransaction();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> abortTransaction(Handler<AsyncResult<Void>> handler) {
        this.stream.abortTransaction(handler);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<Void> abortTransaction() {
        return this.stream.abortTransaction();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> exceptionHandler(Handler<Throwable> handler) {
        this.stream.exceptionHandler(handler);
        return this;
    }

    public Future<Void> write(KafkaProducerRecord<K, V> kafkaProducerRecord) {
        return this.stream.write(kafkaProducerRecord.record());
    }

    public void write(KafkaProducerRecord<K, V> kafkaProducerRecord, Handler<AsyncResult<Void>> handler) {
        this.stream.write(kafkaProducerRecord.record(), handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<RecordMetadata> send(KafkaProducerRecord<K, V> kafkaProducerRecord) {
        return this.stream.send(kafkaProducerRecord.record()).map(Helper::from);
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> send(KafkaProducerRecord<K, V> kafkaProducerRecord, Handler<AsyncResult<RecordMetadata>> handler) {
        send(kafkaProducerRecord).onComplete(handler);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<List<PartitionInfo>> partitionsFor(String str) {
        return this.stream.partitionsFor(str).map(list -> {
            return (List) list.stream().map(partitionInfo -> {
                return new PartitionInfo().setInSyncReplicas((List) Stream.of((Object[]) partitionInfo.inSyncReplicas()).map(Helper::from).collect(Collectors.toList())).setLeader(Helper.from(partitionInfo.leader())).setPartition(partitionInfo.partition()).setReplicas((List) Stream.of((Object[]) partitionInfo.replicas()).map(Helper::from).collect(Collectors.toList())).setTopic(partitionInfo.topic());
            }).collect(Collectors.toList());
        });
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> partitionsFor(String str, Handler<AsyncResult<List<PartitionInfo>>> handler) {
        partitionsFor(str).onComplete(handler);
        return this;
    }

    public void end(Handler<AsyncResult<Void>> handler) {
        this.stream.end(handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    /* renamed from: setWriteQueueMaxSize */
    public KafkaProducer<K, V> mo40setWriteQueueMaxSize(int i) {
        this.stream.mo44setWriteQueueMaxSize(i);
        return this;
    }

    public boolean writeQueueFull() {
        return this.stream.writeQueueFull();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> drainHandler(Handler<Void> handler) {
        this.stream.drainHandler(handler);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaProducer<K, V> flush(Handler<AsyncResult<Void>> handler) {
        this.stream.flush(handler);
        return this;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<Void> flush() {
        return this.stream.flush();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<Void> close(long j) {
        Handler<AsyncResult<Void>> promise = Promise.promise();
        this.closeHandler.close(j, promise);
        return promise.future();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Future<Void> close() {
        Handler<AsyncResult<Void>> promise = Promise.promise();
        this.closeHandler.close(promise);
        return promise.future();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public void close(Handler<AsyncResult<Void>> handler) {
        this.closeHandler.close(handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public void close(long j, Handler<AsyncResult<Void>> handler) {
        this.closeHandler.close(j, handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public KafkaWriteStream<K, V> asStream() {
        return this.stream;
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    public Producer<K, V> unwrap() {
        return this.stream.unwrap();
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    /* renamed from: drainHandler */
    public /* bridge */ /* synthetic */ WriteStream mo39drainHandler(Handler handler) {
        return drainHandler((Handler<Void>) handler);
    }

    public /* bridge */ /* synthetic */ void write(Object obj, Handler handler) {
        write((KafkaProducerRecord) obj, (Handler<AsyncResult<Void>>) handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ WriteStream mo41exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    @Override // io.vertx.kafka.client.producer.KafkaProducer
    /* renamed from: exceptionHandler */
    public /* bridge */ /* synthetic */ StreamBase mo42exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
