/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.protocol.common.connector.connection;

import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoop;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.neo4j.bolt.fsm.StateMachine;
import org.neo4j.bolt.negotiation.message.ProtocolCapability;
import org.neo4j.bolt.protocol.common.BoltProtocol;
import org.neo4j.bolt.protocol.common.connector.Connector;
import org.neo4j.bolt.protocol.common.connector.connection.Connection;
import org.neo4j.bolt.protocol.common.connector.connection.ConnectionHandle;
import org.neo4j.bolt.protocol.common.connector.connection.Feature;
import org.neo4j.bolt.protocol.common.connector.connection.authentication.AuthenticationFlag;
import org.neo4j.bolt.protocol.common.connector.connection.listener.ConnectionListener;
import org.neo4j.bolt.protocol.common.fsm.response.NetworkResponseHandler;
import org.neo4j.bolt.protocol.common.fsm.response.ResponseHandler;
import org.neo4j.bolt.protocol.common.message.notifications.NotificationsConfig;
import org.neo4j.bolt.protocol.common.message.request.connection.RoutingContext;
import org.neo4j.bolt.protocol.io.pipeline.PipelineContext;
import org.neo4j.bolt.protocol.io.pipeline.WriterPipeline;
import org.neo4j.bolt.security.AuthenticationResult;
import org.neo4j.bolt.security.error.AuthenticationException;
import org.neo4j.dbms.admissioncontrol.AdmissionControlService;
import org.neo4j.internal.kernel.api.connectioninfo.ClientConnectionInfo;
import org.neo4j.internal.kernel.api.security.LoginContext;
import org.neo4j.kernel.impl.query.clientconnection.BoltConnectionInfo;
import org.neo4j.logging.InternalLog;
import org.neo4j.logging.Log;
import org.neo4j.logging.internal.LogService;
import org.neo4j.memory.MemoryTracker;
import org.neo4j.packstream.io.PackstreamBuf;
import org.neo4j.packstream.io.value.PackstreamValueReader;
import org.neo4j.packstream.struct.StructRegistry;
import org.neo4j.values.storable.Value;

