/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.runtime;

import io.netty.channel.Channel;
import java.net.SocketAddress;
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.BoltServer;
import org.neo4j.bolt.messaging.BoltResponseMessageWriter;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltConnectionAuthFatality;
import org.neo4j.bolt.runtime.BoltConnectionMetricsMonitor;
import org.neo4j.bolt.runtime.BoltProtocolBreachFatality;
import org.neo4j.bolt.runtime.Job;
import org.neo4j.bolt.runtime.Neo4jError;
import org.neo4j.bolt.runtime.scheduling.BoltConnectionLifetimeListener;
import org.neo4j.bolt.runtime.scheduling.BoltConnectionQueueMonitor;
import org.neo4j.bolt.runtime.statemachine.BoltStateMachine;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.logging.Log;
import org.neo4j.logging.internal.LogService;
import org.neo4j.util.FeatureToggles;

/**
 * Default {@link BoltConnection} implementation.
 *
 * <p>Incoming {@link Job}s are placed on an internal queue and later executed in batches
 * against the connection's {@link BoltStateMachine}. Lifetime and queue events are reported
 * to the optional {@link BoltConnectionLifetimeListener} and {@link BoltConnectionQueueMonitor},
 * and activity is reported to the {@link BoltConnectionMetricsMonitor}.
 */
