package io.vertx.rabbitmq.impl;

import com.rabbitmq.client.BasicProperties;
import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.streams.ReadStream;
import io.vertx.core.streams.StreamBase;
import io.vertx.core.streams.impl.InboundBuffer;
import io.vertx.rabbitmq.RabbitMQClient;
import io.vertx.rabbitmq.RabbitMQPublisher;
import io.vertx.rabbitmq.RabbitMQPublisherConfirmation;
import io.vertx.rabbitmq.RabbitMQPublisherOptions;
import java.util.ArrayDeque;
import java.util.Deque;

/* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQPublisherImpl.class */
public class RabbitMQPublisherImpl implements RabbitMQPublisher, ReadStream<RabbitMQPublisherConfirmation> {
    private static final Logger log = LoggerFactory.getLogger(RabbitMQPublisherImpl.class);
    private final RabbitMQClient client;
    private final InboundBuffer<RabbitMQPublisherConfirmation> confirmations;
    private final Context context;
    private final RabbitMQPublisherOptions options;
    private final InboundBuffer<MessageDetails> sendQueue;
    private final Deque<MessageDetails> pendingAcks = new ArrayDeque();
    private long lastChannelInstance = 0;
    private volatile boolean stopped = false;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/vertx/rabbitmq/impl/RabbitMQPublisherImpl$MessageDetails.class */
    public static class MessageDetails {
        private final String exchange;
        private final String routingKey;
        private final BasicProperties properties;
        private final Buffer message;
        private final Handler<AsyncResult<Void>> publishHandler;
        private final Handler<AsyncResult<Long>> confirmHandler;
        private volatile long deliveryTag;

        MessageDetails(String str, String str2, BasicProperties basicProperties, Buffer buffer, Handler<AsyncResult<Void>> handler, Handler<AsyncResult<Long>> handler2) {
            this.exchange = str;
            this.routingKey = str2;
            this.properties = basicProperties;
            this.message = buffer;
            this.publishHandler = handler;
            this.confirmHandler = handler2;
        }

        public void setDeliveryTag(long j) {
            this.deliveryTag = j;
        }
    }