public abstract class AbstractConnection
implements ConnectionHandle {
    private final Connector connector;
    protected final String id;
    protected final Channel channel;
    private final long connectedAt;
    protected final MemoryTracker memoryTracker;
    protected final LogService logService;
    protected final InternalLog log;
    protected final Log userLog;
    private final Lock listenerLock = new ReentrantLock();
    private final List<ConnectionListener> listeners = new CopyOnWriteArrayList<ConnectionListener>();
    protected final AtomicReference<BoltProtocol> protocol = new AtomicReference();
    protected final AtomicReference<Set<ProtocolCapability>> selectedCapabilities = new AtomicReference<EnumSet<ProtocolCapability>>(EnumSet.noneOf(ProtocolCapability.class));
    private final AtomicReference<Set<Feature>> features = new AtomicReference<Object>(null);
    protected final AdmissionControlService admissionControl;
    protected volatile StateMachine fsm;
    protected volatile WriterPipeline writerPipeline;
    protected final AtomicReference<StructRegistry<Connection, Value>> structRegistry = new AtomicReference();
    protected volatile ResponseHandler responseHandler;
    private final AtomicReference<LoginContext> loginContext = new AtomicReference();
    private volatile LoginContext impersonationContext;
    private volatile RoutingContext routingContext;
    private volatile BoltConnectionInfo connectionInfo;
    private volatile String username;
    private volatile String userAgent;
    private volatile Map<String, String> boltAgent;
    private String defaultDatabase;
    private String impersonatedDefaultDatabase;
    protected NotificationsConfig notificationsConfig;

    public AbstractConnection(Connector connector, String id, Channel channel, long connectedAt, MemoryTracker memoryTracker, LogService logService, AdmissionControlService admissionControl) {
        this.connector = connector;
        this.id = id;
        this.channel = channel;
        this.connectedAt = connectedAt;
        this.memoryTracker = memoryTracker;
        this.admissionControl = admissionControl;
        this.logService = logService;
        this.log = logService.getInternalLog(this.getClass());
        this.userLog = logService.getUserLog(this.getClass());
    }

    @Override
    public Connector connector() {
        return this.connector;
    }

    @Override
    public String connectorId() {
        return this.connector.id();
    }

    @Override
    public ByteBufAllocator allocator() {
        return this.channel.alloc();
    }

    @Override
    public void modifyPipeline(BiConsumer<Channel, ChannelPipeline> modifier) {
        Channel ch = this.channel;
        EventLoop eventLoop = ch.eventLoop();
        if (eventLoop.inEventLoop()) {
            modifier.accept(ch, ch.pipeline());
            return;
        }
        ch.eventLoop().execute(() -> modifier.accept(ch, ch.pipeline()));
    }

    @Override
    public ChannelFuture write(Object msg) {
        return this.channel.write(msg);
    }

    @Override
    public ChannelFuture write(Object msg, ChannelPromise promise) {
        return this.channel.write(msg, promise);
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg) {
        return this.channel.writeAndFlush(msg);
    }

    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        return this.channel.writeAndFlush(msg, promise);
    }

    @Override
    public void flush() {
        this.channel.flush();
    }

    public String id() {
        return this.id;
    }

    @Override
    public ClientConnectionInfo info() {
        BoltConnectionInfo connectionInfo = this.connectionInfo;
        if (connectionInfo == null) {
            throw new IllegalStateException("Connection " + this.id + " has yet to be authenticated");
        }
        return connectionInfo;
    }

    public long connectTime() {
        return this.connectedAt;
    }

    @Override
    public MemoryTracker memoryTracker() {
        return this.memoryTracker;
    }

    @Override
    public void registerListener(ConnectionListener listener) {
        this.listenerLock.lock();
        try {
            if (this.listeners.contains(listener)) {
                return;
            }
            this.listeners.add(listener);
            listener.onListenerAdded();
        }
        finally {
            this.listenerLock.unlock();
        }
    }

    @Override
    public void removeListener(ConnectionListener listener) {
        this.listenerLock.lock();
        try {
            this.listeners.remove(listener);
            listener.onListenerRemoved();
        }
        finally {
            this.listenerLock.unlock();
        }
    }

    @Override
    public void notifyListeners(Consumer<ConnectionListener> consumer) {
        this.listeners.forEach(consumer);
    }

    @Override
    public void notifyListenersSafely(String eventName, Consumer<ConnectionListener> notifierFunction) {
        this.listeners.forEach(listener -> {
            try {
                notifierFunction.accept((ConnectionListener)listener);
            }
            catch (Throwable ex) {
                this.log.error("[" + this.id + "] Failed to publish " + eventName + " event to listener " + listener.getClass().getSimpleName(), ex);
            }
        });
    }

    @Override
    public BoltProtocol protocol() {
        return this.protocol.get();
    }

    @Override
    public Set<ProtocolCapability> selectedCapabilities() {
        return Collections.unmodifiableSet(this.selectedCapabilities.get());
    }

    @Override
    public boolean hasSelectedCapability(ProtocolCapability capability) {
        return this.selectedCapabilities.get().contains((Object)capability);
    }

    @Override
    public void selectProtocol(BoltProtocol protocol, Set<ProtocolCapability> capabilities) {
        StateMachine fsm;
        Objects.requireNonNull(protocol, "protocol");
        if (!this.protocol.compareAndSet(null, protocol)) {
            throw new IllegalStateException("Protocol has already been selected for connection " + this.id);
        }
        this.selectedCapabilities.set(capabilities);
        WriterPipeline pipeline = new WriterPipeline(this);
        StructRegistry.Builder<Connection, Value> structRegistry = StructRegistry.builder();
        protocol.registerStructWriters(pipeline);
        protocol.registerStructReaders(structRegistry);
        this.writerPipeline = pipeline;
        this.structRegistry.set(structRegistry.build());
        this.responseHandler = new NetworkResponseHandler(this, this.protocol().metadataHandler(), this.connector.configuration().streamingBufferSize(), this.connector.configuration().streamingFlushThreshold(), this.logService);
        this.features.set(Collections.unmodifiableSet(protocol.features()));
        this.fsm = fsm = protocol.stateMachine().createInstance(this, this.logService, this.admissionControl);
        protocol.onConnectionNegotiated(this);
        this.notifyListeners(listener -> listener.onStateMachineInitialized(fsm));
    }

    private boolean enableFeature(Feature feature) {
        WriterPipeline pipeline;
        StructRegistry<Connection, Value> oldStructRegistry;
        Set<Feature> oldFeatures;
        if (this.protocol.get() == null) {
            throw new IllegalStateException("Connection has yet to select a protocol version");
        }
        HashSet<Feature> newFeatures = null;
        boolean enabled = false;
        do {
            if ((oldFeatures = this.features.get()) == null) continue;
            newFeatures = new HashSet<Feature>(oldFeatures);
            enabled = newFeatures.add(feature);
        } while (oldFeatures == null || !this.features.compareAndSet(oldFeatures, Collections.unmodifiableSet(newFeatures)));
        if (!enabled) {
            return false;
        }
        StructRegistry<Connection, Value> decoratedStructRegistry = null;
        do {
            if ((oldStructRegistry = this.structRegistry.get()) == null) continue;
            decoratedStructRegistry = feature.decorateStructRegistry(oldStructRegistry);
        } while (oldStructRegistry == null || !this.structRegistry.compareAndSet(oldStructRegistry, decoratedStructRegistry));
        while ((pipeline = this.writerPipeline) == null) {
        }
        feature.configureWriterPipeline(pipeline);
        return true;
    }

    @Override
    public List<Feature> negotiate(List<Feature> features, String userAgent, RoutingContext routingContext, NotificationsConfig notificationsConfig, Map<String, String> boltAgent) {
        this.userAgent = userAgent;
        this.routingContext = routingContext;
        this.notificationsConfig = notificationsConfig;
        this.boltAgent = boltAgent;
        return features.stream().filter(this::enableFeature).toList();
    }

    @Override
    public PipelineContext writerContext(PackstreamBuf buf) {
        WriterPipeline pipeline = this.writerPipeline;
        if (pipeline == null) {
            throw new IllegalStateException("Connection has yet to select a protocol version");
        }
        return pipeline.forBuffer(buf);
    }

    @Override
    public PackstreamValueReader<Connection> valueReader(PackstreamBuf buf) {
        StructRegistry<Connection, Value> structRegistry = this.structRegistry.get();
        if (structRegistry == null) {
            throw new IllegalStateException("Connection has yet to select a protocol version");
        }
        return new PackstreamValueReader<Connection>(this, buf, structRegistry);
    }

    @Override
    public StateMachine fsm() {
        StateMachine fsm = this.fsm;
        if (fsm == null) {
            throw new IllegalStateException("Connection has yet to select a protocol version");
        }
        return fsm;
    }

    @Override
    public LoginContext loginContext() {
        LoginContext impersonationContext = this.impersonationContext;
        if (impersonationContext != null) {
            return impersonationContext;
        }
        return this.loginContext.get();
    }

    @Override
    public RoutingContext routingContext() {
        if (this.routingContext == null) {
            throw new IllegalStateException("Connection has yet to select routing context");
        }
        return this.routingContext;
    }

    @Override
    public AuthenticationFlag logon(Map<String, Object> token) throws AuthenticationException {
        this.connectionInfo = new BoltConnectionInfo(this.id, this.userAgent, this.clientAddress(), this.serverAddress(), this.boltAgent);
        AuthenticationResult result = this.connector().authentication().authenticate(token, this.info());
        LoginContext loginContext = result.getLoginContext();
        if (!this.loginContext.compareAndSet(null, loginContext)) {
            throw new IllegalStateException("Cannot re-authenticate connection");
        }
        this.updateUser(loginContext.subject().authenticatedUser(), this.userAgent);
        this.log.debug("[%s] Authenticated with user '%s' (Credentials expired: %b)", new Object[]{this.id, loginContext.subject().authenticatedUser(), result.credentialsExpired()});
        this.resolveDefaultDatabase();
        this.notifyListeners(listener -> listener.onLogon(loginContext));
        if (result.credentialsExpired()) {
            return AuthenticationFlag.CREDENTIALS_EXPIRED;
        }
        return null;
    }

    @Override
    public void logoff() {
        if (!this.loginContext.compareAndSet(this.loginContext.get(), null)) {
            throw new IllegalStateException("Cannot logout as context is not what expected");
        }
        String username = this.username;
        this.username = null;
        this.notifyListeners(ConnectionListener::onLogoff);
        this.log.debug("[%s] Successfully logged off user %s and re-enabled throttles", new Object[]{this.id, username});
    }

    @Override
    public void impersonate(String userToImpersonate) throws AuthenticationException {
        Objects.requireNonNull(userToImpersonate, "userToImpersonate cannot be null");
        LoginContext loginContext = this.loginContext.get();
        if (loginContext == null) {
            throw new IllegalStateException("Cannot impersonate without prior authentication");
        }
        this.log.debug("[%s] Enabling impersonation of user '%s'", new Object[]{this.id, userToImpersonate});
        this.impersonationContext = this.connector.authentication().impersonate(loginContext, userToImpersonate);
        this.resolveDefaultDatabase();
        this.notifyListeners(listener -> listener.onUserImpersonated(this.impersonationContext));
    }

    @Override
    public void clearImpersonation() {
        if (this.impersonationContext == null) {
            return;
        }
        this.log.debug("[%s] Disabling impersonation", new Object[]{this.id});
        this.impersonationContext = null;
        String defaultDatabase = this.defaultDatabase;
        String impersonatedDefaultDatabase = this.impersonatedDefaultDatabase;
        this.impersonatedDefaultDatabase = null;
        if (!Objects.equals(impersonatedDefaultDatabase, defaultDatabase)) {
            this.notifyListeners(listener -> listener.onDefaultDatabaseSelected(defaultDatabase));
        }
        this.notifyListeners(ConnectionListener::onUserImpersonationCleared);
    }

    public SocketAddress serverAddress() {
        return this.channel.localAddress();
    }

    public SocketAddress clientAddress() {
        return this.channel.remoteAddress();
    }

    public String username() {
        return this.username;
    }

    public String userAgent() {
        return this.userAgent;
    }

    public Map<String, String> boltAgent() {
        return this.boltAgent;
    }

    public void updateUser(String username, String userAgent) {
        this.username = username;
    }

    @Override
    public String selectedDefaultDatabase() {
        String impersonatedDefaultDatabase = this.impersonatedDefaultDatabase;
        if (impersonatedDefaultDatabase != null) {
            return this.impersonatedDefaultDatabase;
        }
        return this.defaultDatabase;
    }

    @Override
    public void resolveDefaultDatabase() {
        String previousDatabase;
        LoginContext loginContext = this.loginContext();
        if (loginContext == null) {
            throw new IllegalStateException("Cannot resolve default database: Connection has not been authenticated");
        }
        String db = this.connector().defaultDatabaseResolver().defaultDatabase(this.loginContext().subject().executingUser());
        if (loginContext.impersonating()) {
            previousDatabase = this.impersonatedDefaultDatabase;
            this.impersonatedDefaultDatabase = db;
        } else {
            previousDatabase = this.defaultDatabase;
            this.impersonatedDefaultDatabase = null;
            this.defaultDatabase = db;
        }
        if (!Objects.equals(previousDatabase, db)) {
            this.notifyListeners(listener -> listener.onDefaultDatabaseSelected(db));
        }
    }

    public String toString() {
        return "AbstractConnection{connector=" + this.connector + ", channel=" + this.channel + ", connectedAt=" + this.connectedAt + ", protocol=" + this.protocol + ", fsm=" + this.fsm + ", loginContext=" + this.loginContext + ", impersonationContext=" + this.impersonationContext + ", connectionInfo=" + this.connectionInfo + ", boltAgent=" + this.boltAgent + ", username='" + this.username + "', userAgent='" + this.userAgent + "', defaultDatabase='" + this.defaultDatabase + "'}";
    }
}

