/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import io.opentelemetry.api.trace.Span;
import java.io.Closeable;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncAdminBuilder;
import org.apache.hadoop.hbase.client.AsyncAdminBuilderBase;
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilder;
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorBuilderImpl;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncConnectionConfiguration;
import org.apache.hadoop.hbase.client.AsyncHBaseAdmin;
import org.apache.hadoop.hbase.client.AsyncRegionLocator;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.AsyncTableBuilder;
import org.apache.hadoop.hbase.client.AsyncTableBuilderBase;
import org.apache.hadoop.hbase.client.AsyncTableImpl;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocator;
import org.apache.hadoop.hbase.client.AsyncTableRegionLocatorImpl;
import org.apache.hadoop.hbase.client.ClusterStatusListener;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionOverAsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionRegistry;
import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.client.HBaseHbck;
import org.apache.hadoop.hbase.client.Hbck;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.NonceGenerator;
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.client.RawAsyncHBaseAdmin;
import org.apache.hadoop.hbase.client.RawAsyncTableImpl;
import org.apache.hadoop.hbase.client.ScanResultConsumer;
import org.apache.hadoop.hbase.client.ServerStatisticTracker;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicyFactory;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.shaded.org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.ConcurrentMapUtils;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
class AsyncConnectionImpl
implements AsyncConnection {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
    static final HashedWheelTimer RETRY_TIMER = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("Async-Client-Retry-Timer-pool-%d").setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(), 10L, TimeUnit.MILLISECONDS);
    private final Configuration conf;
    final AsyncConnectionConfiguration connConf;
    private final User user;
    final ConnectionRegistry registry;
    private final int rpcTimeout;
    protected final RpcClient rpcClient;
    final RpcControllerFactory rpcControllerFactory;
    private final AsyncRegionLocator locator;
    final AsyncRpcRetryingCallerFactory callerFactory;
    private final NonceGenerator nonceGenerator;
    private final ConcurrentMap<String, ClientProtos.ClientService.Interface> rsStubs = new ConcurrentHashMap<String, ClientProtos.ClientService.Interface>();
    private final ConcurrentMap<String, AdminProtos.AdminService.Interface> adminStubs = new ConcurrentHashMap<String, AdminProtos.AdminService.Interface>();
    private final AtomicReference<MasterProtos.MasterService.Interface> masterStub = new AtomicReference();
    private final AtomicReference<CompletableFuture<MasterProtos.MasterService.Interface>> masterStubMakeFuture = new AtomicReference();
    private final Optional<ServerStatisticTracker> stats;
    private final ClientBackoffPolicy backoffPolicy;
    private ChoreService choreService;
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final Optional<MetricsConnection> metrics;
    private final ClusterStatusListener clusterStatusListener;
    private volatile ConnectionOverAsyncConnection conn;

    public AsyncConnectionImpl(Configuration conf, ConnectionRegistry registry, String clusterId, SocketAddress localAddress, User user) {
        this.conf = conf;
        this.user = user;
        if (user.isLoginFromKeytab()) {
            this.spawnRenewalChore(user.getUGI());
        }
        this.connConf = new AsyncConnectionConfiguration(conf);
        this.registry = registry;
        this.metrics = conf.getBoolean("hbase.client.metrics.enable", false) ? Optional.of(new MetricsConnection(this.toString(), () -> null, () -> null)) : Optional.empty();
        this.rpcClient = RpcClientFactory.createClient(conf, clusterId, localAddress, this.metrics.orElse(null));
        this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
        this.rpcTimeout = (int)Math.min(Integer.MAX_VALUE, TimeUnit.NANOSECONDS.toMillis(this.connConf.getRpcTimeoutNs()));
        this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
        this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
        this.nonceGenerator = conf.getBoolean("hbase.client.nonces.enabled", true) ? PerClientRandomNonceGenerator.get() : ConnectionUtils.NO_NONCE_GENERATOR;
        this.stats = Optional.ofNullable(ServerStatisticTracker.create(conf));
        this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
        ClusterStatusListener listener = null;
        if (conf.getBoolean("hbase.status.published", false)) {
            Class<ClusterStatusListener.Listener> listenerClass = conf.getClass("hbase.status.listener.class", ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS, ClusterStatusListener.Listener.class);
            if (listenerClass == null) {
                LOG.warn("{} is true, but {} is not set", (Object)"hbase.status.published", (Object)"hbase.status.listener.class");
            } else {
                try {
                    listener = new ClusterStatusListener(new ClusterStatusListener.DeadServerHandler(){

                        @Override
                        public void newDead(ServerName sn) {
                            AsyncConnectionImpl.this.locator.clearCache(sn);
                            AsyncConnectionImpl.this.rpcClient.cancelConnections(sn);
                        }
                    }, conf, listenerClass);
                }
                catch (IOException e) {
                    LOG.warn("Failed create of ClusterStatusListener, not a critical, ignoring...", (Throwable)e);
                }
            }
        }
        this.clusterStatusListener = listener;
    }

    private void spawnRenewalChore(UserGroupInformation user) {
        ChoreService service = this.getChoreService();
        service.scheduleChore(AuthUtil.getAuthRenewalChore(user));
    }

    synchronized ChoreService getChoreService() {
        if (this.isClosed()) {
            throw new IllegalStateException("connection is already closed");
        }
        if (this.choreService == null) {
            this.choreService = new ChoreService("AsyncConn Chore Service");
        }
        return this.choreService;
    }

    @Override
    public Configuration getConfiguration() {
        return this.conf;
    }

    @Override
    public boolean isClosed() {
        return this.closed.get();
    }

    @Override
    public void close() {
        TraceUtil.trace(() -> {
            if (!this.closed.compareAndSet(false, true)) {
                return;
            }
            LOG.info("Connection has been closed by {}.", (Object)Thread.currentThread().getName());
            if (LOG.isDebugEnabled()) {
                this.logCallStack(Thread.currentThread().getStackTrace());
            }
            IOUtils.closeQuietly((Closeable)this.clusterStatusListener, e -> LOG.warn("failed to close clusterStatusListener", (Throwable)e));
            IOUtils.closeQuietly((Closeable)this.rpcClient, e -> LOG.warn("failed to close rpcClient", (Throwable)e));
            IOUtils.closeQuietly((Closeable)this.registry, e -> LOG.warn("failed to close registry", (Throwable)e));
            AsyncConnectionImpl asyncConnectionImpl = this;
            synchronized (asyncConnectionImpl) {
                if (this.choreService != null) {
                    this.choreService.shutdown();
                    this.choreService = null;
                }
            }
            this.metrics.ifPresent(MetricsConnection::shutdown);
            ConnectionOverAsyncConnection c = this.conn;
            if (c != null) {
                c.closePool();
            }
        }, "AsyncConnection.close");
    }

    private void logCallStack(StackTraceElement[] stackTraceElements) {
        StringBuilder stackBuilder = new StringBuilder("Call stack:");
        for (StackTraceElement element : stackTraceElements) {
            stackBuilder.append("\n    at ");
            stackBuilder.append(element);
        }
        stackBuilder.append("\n");
        LOG.debug(stackBuilder.toString());
    }

    @Override
    public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
        return new AsyncTableRegionLocatorImpl(tableName, this);
    }

    @Override
    public void clearRegionLocationCache() {
        this.locator.clearCache();
    }

    AsyncRegionLocator getLocator() {
        return this.locator;
    }

    NonceGenerator getNonceGenerator() {
        return this.nonceGenerator;
    }

    private ClientProtos.ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
        return ClientProtos.ClientService.newStub(this.rpcClient.createRpcChannel(serverName, this.user, this.rpcTimeout));
    }

    ClientProtos.ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException {
        return ConcurrentMapUtils.computeIfAbsentEx(this.rsStubs, ConnectionUtils.getStubKey(ClientProtos.ClientService.getDescriptor().getName(), serverName), () -> this.createRegionServerStub(serverName));
    }

    private MasterProtos.MasterService.Interface createMasterStub(ServerName serverName) throws IOException {
        return MasterProtos.MasterService.newStub(this.rpcClient.createRpcChannel(serverName, this.user, this.rpcTimeout));
    }

    private AdminProtos.AdminService.Interface createAdminServerStub(ServerName serverName) throws IOException {
        return AdminProtos.AdminService.newStub(this.rpcClient.createRpcChannel(serverName, this.user, this.rpcTimeout));
    }

    AdminProtos.AdminService.Interface getAdminStub(ServerName serverName) throws IOException {
        return ConcurrentMapUtils.computeIfAbsentEx(this.adminStubs, ConnectionUtils.getStubKey(AdminProtos.AdminService.getDescriptor().getName(), serverName), () -> this.createAdminServerStub(serverName));
    }

    CompletableFuture<MasterProtos.MasterService.Interface> getMasterStub() {
        return ConnectionUtils.getOrFetch(this.masterStub, this.masterStubMakeFuture, false, () -> {
            CompletableFuture future = new CompletableFuture();
            FutureUtils.addListener(this.registry.getActiveMaster(), (addr, error) -> {
                if (error != null) {
                    future.completeExceptionally((Throwable)error);
                } else if (addr == null) {
                    future.completeExceptionally(new MasterNotRunningException("ZooKeeper available but no active master location found"));
                } else {
                    LOG.debug("The fetched master address is {}", addr);
                    try {
                        future.complete(this.createMasterStub((ServerName)addr));
                    }
                    catch (IOException e) {
                        future.completeExceptionally(e);
                    }
                }
            });
            return future;
        }, stub -> true, "master stub");
    }

    String getClusterId() {
        try {
            return this.registry.getClusterId().get();
        }
        catch (InterruptedException | ExecutionException e) {
            LOG.error("Error fetching cluster ID: ", (Throwable)e);
            return null;
        }
    }

    void clearMasterStubCache(MasterProtos.MasterService.Interface stub) {
        this.masterStub.compareAndSet(stub, null);
    }

    Optional<ServerStatisticTracker> getStatisticsTracker() {
        return this.stats;
    }

    ClientBackoffPolicy getBackoffPolicy() {
        return this.backoffPolicy;
    }

    @Override
    public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
        return new AsyncTableBuilderBase<AdvancedScanResultConsumer>(tableName, this.connConf){

            @Override
            public AsyncTable<AdvancedScanResultConsumer> build() {
                return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
            }
        };
    }

    @Override
    public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName, final ExecutorService pool) {
        return new AsyncTableBuilderBase<ScanResultConsumer>(tableName, this.connConf){

            @Override
            public AsyncTable<ScanResultConsumer> build() {
                RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
                return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
            }
        };
    }

    @Override
    public AsyncAdminBuilder getAdminBuilder() {
        return new AsyncAdminBuilderBase(this.connConf){

            @Override
            public AsyncAdmin build() {
                return new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
            }
        };
    }

    @Override
    public AsyncAdminBuilder getAdminBuilder(final ExecutorService pool) {
        return new AsyncAdminBuilderBase(this.connConf){

            @Override
            public AsyncAdmin build() {
                RawAsyncHBaseAdmin rawAdmin = new RawAsyncHBaseAdmin(AsyncConnectionImpl.this, RETRY_TIMER, this);
                return new AsyncHBaseAdmin(rawAdmin, pool);
            }
        };
    }

    @Override
    public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
        return new AsyncBufferedMutatorBuilderImpl(this.connConf, this.getTableBuilder(tableName), RETRY_TIMER);
    }

    @Override
    public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName, ExecutorService pool) {
        return new AsyncBufferedMutatorBuilderImpl(this.connConf, this.getTableBuilder(tableName, pool), RETRY_TIMER);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Connection toConnection() {
        ConnectionOverAsyncConnection c = this.conn;
        if (c != null) {
            return c;
        }
        AsyncConnectionImpl asyncConnectionImpl = this;
        synchronized (asyncConnectionImpl) {
            c = this.conn;
            if (c != null) {
                return c;
            }
            this.conn = c = new ConnectionOverAsyncConnection(this);
        }
        return c;
    }

    private Hbck getHbckInternal(ServerName masterServer) {
        Span.current().setAttribute(TraceUtil.SERVER_NAME_KEY, (Object)masterServer.getServerName());
        return new HBaseHbck(MasterProtos.HbckService.newBlockingStub(this.rpcClient.createBlockingRpcChannel(masterServer, this.user, this.rpcTimeout)), this.rpcControllerFactory);
    }

    @Override
    public CompletableFuture<Hbck> getHbck() {
        return TraceUtil.tracedFuture(() -> {
            CompletableFuture future = new CompletableFuture();
            FutureUtils.addListener(this.registry.getActiveMaster(), (sn, error) -> {
                if (error != null) {
                    future.completeExceptionally((Throwable)error);
                } else {
                    future.complete(this.getHbckInternal((ServerName)sn));
                }
            });
            return future;
        }, "AsyncConnection.getHbck");
    }

    @Override
    public Hbck getHbck(final ServerName masterServer) {
        return TraceUtil.trace(new Supplier<Hbck>(){

            @Override
            public Hbck get() {
                return AsyncConnectionImpl.this.getHbckInternal(masterServer);
            }
        }, "AsyncConnection.getHbck");
    }

    Optional<MetricsConnection> getConnectionMetrics() {
        return this.metrics;
    }
}