    public RabbitMQPublisherImpl(Vertx vertx, RabbitMQClient rabbitMQClient, RabbitMQPublisherOptions rabbitMQPublisherOptions) {
        this.client = rabbitMQClient;
        this.context = vertx.getOrCreateContext();
        this.confirmations = new InboundBuffer<>(this.context);
        this.sendQueue = new InboundBuffer<>(this.context);
        this.sendQueue.handler(messageDetails -> {
            handleMessageSend(messageDetails);
        });
        this.options = rabbitMQPublisherOptions;
        this.client.addConnectionEstablishedCallback(promise -> {
            addConfirmListener(rabbitMQClient, rabbitMQPublisherOptions, promise);
            if (rabbitMQClient instanceof RabbitMQClientImpl) {
                if (this.lastChannelInstance == 0) {
                    this.lastChannelInstance = ((RabbitMQClientImpl) rabbitMQClient).getChannelInstance();
                } else if (this.lastChannelInstance != ((RabbitMQClientImpl) rabbitMQClient).getChannelInstance()) {
                    this.pendingAcks.clear();
                    this.lastChannelInstance = ((RabbitMQClientImpl) rabbitMQClient).getChannelInstance();
                }
            }
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public void start(Handler<AsyncResult<Void>> handler) {
        startForPromise().future().onComplete(handler);
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public Future<Void> start() {
        return startForPromise().future();
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public void stop(Handler<AsyncResult<Void>> handler) {
        this.stopped = true;
        this.sendQueue.pause();
        if (this.sendQueue.isEmpty()) {
            handler.handle(Future.succeededFuture());
        } else {
            this.sendQueue.emptyHandler(r4 -> {
                handler.handle(Future.succeededFuture());
            });
        }
        this.sendQueue.resume();
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public Future<Void> stop() {
        Promise promise = Promise.promise();
        stop(promise);
        return promise.future();
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public void restart() {
        this.stopped = false;
        this.sendQueue.pause();
        this.sendQueue.emptyHandler((Handler) null);
        this.sendQueue.resume();
    }

    private Promise<Void> startForPromise() {
        Promise<Void> promise = Promise.promise();
        addConfirmListener(this.client, this.options, promise);
        return promise;
    }

    protected final void addConfirmListener(RabbitMQClient rabbitMQClient, RabbitMQPublisherOptions rabbitMQPublisherOptions, Promise<Void> promise) {
        this.context.runOnContext(r8 -> {
            rabbitMQClient.addConfirmListener(rabbitMQPublisherOptions.getMaxInternalQueueSize()).onComplete(asyncResult -> {
                if (asyncResult.succeeded()) {
                    ((ReadStream) asyncResult.result()).handler(rabbitMQConfirmation -> {
                        handleConfirmation(rabbitMQConfirmation);
                    });
                    promise.complete();
                } else {
                    log.error("Failed to add confirmListener: ", asyncResult.cause());
                    promise.fail(asyncResult.cause());
                }
            });
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public ReadStream<RabbitMQPublisherConfirmation> getConfirmationStream() {
        return this;
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public int queueSize() {
        return this.sendQueue.size();
    }

    private void handleMessageSend(MessageDetails messageDetails) {
        this.sendQueue.pause();
        synchronized (this.pendingAcks) {
            this.pendingAcks.add(messageDetails);
        }
        doSend(messageDetails);
    }

    private void doSend(MessageDetails messageDetails) {
        try {
            this.client.basicPublishWithDeliveryTag(messageDetails.exchange, messageDetails.routingKey, messageDetails.properties, messageDetails.message, l -> {
                messageDetails.setDeliveryTag(l.longValue());
            }, asyncResult -> {
                if (asyncResult.succeeded()) {
                    if (messageDetails.publishHandler != null) {
                        try {
                            messageDetails.publishHandler.handle(asyncResult);
                        } catch (Throwable th) {
                            log.warn("Failed to handle publish result", th);
                        }
                    }
                    this.sendQueue.resume();
                    return;
                }
                log.info("Failed to publish message: " + asyncResult.cause().toString());
                synchronized (this.pendingAcks) {
                    this.pendingAcks.remove(messageDetails);
                }
                this.client.restartConnect(0, asyncResult -> {
                    doSend(messageDetails);
                });
            });
        } catch (Throwable th) {
            synchronized (this.pendingAcks) {
                this.pendingAcks.remove(messageDetails);
                this.client.restartConnect(0, asyncResult2 -> {
                    doSend(messageDetails);
                });
            }
        }
    }

    /* JADX WARN: Code restructure failed: missing block: B:47:0x00e3, code lost:
    
        if (r0.properties != null) goto L34;
     */
    /* JADX WARN: Code restructure failed: missing block: B:48:0x00e6, code lost:
    
        r0 = null;
     */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x00f4, code lost:
    
        r8.confirmations.write(new io.vertx.rabbitmq.RabbitMQPublisherConfirmation(r0, r9.getDeliveryTag(), r9.isSucceeded()));
     */
    /* JADX WARN: Code restructure failed: missing block: B:50:0x0114, code lost:
    
        if (r0.confirmHandler == null) goto L44;
     */
    /* JADX WARN: Code restructure failed: missing block: B:53:0x0117, code lost:
    
        r0 = r0.confirmHandler;
     */
    /* JADX WARN: Code restructure failed: missing block: B:54:0x0120, code lost:
    
        if (r9.isSucceeded() == false) goto L40;
     */
    /* JADX WARN: Code restructure failed: missing block: B:55:0x0123, code lost:
    
        r1 = io.vertx.core.Future.succeededFuture(java.lang.Long.valueOf(r0.deliveryTag));
     */
    /* JADX WARN: Code restructure failed: missing block: B:56:0x0136, code lost:
    
        r0.handle(r1);
     */
    /* JADX WARN: Code restructure failed: missing block: B:58:0x0131, code lost:
    
        r1 = io.vertx.core.Future.failedFuture("Message publish nacked by the broker");
     */
    /* JADX WARN: Code restructure failed: missing block: B:59:0x013e, code lost:
    
        r14 = move-exception;
     */
    /* JADX WARN: Code restructure failed: missing block: B:60:0x0140, code lost:
    
        io.vertx.rabbitmq.impl.RabbitMQPublisherImpl.log.warn("Failed to handle publish confirm", r14);
     */
    /* JADX WARN: Code restructure failed: missing block: B:61:0x00ea, code lost:
    
        r0 = r0.properties.getMessageId();
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private void handleConfirmation(io.vertx.rabbitmq.RabbitMQConfirmation r9) {
        /*
            Method dump skipped, instructions count: 357
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: io.vertx.rabbitmq.impl.RabbitMQPublisherImpl.handleConfirmation(io.vertx.rabbitmq.RabbitMQConfirmation):void");
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public void publish(String str, String str2, BasicProperties basicProperties, Buffer buffer, Handler<AsyncResult<Void>> handler) {
        if (this.stopped) {
            return;
        }
        this.context.runOnContext(r16 -> {
            this.sendQueue.write(new MessageDetails(str, str2, basicProperties, buffer, handler, asyncResult -> {
            }));
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public Future<Void> publish(String str, String str2, BasicProperties basicProperties, Buffer buffer) {
        Promise promise = Promise.promise();
        if (!this.stopped) {
            this.context.runOnContext(r16 -> {
                this.sendQueue.write(new MessageDetails(str, str2, basicProperties, buffer, promise, null));
            });
        }
        return promise.future();
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public void publishConfirm(String str, String str2, BasicProperties basicProperties, Buffer buffer, Handler<AsyncResult<Long>> handler) {
        if (this.stopped) {
            return;
        }
        this.context.runOnContext(r16 -> {
            this.sendQueue.write(new MessageDetails(str, str2, basicProperties, buffer, null, handler));
        });
    }

    @Override // io.vertx.rabbitmq.RabbitMQPublisher
    public Future<Long> publishConfirm(String str, String str2, BasicProperties basicProperties, Buffer buffer) {
        return Future.future(promise -> {
            publishConfirm(str, str2, basicProperties, buffer, promise);
        });
    }

    public RabbitMQPublisherImpl exceptionHandler(Handler<Throwable> handler) {
        this.confirmations.exceptionHandler(handler);
        return this;
    }

    public RabbitMQPublisherImpl handler(Handler<RabbitMQPublisherConfirmation> handler) {
        this.confirmations.handler(handler);
        return this;
    }

    /* renamed from: pause, reason: merged with bridge method [inline-methods] */
    public RabbitMQPublisherImpl m55pause() {
        this.confirmations.pause();
        return this;
    }

    /* renamed from: resume, reason: merged with bridge method [inline-methods] */
    public RabbitMQPublisherImpl m54resume() {
        this.confirmations.resume();
        return this;
    }

    /* renamed from: fetch, reason: merged with bridge method [inline-methods] */
    public RabbitMQPublisherImpl m53fetch(long j) {
        this.confirmations.fetch(j);
        return this;
    }

    public RabbitMQPublisherImpl endHandler(Handler<Void> handler) {
        return this;
    }

    /* renamed from: endHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m52endHandler(Handler handler) {
        return endHandler((Handler<Void>) handler);
    }

    /* renamed from: handler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m56handler(Handler handler) {
        return handler((Handler<RabbitMQPublisherConfirmation>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ ReadStream m57exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }

    /* renamed from: exceptionHandler, reason: collision with other method in class */
    public /* bridge */ /* synthetic */ StreamBase m58exceptionHandler(Handler handler) {
        return exceptionHandler((Handler<Throwable>) handler);
    }
}
