/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.util.concurrent.FutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.neo4j.causalclustering.messaging.monitoring.MessageQueueMonitor;
import org.neo4j.logging.Log;

class NonBlockingChannel {
    private static final int CONNECT_BACKOFF_IN_MS = 250;
    private static final int RETRY_DELAY_MS = 100;
    private final Thread messageSendingThread;
    private final Log log;
    private Channel nettyChannel;
    private Bootstrap bootstrap;
    private InetSocketAddress destination;
    private Queue<Object> messageQueue = new ConcurrentLinkedQueue<Object>();
    private volatile boolean stillRunning = true;
    private final MessageQueueMonitor monitor;
    private final int maxQueueSize;
    private FutureListener<Void> errorListener;

    NonBlockingChannel(Bootstrap bootstrap, InetSocketAddress destination, Log log, MessageQueueMonitor monitor, int maxQueueSize) {
        this.bootstrap = bootstrap;
        this.destination = destination;
        this.monitor = monitor;
        this.maxQueueSize = maxQueueSize;
        this.log = log;
        this.errorListener = future -> {
            if (!future.isSuccess()) {
                log.error("Failed to send message to " + destination, future.cause());
            }
        };
        this.messageSendingThread = new Thread(this::messageSendingThreadWork);
        this.messageSendingThread.start();
    }

    private void messageSendingThreadWork() {
        while (this.stillRunning) {
            block5: {
                try {
                    this.ensureConnected();
                    if (this.sendMessages()) {
                        this.nettyChannel.flush();
                    }
                }
                catch (IOException e) {
                    if (this.nettyChannel == null) break block5;
                    this.log.warn("Got exception for: " + this.nettyChannel + ". Will reconnect.", (Throwable)e);
                    this.nettyChannel.close();
                    this.nettyChannel = null;
                }
            }
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(100L));
        }
        if (this.nettyChannel != null) {
            this.nettyChannel.close();
            this.messageQueue.clear();
            this.monitor.queueSize(this.destination, this.messageQueue.size());
        }
    }

    public void dispose() {
        this.stillRunning = false;
        while (this.messageSendingThread.isAlive()) {
            this.messageSendingThread.interrupt();
            try {
                this.messageSendingThread.join(100L);
            }
            catch (InterruptedException interruptedException) {}
        }
    }

    public void send(Object msg) {
        if (!this.stillRunning) {
            throw new IllegalStateException("sending on disposed channel");
        }
        if (this.messageQueue.size() < this.maxQueueSize) {
            this.messageQueue.offer(msg);
            LockSupport.unpark(this.messageSendingThread);
            this.monitor.queueSize(this.destination, this.messageQueue.size());
        } else {
            this.monitor.droppedMessage(this.destination);
        }
    }

    private boolean sendMessages() throws IOException {
        Object message;
        if (this.nettyChannel == null) {
            return false;
        }
        boolean sentSomething = false;
        while ((message = this.messageQueue.peek()) != null) {
            ChannelFuture write = this.nettyChannel.write(message);
            write.addListener(this.errorListener);
            this.messageQueue.poll();
            this.monitor.queueSize(this.destination, this.messageQueue.size());
            sentSomething = true;
        }
        return sentSomething;
    }

    private void ensureConnected() throws IOException {
        if (this.nettyChannel != null && !this.nettyChannel.isOpen()) {
            this.nettyChannel = null;
        }
        while (this.nettyChannel == null && this.stillRunning) {
            ChannelFuture channelFuture = this.bootstrap.connect((SocketAddress)this.destination);
            Channel channel = channelFuture.awaitUninterruptibly().channel();
            if (channelFuture.isSuccess()) {
                channel.flush();
                this.nettyChannel = channel;
                this.log.info("Connected: " + this.nettyChannel);
                continue;
            }
            channel.close();
            LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(250L));
        }
    }
}

