package org.apache.hadoop.hive.llap;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.Message;
import com.google.protobuf.ServiceException;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.PrivilegedAction;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.net.SocketFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.ipc.AsyncCallLimitExceededException;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.concurrent.AsyncGet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncPbRpcProxy.class */
public abstract class AsyncPbRpcProxy<ProtocolType, TokenType extends TokenIdentifier> extends AbstractService {
    private static final Logger LOG = LoggerFactory.getLogger(AsyncPbRpcProxy.class);
    private final Cache<String, ProtocolType> hostProxies;
    private final RequestManager requestManager;
    private final RetryPolicy retryPolicy;
    private final SocketFactory socketFactory;
    private final ListeningExecutorService requestManagerExecutor;
    private volatile ListenableFuture<Void> requestManagerFuture;
    private final Token<TokenType> token;
    private final String tokenUser;

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncPbRpcProxy$AsyncCallableRequest.class */
    /**
     * A node-bound request that is issued with the Hadoop IPC {@link Client} switched to
     * asynchronous mode.
     *
     * <p>{@link #call()} only submits the RPC (through {@link #callInternal()}) and stores the
     * handle returned by {@link ProtobufRpcEngine#getAsyncReturnMessage()}; it always returns
     * {@code null}. The actual response has to be obtained from {@link #getResponseFuture()}.
     *
     * @param <REQUEST> protobuf request type
     * @param <RESPONSE> protobuf response type
     */
    public static abstract class AsyncCallableRequest<REQUEST extends Message, RESPONSE extends Message> extends NodeCallableRequest<REQUEST, RESPONSE> {
        /** Time budget, in milliseconds, for retrying a submission rejected by the async call limit. */
        private static final long TIMEOUT = 60000L;
        /** Initial sleep, in milliseconds, once the fast retries are used up; doubled after each sleep. */
        private static final long BACKOFF_START = 10L;
        /** Number of immediate retries attempted between two back-off sleeps. */
        private static final int FAST_RETRIES = 5;

        /** Handle to the in-flight asynchronous call; set by a successful {@link #call()}. */
        private AsyncGet<Message, Exception> responseFuture;

        /**
         * @param llapNodeId node the request is addressed to
         * @param request protobuf request payload
         * @param executeRequestCallback callback associated with this request
         */
        protected AsyncCallableRequest(LlapNodeId llapNodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> executeRequestCallback) {
            super(llapNodeId, request, executeRequestCallback);
        }

        /**
         * Submits the request in asynchronous IPC mode, retrying while the client reports
         * {@link AsyncCallLimitExceededException}.
         *
         * <p>Retries are immediate until {@code FAST_RETRIES} consecutive rejections have been
         * seen; then the thread sleeps for the current back-off, the back-off is doubled and the
         * fast-retry counter is reset. The deadline is only checked right after a sleep.
         *
         * @return always {@code null}; use {@link #getResponseFuture()} for the response
         * @throws HiveException if the deadline has passed after a back-off sleep
         * @throws Exception any other failure raised by {@link #callInternal()}, or
         *     {@link InterruptedException} if interrupted while sleeping
         */
        @Override // org.apache.hadoop.hive.llap.AsyncPbRpcProxy.CallableRequest, java.util.concurrent.Callable
        public RESPONSE call() throws Exception {
            boolean previousAsyncMode = Client.isAsynchronousMode();
            long deadline = System.currentTimeMillis() + TIMEOUT;
            int consecutiveRejections = 0;
            long backoffMillis = BACKOFF_START;
            try {
                Client.setAsynchronousMode(true);
                boolean submitted = false;
                while (!submitted) {
                    try {
                        callInternal();
                        submitted = true;
                    } catch (Exception e) {
                        // Only "too many outstanding async calls" is retried; everything else propagates.
                        // (instanceof is false for a null cause, so no separate null check is needed.)
                        if (!(e instanceof ServiceException) || !(e.getCause() instanceof AsyncCallLimitExceededException)) {
                            throw e;
                        }
                        consecutiveRejections++;
                        if (consecutiveRejections >= FAST_RETRIES) {
                            Thread.sleep(backoffMillis);
                            if (System.currentTimeMillis() > deadline) {
                                throw new HiveException("Async request timed out in  " + TIMEOUT + " ms.", e.getCause());
                            }
                            consecutiveRejections = 0;
                            backoffMillis *= 2;
                        }
                        AsyncPbRpcProxy.LOG.trace("Async call limit exceeded", e);
                    }
                }
                this.responseFuture = ProtobufRpcEngine.getAsyncReturnMessage();
                return null;
            } finally {
                // Restore whatever mode the thread had before, on success and on failure alike.
                Client.setAsynchronousMode(previousAsyncMode);
            }
        }