public class DefaultBoltConnection
implements BoltConnection {
    /** Batch size read from the {@code max_batch_size} feature toggle; falls back to 100. */
    static final int DEFAULT_MAX_BATCH_SIZE = FeatureToggles.getInteger(BoltServer.class, "max_batch_size", 100);
    private final String id;
    private final BoltChannel channel;
    private final BoltStateMachine machine;
    /** May be {@code null}; every notification is null-guarded. */
    private final BoltConnectionLifetimeListener listener;
    /** May be {@code null}; every notification is null-guarded. */
    private final BoltConnectionQueueMonitor queueMonitor;
    private final Log log;
    private final Log userLog;
    private final int maxBatchSize;
    /** Scratch list the queue is drained into; emptied again as jobs are performed. */
    private final List<Job> batch;
    private final LinkedBlockingQueue<Job> queue = new LinkedBlockingQueue<>();
    /** Set once the connection has been asked to close; checked between processing steps. */
    private final AtomicBoolean shouldClose = new AtomicBoolean();
    /** Set once {@link #close()} has actually run, so resources are released only once. */
    private final AtomicBoolean closed = new AtomicBoolean();
    /** {@code false} while a batch is being processed, {@code true} otherwise. */
    private final AtomicBoolean idle = new AtomicBoolean(true);
    private final BoltConnectionMetricsMonitor metricsMonitor;
    private final Clock clock;
    private final BoltResponseMessageWriter messageWriter;

    /**
     * Creates a connection bound to the given channel and state machine.
     *
     * @param channel channel supplying the connection id and addresses
     * @param messageWriter writer that is flushed after processing and closed with the connection
     * @param machine state machine every job is performed against
     * @param logService source of the internal and user logs
     * @param listener lifetime listener, or {@code null} for none
     * @param queueMonitor queue monitor, or {@code null} for none
     * @param maxBatchSize maximum number of jobs drained from the queue per batch
     * @param metricsMonitor monitor receiving connection and message metrics
     * @param clock clock used to measure queue and processing times
     */
    DefaultBoltConnection(BoltChannel channel, BoltResponseMessageWriter messageWriter, BoltStateMachine machine, LogService logService, BoltConnectionLifetimeListener listener, BoltConnectionQueueMonitor queueMonitor, int maxBatchSize, BoltConnectionMetricsMonitor metricsMonitor, Clock clock) {
        this.id = channel.id();
        this.channel = channel;
        this.machine = machine;
        this.listener = listener;
        this.queueMonitor = queueMonitor;
        this.log = logService.getInternalLog(this.getClass());
        this.userLog = logService.getUserLog(this.getClass());
        this.maxBatchSize = maxBatchSize;
        this.batch = new ArrayList<>(maxBatchSize);
        this.metricsMonitor = metricsMonitor;
        this.clock = clock;
        this.messageWriter = messageWriter;
    }

    /** Returns the id taken from the underlying channel at construction time. */
    @Override
    public String id() {
        return this.id;
    }

    /** Returns {@code true} when no batch is being processed and the job queue is empty. */
    @Override
    public boolean idle() {
        return this.idle.get() && this.queue.isEmpty();
    }

    /** Returns the server address reported by the underlying channel. */
    @Override
    public SocketAddress localAddress() {
        return this.channel.serverAddress();
    }

    /** Returns the client address reported by the underlying channel. */
    @Override
    public SocketAddress remoteAddress() {
        return this.channel.clientAddress();
    }

    /** Returns the raw Netty channel behind this connection. */
    @Override
    public Channel channel() {
        return this.channel.rawChannel();
    }

    /** Returns {@code true} when at least one job is waiting in the queue. */
    @Override
    public boolean hasPendingJobs() {
        return !this.queue.isEmpty();
    }

    /** Notifies the lifetime listener of creation and records the connection as opened. */
    @Override
    public void start() {
        this.notifyCreated();
        this.metricsMonitor.connectionOpened();
    }

    /**
     * Queues a job for later execution, wrapping it so that queue time and processing time
     * are reported to the metrics monitor.
     *
     * @param job job to perform against the state machine
     */
    @Override
    public void enqueue(Job job) {
        this.metricsMonitor.messageReceived();
        long queuedAt = this.clock.millis();
        this.enqueueInternal(stateMachine -> {
            long queueTime = this.clock.millis() - queuedAt;
            this.metricsMonitor.messageProcessingStarted(queueTime);
            try {
                job.perform(stateMachine);
                // Total elapsed time minus the time spent waiting in the queue.
                this.metricsMonitor.messageProcessingCompleted(this.clock.millis() - queuedAt - queueTime);
            }
            catch (Throwable t) {
                this.metricsMonitor.messageProcessingFailed();
                throw t;
            }
        });
    }

    /**
     * Processes up to {@code maxBatchSize} queued jobs.
     *
     * @return {@code true} if the connection is still open afterwards, {@code false} if it was closed
     */
    @Override
    public boolean processNextBatch() {
        return this.processNextBatch(this.maxBatchSize, false);
    }

    /**
     * Runs one round of batch processing while keeping the idle flag and the
     * activated/waiting/closed metrics up to date.
     *
     * @param batchCount maximum number of jobs to drain from the queue at once
     * @param exitIfNoJobsAvailable when {@code true}, do not block waiting for a job if the drain came back empty
     * @return {@code true} if the connection is still open afterwards
     */
    private boolean processNextBatch(int batchCount, boolean exitIfNoJobsAvailable) {
        this.idle.set(false);
        this.metricsMonitor.connectionActivated();
        try {
            boolean continueProcessing = this.processNextBatchInternal(batchCount, exitIfNoJobsAvailable);
            if (!continueProcessing) {
                this.metricsMonitor.connectionClosed();
            }
            return continueProcessing;
        }
        finally {
            this.idle.set(true);
            this.metricsMonitor.connectionWaiting();
        }
    }

    /**
     * Drains and performs queued jobs, then flushes the message writer once the queue is empty.
     *
     * <p>Processing repeats only while the state machine reports that it should stick on the
     * current thread; otherwise the method returns after a single pass. Any failure marks the
     * connection for closing, and a connection marked for closing is closed before returning.
     *
     * @param batchCount maximum number of jobs to drain from the queue at once
     * @param exitIfNoJobsAvailable when {@code true}, do not block waiting for a job if the drain came back empty
     * @return {@code true} if the connection has not been closed
     */
    private boolean processNextBatchInternal(int batchCount, boolean exitIfNoJobsAvailable) {
        try {
            // True once the state machine asked to stay on this thread; in that case we also
            // wait for the next message instead of returning when the queue is empty.
            boolean stickToThread = false;
            while (!this.willClose()) {
                if (stickToThread || !this.queue.isEmpty()) {
                    this.queue.drainTo(this.batch, batchCount);
                    if (this.batch.isEmpty() && !exitIfNoJobsAvailable) {
                        // Nothing was drained: block for the next job, validating the
                        // transaction after every poll that times out.
                        while (!this.willClose()) {
                            Job nextJob = this.queue.poll(10L, TimeUnit.SECONDS);
                            if (nextJob != null) {
                                this.batch.add(nextJob);
                                break;
                            }
                            this.machine.validateTransaction();
                        }
                    }
                    this.notifyDrained(this.batch);
                    while (!this.batch.isEmpty()) {
                        Job current = this.batch.remove(0);
                        current.perform(this.machine);
                    }
                    stickToThread = this.machine.shouldStickOnThread();
                }
                if (this.queue.isEmpty()) {
                    this.messageWriter.flush();
                }
                // Leave after this pass unless the state machine asks to stay on this thread.
                // Without this exit the loop would spin until the connection is closed.
                if (!stickToThread) {
                    break;
                }
            }
            assert (this.willClose() || !this.machine.hasOpenStatement());
        }
        catch (BoltConnectionAuthFatality ex) {
            this.shouldClose.set(true);
            if (ex.isLoggable()) {
                this.userLog.warn(ex.getMessage());
            }
        }
        catch (BoltProtocolBreachFatality ex) {
            this.shouldClose.set(true);
            this.log.error(String.format("Protocol breach detected in bolt session '%s'.", this.id()), ex);
        }
        catch (InterruptedException ex) {
            this.shouldClose.set(true);
            this.log.info("Bolt session '%s' is interrupted probably due to server shutdown.", this.id());
        }
        catch (Throwable t) {
            this.shouldClose.set(true);
            this.userLog.error(String.format("Unexpected error detected in bolt session '%s'.", this.id()), t);
        }
        finally {
            if (this.willClose()) {
                this.close();
            }
        }
        return !this.closed.get();
    }

    /**
     * Handles a failure to schedule this connection for execution.
     *
     * <p>Unless the connection is already marked for closing, the failure is logged and the
     * state machine is marked as failed. A {@link RejectedExecutionException} anywhere in the
     * cause chain is reported as {@code NoThreadsAvailable}; anything else as a fatal error.
     * One batch of at most one job is then processed without waiting for new jobs, and the
     * connection is closed.
     *
     * @param t the scheduling failure
     */
    @Override
    public void handleSchedulingError(Throwable t) {
        if (!this.willClose()) {
            String message;
            Neo4jError error;
            if (ExceptionUtils.hasCause(t, RejectedExecutionException.class)) {
                error = Neo4jError.from(Status.Request.NoThreadsAvailable, Status.Request.NoThreadsAvailable.code().description());
                message = String.format("Unable to schedule bolt session '%s' for execution since there are no available threads to serve it at the moment. You can retry at a later time or consider increasing max thread pool size for bolt connector(s).", this.id());
            } else {
                error = Neo4jError.fatalFrom(t);
                message = String.format("Unexpected error during scheduling of bolt session '%s'.", this.id());
            }
            this.log.error(message, t);
            this.userLog.error(message);
            this.machine.markFailed(error);
        }
        this.processNextBatch(1, true);
        this.close();
    }

    /** Forwards an interrupt request to the state machine. */
    @Override
    public void interrupt() {
        this.machine.interrupt();
    }

    /**
     * Marks the connection for closing. Only the first call has an effect: it marks the state
     * machine for termination and enqueues a no-op job.
     */
    @Override
    public void stop() {
        if (this.shouldClose.compareAndSet(false, true)) {
            this.machine.markForTermination();
            // NOTE(review): the empty job presumably wakes a thread blocked in queue.poll — confirm.
            this.enqueueInternal(ignore -> {});
        }
    }

    /** Asks the message writer to perform a keep-alive; on failure marks the connection for closing. */
    @Override
    public void keepAlive() {
        try {
            this.messageWriter.keepAlive();
        }
        catch (Throwable e) {
            this.log.error("Failed to perform keep alive check.", e);
            this.shouldClose.set(true);
        }
    }

    /** Delegates keep-alive timer initialisation to the message writer. */
    @Override
    public void initKeepAliveTimer() {
        this.messageWriter.initKeepAliveTimer();
    }

    /** Returns {@code true} once the connection has been marked for closing. */
    private boolean willClose() {
        return this.shouldClose.get();
    }

    /**
     * Closes the message writer and the state machine, at most once. A failure in either is
     * logged rather than propagated, and the lifetime listener is notified regardless of
     * whether closing the state machine succeeded.
     */
    private void close() {
        if (this.closed.compareAndSet(false, true)) {
            try {
                this.messageWriter.close();
            }
            catch (Throwable t) {
                this.log.error(String.format("Unable to close pack output of bolt session '%s'.", this.id()), t);
            }
            try {
                this.machine.close();
            }
            catch (Throwable t) {
                this.log.error(String.format("Unable to close state machine of bolt session '%s'.", this.id()), t);
            }
            finally {
                this.notifyDestroyed();
            }
        }
    }

    /** Adds the job to the queue and reports it to the queue monitor. */
    private void enqueueInternal(Job job) {
        this.queue.offer(job);
        this.notifyEnqueued(job);
    }

    /** Tells the lifetime listener, if any, that this connection was created. */
    private void notifyCreated() {
        if (this.listener != null) {
            this.listener.created(this);
        }
    }

    /** Tells the lifetime listener, if any, that this connection was closed. */
    private void notifyDestroyed() {
        if (this.listener != null) {
            this.listener.closed(this);
        }
    }

    /** Tells the queue monitor, if any, that a job was enqueued. */
    private void notifyEnqueued(Job job) {
        if (this.queueMonitor != null) {
            this.queueMonitor.enqueued(this, job);
        }
    }

    /** Tells the queue monitor, if any, which jobs were drained; empty drains are not reported. */
    private void notifyDrained(List<Job> jobs) {
        if (this.queueMonitor != null && !jobs.isEmpty()) {
            this.queueMonitor.drained(this, jobs);
        }
    }
}