        /**
         * Hook that performs the actual RPC invocation. The default implementation does nothing.
         *
         * @throws Exception if the invocation fails
         */
        public void callInternal() throws Exception {
        }

        /**
         * @return the handle captured by the last successful {@link #call()}, or {@code null} if
         *     no call has completed yet
         */
        public AsyncGet<Message, Exception> getResponseFuture() {
            return this.responseFuture;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncPbRpcProxy$AsyncResponseCallback.class */
    /**
     * {@link FutureCallback} attached to the submission of an {@link AsyncCallableRequest}.
     *
     * <p>When the submission succeeds the request is handed to the {@link AsyncResponseHandler};
     * this class does not itself report completion to the {@link RequestManager} in that case.
     * When the submission fails the error is forwarded to the request callback and the node is
     * reported as finished.
     *
     * @param <TYPE> protobuf response type
     */
    public static final class AsyncResponseCallback<TYPE extends Message> implements FutureCallback<TYPE> {
        private final ExecuteRequestCallback<TYPE> callback;
        private final LlapNodeId nodeId;
        private final RequestManager requestManager;
        private final AsyncCallableRequest request;
        private final AsyncResponseHandler asyncResponseHandler;

        /**
         * @param errorSink callback notified when the submission fails
         * @param targetNode node the request was sent to
         * @param manager manager to notify when the request is finished because of a failure
         * @param submittedRequest the request whose submission is being observed
         * @param responseHandler handler that takes over the request after a successful submission
         */
        public AsyncResponseCallback(ExecuteRequestCallback<TYPE> errorSink, LlapNodeId targetNode, RequestManager manager, AsyncCallableRequest submittedRequest, AsyncResponseHandler responseHandler) {
            this.request = submittedRequest;
            this.asyncResponseHandler = responseHandler;
            this.requestManager = manager;
            this.nodeId = targetNode;
            this.callback = errorSink;
        }

        /** Passes the submitted request on to the async response handler; the argument is not used. */
        public void onSuccess(TYPE unusedResult) {
            asyncResponseHandler.addToAsyncResponseFutureQueue(request);
        }

        /** Forwards the failure to the request callback, then always marks the node's request as finished. */
        public void onFailure(Throwable failure) {
            try {
                callback.indicateError(failure);
            } finally {
                requestManager.requestFinished(nodeId);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Base type for a unit of work handled by the {@link RequestManager}: a protobuf request
     * together with the callback that is told about its outcome.
     *
     * @param <REQUEST> protobuf request type
     * @param <RESPONSE> protobuf response type
     */
    @VisibleForTesting
    public static abstract class CallableRequest<REQUEST extends Message, RESPONSE extends Message> implements Callable<RESPONSE> {
        /** Payload of the request. */
        protected final REQUEST request;
        /** Receiver of the response or of the error. */
        protected final ExecuteRequestCallback<RESPONSE> callback;

        /**
         * @param request payload of the request
         * @param executeRequestCallback receiver of the response or of the error
         */
        protected CallableRequest(REQUEST request, ExecuteRequestCallback<RESPONSE> executeRequestCallback) {
            this.callback = executeRequestCallback;
            this.request = request;
        }

        /**
         * Executes the request.
         *
         * @return the response
         * @throws Exception if execution fails
         */
        @Override // java.util.concurrent.Callable
        public abstract RESPONSE call() throws Exception;

        /**
         * @return the node this request has to be sent to
         * @throws Exception if the node cannot be determined
         */
        public abstract LlapNodeId getNodeId() throws Exception;

        /** @return the callback supplied at construction time */
        public ExecuteRequestCallback<RESPONSE> getCallback() {
            return callback;
        }
    }

    /* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncPbRpcProxy$ExecuteRequestCallback.class */
    /**
     * Receiver for the outcome of a request: either a response or an error.
     *
     * @param <T> protobuf response type
     */
    public interface ExecuteRequestCallback<T extends Message> {
        /**
         * Delivers the response of the request.
         *
         * @param response the response message
         */
        void setResponse(T response);

        /**
         * Reports that the request failed.
         *
         * @param error the failure
         */
        void indicateError(Throwable error);
    }

    /**
     * A {@link CallableRequest} whose target node is fixed when the request is created.
     *
     * @param <REQUEST> protobuf request type
     * @param <RESPONSE> protobuf response type
     */
    @VisibleForTesting
    protected static abstract class NodeCallableRequest<REQUEST extends Message, RESPONSE extends Message> extends CallableRequest<REQUEST, RESPONSE> {
        /** Node this request is addressed to. */
        protected final LlapNodeId nodeId;

        /**
         * @param llapNodeId node this request is addressed to
         * @param request payload of the request
         * @param executeRequestCallback receiver of the response or of the error
         */
        protected NodeCallableRequest(LlapNodeId llapNodeId, REQUEST request, ExecuteRequestCallback<RESPONSE> executeRequestCallback) {
            super(request, executeRequestCallback);
            nodeId = llapNodeId;
        }

        /** @return the node supplied at construction time */
        @Override // org.apache.hadoop.hive.llap.AsyncPbRpcProxy.CallableRequest
        public LlapNodeId getNodeId() {
            return nodeId;
        }
    }

    /**
     * Scheduling loop that moves queued {@link CallableRequest}s onto a worker pool while
     * limiting the number of requests in flight per node.
     *
     * <p>Producers call {@link #queueRequest(CallableRequest)} and
     * {@link #requestFinished(LlapNodeId)} from any thread; both only append to a synchronized
     * hand-off list, set {@code shouldRun} and signal the run loop. The run loop ({@link #call()})
     * drains those lists and submits whatever is allowed to run.
     */
    @VisibleForTesting
    public static class RequestManager implements Callable<Void> {
        /** Upper bound on simultaneously running requests for a single node. */
        private final int maxConcurrentRequestsPerNode;
        /** Worker pool on which requests are executed. */
        private final ListeningExecutorService executor;
        /** Takes over asynchronous requests once their submission has succeeded. */
        private final AsyncResponseHandler asyncResponseHandler;
        /** Guards {@link #queueCondition}; held by the run loop while it processes. */
        private final Lock lock = new ReentrantLock();
        private final AtomicBoolean isShutdown = new AtomicBoolean(false);
        /** Signalled whenever there may be new work or shutdown was requested. */
        private final Condition queueCondition = this.lock.newCondition();
        /** Set by producers to tell the run loop that a pass is needed. */
        private final AtomicBoolean shouldRun = new AtomicBoolean(false);
        /** Hand-off list filled by producers; guarded by its own monitor. */
        private final LinkedList<CallableRequest<?, ?>> newRequestList = new LinkedList<>();
        /** Requests waiting to be submitted; only touched by the run loop. */
        private final LinkedList<CallableRequest<?, ?>> pendingRequests = new LinkedList<>();
        /** Per-node count of requests currently accounted as running. */
        private final ConcurrentMap<LlapNodeId, AtomicInteger> runningRequests = new ConcurrentHashMap<>();
        /** Hand-off list of nodes for which a request finished; guarded by its own monitor. */
        private final LinkedList<LlapNodeId> completedNodes = new LinkedList<>();

        /** Nodes found to be at capacity during the current {@link #process()} pass. */
        @VisibleForTesting
        Set<LlapNodeId> currentLoopDisabledNodes = new HashSet<>();

        /** Requests that could not be submitted during the current {@link #process()} pass. */
        @VisibleForTesting
        List<CallableRequest<?, ?>> currentLoopSkippedRequests = new LinkedList<>();

        /**
         * Creates the worker pool and starts the async response handler.
         *
         * @param i number of worker threads in the fixed pool
         * @param i2 maximum number of concurrently running requests per node
         */
        public RequestManager(int i, int i2) {
            ExecutorService workerPool = Executors.newFixedThreadPool(i, new ThreadFactoryBuilder().setNameFormat("TaskCommunicator #%2d").build());
            this.maxConcurrentRequestsPerNode = i2;
            this.executor = MoreExecutors.listeningDecorator(workerPool);
            this.asyncResponseHandler = new AsyncResponseHandler(this);
            this.asyncResponseHandler.start();
        }

        /**
         * Runs the scheduling loop until shutdown is observed.
         *
         * <p>Each iteration takes the lock, waits on the condition if no work has been flagged,
         * runs one {@link #process()} pass and releases the lock again.
         *
         * @return always {@code null}
         * @throws RuntimeException if the thread is interrupted while the manager is not shut down
         */
        @Override // java.util.concurrent.Callable
        public Void call() {
            while (!this.isShutdown.get()) {
                this.lock.lock();
                try {
                    if (!this.shouldRun.get()) {
                        // A wake-up without work is harmless: process() simply finds nothing to do.
                        this.queueCondition.await();
                    }
                    if (process()) {
                        break;
                    }
                } catch (InterruptedException e) {
                    handleInterrupt(e);
                } finally {
                    // The only release of the lock taken above. Unlocking anywhere else in this
                    // iteration as well would make this call throw IllegalMonitorStateException.
                    this.lock.unlock();
                }
            }
            AsyncPbRpcProxy.LOG.info("CallScheduler loop exiting");
            return null;
        }

        /** Swallows an interrupt that is part of shutdown; otherwise logs and rethrows it unchecked. */
        private void handleInterrupt(InterruptedException interruptedException) {
            if (this.isShutdown.get()) {
                return;
            }
            AsyncPbRpcProxy.LOG.warn("RunLoop interrupted without being shutdown first");
            throw new RuntimeException(interruptedException);
        }

        /**
         * Hands a request to the run loop.
         *
         * @param callableRequest request to schedule
         */
        public void queueRequest(CallableRequest<?, ?> callableRequest) {
            synchronized (this.newRequestList) {
                this.newRequestList.add(callableRequest);
                this.shouldRun.set(true);
            }
            notifyRunLoop();
        }

        /**
         * Tells the run loop that one request for the given node has finished.
         *
         * @param llapNodeId node whose running-request count should be decremented
         */
        public void requestFinished(LlapNodeId llapNodeId) {
            synchronized (this.completedNodes) {
                this.completedNodes.add(llapNodeId);
                this.shouldRun.set(true);
            }
            notifyRunLoop();
        }

        /** Stops the response handler and the worker pool and wakes the run loop; later calls are no-ops. */
        public void shutdown() {
            if (this.isShutdown.getAndSet(true)) {
                return;
            }
            this.asyncResponseHandler.shutdownNow();
            this.executor.shutdownNow();
            notifyRunLoop();
        }

        /**
         * Submits a request to the worker pool and attaches the completion callback that matches
         * its kind ({@link AsyncResponseCallback} for asynchronous requests, otherwise
         * {@link ResponseCallback}).
         *
         * @param callableRequest request to run
         * @param llapNodeId node the request belongs to
         */
        @VisibleForTesting
        <T extends Message, U extends Message> void submitToExecutor(CallableRequest<T, U> callableRequest, LlapNodeId llapNodeId) {
            ListenableFuture<U> submission = this.executor.submit(callableRequest);
            if (callableRequest instanceof AsyncCallableRequest) {
                Futures.addCallback(submission, new AsyncResponseCallback<U>(callableRequest.getCallback(), llapNodeId, this, (AsyncCallableRequest<?, ?>) callableRequest, this.asyncResponseHandler), MoreExecutors.directExecutor());
            } else {
                Futures.addCallback(submission, new ResponseCallback<U>(callableRequest.getCallback(), llapNodeId, this), MoreExecutors.directExecutor());
            }
        }

        /**
         * Performs one scheduling pass: drains the hand-off lists, then walks the pending
         * requests and submits every request whose node still has capacity.
         *
         * <p>A node found at capacity is added to {@link #currentLoopDisabledNodes}, so later
         * requests for it are skipped in this pass without touching its counter. Skipped requests
         * are put back at the head of the pending list in their original relative order.
         *
         * @return {@code true} if the manager is shut down and the run loop should exit
         * @throws InterruptedException if raised while resolving a request's node or submitting it
         */
        @VisibleForTesting
        boolean process() throws InterruptedException {
            if (this.isShutdown.get()) {
                return true;
            }
            this.currentLoopDisabledNodes.clear();
            this.currentLoopSkippedRequests.clear();
            // Clear the flag before draining so that work queued during the pass triggers another one.
            this.shouldRun.compareAndSet(true, false);
            drainNewRequestList();
            drainCompletedNodes();
            Iterator<CallableRequest<?, ?>> pending = this.pendingRequests.iterator();
            while (pending.hasNext()) {
                CallableRequest<?, ?> candidate = pending.next();
                pending.remove();
                try {
                    LlapNodeId nodeId = candidate.getNodeId();
                    if (canRunForNode(nodeId, this.currentLoopDisabledNodes)) {
                        submitToExecutor(candidate, nodeId);
                    } else {
                        this.currentLoopDisabledNodes.add(nodeId);
                        this.currentLoopSkippedRequests.add(candidate);
                    }
                } catch (InterruptedException e) {
                    throw e;
                } catch (Exception e) {
                    // Any other failure is reported to the request's callback and the request is dropped.
                    candidate.getCallback().indicateError(e);
                }
            }
            this.pendingRequests.addAll(0, this.currentLoopSkippedRequests);
            return false;
        }

        /** Moves everything from the producer hand-off list to the tail of the pending list. */
        private void drainNewRequestList() {
            synchronized (this.newRequestList) {
                if (!this.newRequestList.isEmpty()) {
                    this.pendingRequests.addAll(this.newRequestList);
                    this.newRequestList.clear();
                }
            }
        }

        /** Decrements the running count once for every completion reported since the last pass. */
        private void drainCompletedNodes() {
            synchronized (this.completedNodes) {
                for (LlapNodeId finishedNode : this.completedNodes) {
                    this.runningRequests.get(finishedNode).decrementAndGet();
                }
                this.completedNodes.clear();
            }
        }

        /**
         * Reserves a slot for the node if it has one available.
         *
         * @param llapNodeId node to check
         * @param set nodes already known to be at capacity in this pass
         * @return {@code true} if a slot was reserved (the node's counter stays incremented)
         */
        private boolean canRunForNode(LlapNodeId llapNodeId, Set<LlapNodeId> set) {
            if (set.contains(llapNodeId)) {
                return false;
            }
            AtomicInteger running = this.runningRequests.get(llapNodeId);
            if (running == null) {
                AtomicInteger fresh = new AtomicInteger(0);
                AtomicInteger raced = this.runningRequests.putIfAbsent(llapNodeId, fresh);
                running = raced != null ? raced : fresh;
            }
            if (running.incrementAndGet() <= this.maxConcurrentRequestsPerNode) {
                return true;
            }
            // Over the limit: give the tentatively taken slot back.
            running.decrementAndGet();
            return false;
        }

        /** Signals the run loop's condition while holding the lock. */
        private void notifyRunLoop() {
            this.lock.lock();
            try {
                this.queueCondition.signal();
            } finally {
                this.lock.unlock();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/hadoop/hive/llap/AsyncPbRpcProxy$ResponseCallback.class */
    /**
     * {@link FutureCallback} for synchronous requests: relays the outcome to the request's
     * {@link ExecuteRequestCallback} and then, whatever happens, tells the
     * {@link RequestManager} that the node has one request fewer in flight.
     *
     * @param <TYPE> protobuf response type
     */
    public static final class ResponseCallback<TYPE extends Message> implements FutureCallback<TYPE> {
        private final ExecuteRequestCallback<TYPE> callback;
        private final LlapNodeId nodeId;
        private final RequestManager requestManager;

        /**
         * @param outcomeSink callback that receives the response or the error
         * @param targetNode node the request was sent to
         * @param manager manager to notify once the outcome has been relayed
         */
        public ResponseCallback(ExecuteRequestCallback<TYPE> outcomeSink, LlapNodeId targetNode, RequestManager manager) {
            this.requestManager = manager;
            this.nodeId = targetNode;
            this.callback = outcomeSink;
        }

        /** Delivers the response; the node is marked finished even if the callback throws. */
        public void onSuccess(TYPE response) {
            try {
                callback.setResponse(response);
            } finally {
                requestManager.requestFinished(nodeId);
            }
        }

        /** Reports the error; the node is marked finished even if the callback throws. */
        public void onFailure(Throwable failure) {
            try {
                callback.indicateError(failure);
            } finally {
                requestManager.requestFinished(nodeId);
            }
        }
    }

    /**
     * Starts the request manager's run loop on its dedicated executor and attaches a callback
     * that logs how the loop ended. A cancellation is not logged.
     */
    public void serviceStart() {
        FutureCallback<Void> exitLogger = new FutureCallback<Void>() { // from class: org.apache.hadoop.hive.llap.AsyncPbRpcProxy.1
            public void onSuccess(Void unused) {
                AsyncPbRpcProxy.LOG.info("RequestManager shutdown");
            }

            public void onFailure(Throwable cause) {
                if (!(cause instanceof CancellationException)) {
                    AsyncPbRpcProxy.LOG.warn("RequestManager shutdown with error", cause);
                }
            }
        };
        ListenableFuture<Void> runLoop = this.requestManagerExecutor.submit(this.requestManager);
        this.requestManagerFuture = runLoop;
        Futures.addCallback(runLoop, exitLogger, MoreExecutors.directExecutor());
    }

    /**
     * Stops the proxy. If the run loop was ever started, the request manager is shut down and
     * the loop's future is cancelled with interruption; the loop's executor is shut down in
     * every case.
     */
    public void serviceStop() {
        ListenableFuture<Void> runLoop = this.requestManagerFuture;
        if (runLoop != null) {
            requestManager.shutdown();
            runLoop.cancel(true);
        }
        requestManagerExecutor.shutdown();
    }

    /**
     * Queues a request for scheduling by delegating to the request manager.
     *
     * @param callableRequest request to schedule
     */
    protected final void queueRequest(CallableRequest<?, ?> callableRequest) {
        requestManager.queueRequest(callableRequest);
    }

    /**
     * Sets up the per-host proxy cache, the token identity, the retry policy and the request
     * manager. The run loop itself is not started here; see {@code serviceStart()}.
     *
     * @param str service name passed to the {@link AbstractService} constructor
     * @param i number of worker threads used by the request manager
     * @param configuration configuration used to obtain the default socket factory
     * @param token token used for nodes when no per-call token is given; may be {@code null}
     * @param j maximum total retry time, in milliseconds
     * @param j2 sleep between retries, in milliseconds
     * @param i2 if positive, the proxy cache is capped at {@code i2 * 2} entries
     * @param i3 maximum number of concurrently running requests per node
     * @throws RuntimeException if a token is given, its user cannot be determined and the
     *     current user cannot be looked up either
     */
    public AsyncPbRpcProxy(String str, int i, Configuration configuration, Token<TokenType> token, long j, long j2, int i2, int i3) {
        super(str);
        // Proxies idle for an hour are evicted; every evicted proxy is closed through shutdownProtocolImpl.
        CacheBuilder<String, ProtocolType> proxyCacheBuilder = CacheBuilder.newBuilder().expireAfterAccess(1L, TimeUnit.HOURS).removalListener(new RemovalListener<String, ProtocolType>() { // from class: org.apache.hadoop.hive.llap.AsyncPbRpcProxy.2
            public void onRemoval(RemovalNotification<String, ProtocolType> removalNotification) {
                if (removalNotification == null) {
                    return;
                }
                AsyncPbRpcProxy.this.shutdownProtocolImpl(removalNotification.getValue());
            }
        });
        if (i2 > 0) {
            proxyCacheBuilder.maximumSize(i2 * 2);
        }
        this.hostProxies = proxyCacheBuilder.build();
        this.socketFactory = NetUtils.getDefaultSocketFactory(configuration);
        this.token = token;
        String resolvedTokenUser = null;
        if (token != null) {
            resolvedTokenUser = getTokenUser(token);
            if (resolvedTokenUser == null) {
                // Fall back to the current user so that a non-null token always has a user.
                try {
                    resolvedTokenUser = UserGroupInformation.getCurrentUser().getShortUserName();
                    LOG.warn("Cannot determine token user from the token; using {}", resolvedTokenUser);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        this.tokenUser = resolvedTokenUser;
        this.retryPolicy = RetryPolicies.retryUpToMaximumTimeWithFixedSleep(j, j2, TimeUnit.MILLISECONDS);
        this.requestManager = new RequestManager(i, i3);
        this.requestManagerExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat("RequestManagerExecutor").build()));
        // The previous message ran its fields together without separators.
        LOG.info("Setting up AsyncPbRpcProxy with numThreads={}, retryTime(millis)={}, retrySleep(millis)={}", i, j, j2);
    }

    /**
     * Returns the cached proxy for a node, creating it through {@code createProxy} on a cache
     * miss. The cache key is the host identifier built from the node's hostname and port.
     *
     * @param llapNodeId node to talk to
     * @param token token handed to {@code createProxy} when a proxy has to be created
     * @return the proxy for the node
     * @throws RuntimeException wrapping the {@link ExecutionException} if proxy creation fails
     */
    protected final ProtocolType getProxy(final LlapNodeId llapNodeId, final Token<TokenType> token) {
        final String cacheKey = getHostIdentifier(llapNodeId.getHostname(), llapNodeId.getPort());
        LOG.debug("Getting host proxies for {}", cacheKey);
        Callable<ProtocolType> proxyLoader = new Callable<ProtocolType>() { // from class: org.apache.hadoop.hive.llap.AsyncPbRpcProxy.3
            @Override // java.util.concurrent.Callable
            public ProtocolType call() throws Exception {
                return createProxy(llapNodeId, token);
            }
        };
        try {
            return this.hostProxies.get(cacheKey, proxyLoader);
        } catch (ExecutionException loadFailure) {
            throw new RuntimeException(loadFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Creates a new protocol proxy for a node.
     *
     * <p>Without any token (neither the argument nor the proxy-wide one) the proxy is created
     * directly with a {@code null} UGI. Otherwise a remote-user UGI is built, the token (the
     * argument, or a copy of the proxy-wide token) gets its service set to the node's address
     * and is added to the UGI, and the proxy is created inside {@code doAs} for that UGI.
     *
     * @param llapNodeId node to connect to
     * @param token per-call token; {@code null} to use the proxy-wide token, if any
     * @return the newly created proxy
     * @throws IOException if the current user has to be looked up and that lookup fails
     * @throws AssertionError if a proxy-wide token exists but no user was recorded for it
     */
    public ProtocolType createProxy(final LlapNodeId llapNodeId, Token<TokenType> token) throws IOException {
        boolean noTokenAtAll = token == null && this.token == null;
        if (noTokenAtAll) {
            LOG.debug("Creating a client without a token for {}", llapNodeId);
            return createProtocolImpl(getConfig(), llapNodeId.getHostname(), llapNodeId.getPort(), null, this.retryPolicy, this.socketFactory);
        }
        if (this.token != null && this.tokenUser == null) {
            throw new AssertionError("Invalid internal state from " + this.token);
        }

        // The user recorded for the proxy-wide token wins; otherwise ask the per-call token.
        String ugiUser;
        if (this.tokenUser != null) {
            ugiUser = this.tokenUser;
        } else {
            ugiUser = getTokenUser(token);
        }
        if (ugiUser == null) {
            ugiUser = UserGroupInformation.getCurrentUser().getShortUserName();
            LOG.warn("Cannot determine token user for UGI; using {}", ugiUser);
        }
        final UserGroupInformation remoteUgi = UserGroupInformation.createRemoteUser(ugiUser);

        // Work on a copy of the proxy-wide token so that setting the service does not alter it.
        Token<TokenType> effectiveToken = token;
        if (effectiveToken == null) {
            effectiveToken = new Token<TokenType>(this.token);
        }
        SecurityUtil.setTokenService(effectiveToken, NetUtils.createSocketAddrForHost(llapNodeId.getHostname(), llapNodeId.getPort()));
        remoteUgi.addToken(effectiveToken);
        LOG.debug("Creating a client for {}; the token is {}", llapNodeId, effectiveToken);

        PrivilegedAction<ProtocolType> proxyFactory = new PrivilegedAction<ProtocolType>() { // from class: org.apache.hadoop.hive.llap.AsyncPbRpcProxy.4
            @Override // java.security.PrivilegedAction
            public ProtocolType run() {
                return createProtocolImpl(getConfig(), llapNodeId.getHostname(), llapNodeId.getPort(), remoteUgi, AsyncPbRpcProxy.this.retryPolicy, AsyncPbRpcProxy.this.socketFactory);
            }
        };
        return remoteUgi.doAs(proxyFactory);
    }

    /**
     * Builds the proxy cache key for a host and port: {@code <ip>:<host>:<port>} when the host
     * resolves, or {@code <host>:<port>} (after a warning) when it does not.
     *
     * @param str hostname
     * @param i port
     * @return the cache key
     */
    private String getHostIdentifier(String str, int i) {
        String addressPrefix = "";
        try {
            addressPrefix = InetAddress.getByName(str).getHostAddress() + ":";
        } catch (UnknownHostException e) {
            LOG.warn("Unable to determine IP address for host: {}.. Ignoring..", str, e);
        }
        return addressPrefix + str + ":" + i;
    }

    /**
     * Creates the protocol proxy for one node. Called by {@code createProxy}, either directly or
     * from inside {@code doAs} of the UGI that is passed in.
     *
     * @param configuration the service configuration ({@code getConfig()})
     * @param str hostname of the node
     * @param i port of the node
     * @param userGroupInformation remote-user UGI carrying the token, or {@code null} when the
     *     proxy is created without a token
     * @param retryPolicy the retry policy built in the constructor
     * @param socketFactory the socket factory obtained in the constructor
     * @return the proxy to cache and hand out for this node
     */
    protected abstract ProtocolType createProtocolImpl(Configuration configuration, String str, int i, UserGroupInformation userGroupInformation, RetryPolicy retryPolicy, SocketFactory socketFactory);

    /**
     * Releases a proxy that is no longer cached. Invoked by the proxy cache's removal listener
     * with the value of the removed entry.
     *
     * @param protocoltype the proxy that was removed from the cache
     */
    protected abstract void shutdownProtocolImpl(ProtocolType protocoltype);

    /**
     * Determines the user name associated with a token. Note that the constructor calls this
     * method, so implementations run before the subclass's own constructor body.
     *
     * @param token token to inspect
     * @return the user name, or {@code null} if it cannot be determined; callers in this class
     *     then fall back to the current user's short name
     */
    protected abstract String getTokenUser(Token<TokenType> token);
}
