/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.client.api.async.impl;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.flink.shaded.hadoop2.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.shaded.hadoop2.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
import org.apache.hadoop.yarn.state.SingleArcTransition;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class NMClientAsyncImpl
extends NMClientAsync {
    /** Logger shared by this class and its nested helper classes. */
    private static final Log LOG = LogFactory.getLog(NMClientAsyncImpl.class);
    /** Nominal initial core size of the worker thread pool. */
    protected static final int INITIAL_THREAD_POOL_SIZE = 10;
    /** Pool that runs the {@link ContainerEventProcessor} tasks; created in {@code serviceStart}. */
    protected ThreadPoolExecutor threadPool;
    /** Upper bound for the pool's core size; read from the configuration in {@code serviceInit}. */
    protected int maxThreadPoolSize;
    /** Thread that drains {@link #events} and hands each event to {@link #threadPool}; created in {@code serviceStart}. */
    protected Thread eventDispatcherThread;
    /** Flipped to {@code true} by {@code serviceStop}; the dispatcher loop exits once it observes it. */
    protected AtomicBoolean stopped = new AtomicBoolean(false);
    /** Pending events, appended by the {@code *Async} methods and consumed by the dispatcher thread. */
    protected BlockingQueue<ContainerEvent> events = new LinkedBlockingQueue<ContainerEvent>();
    /**
     * State holders keyed by container id. Entries are added by {@code startContainerAsync} and
     * removed by the event processor once a container reaches DONE or FAILED.
     */
    protected ConcurrentMap<ContainerId, StatefulContainer> containers = new ConcurrentHashMap<ContainerId, StatefulContainer>();

    /**
     * Creates a client whose service name is the class name of {@link NMClientAsync}
     * (not of this implementation class).
     *
     * @param callbackHandler receiver of the asynchronous results and errors
     */
    public NMClientAsyncImpl(NMClientAsync.CallbackHandler callbackHandler) {
        this(NMClientAsync.class.getName(), callbackHandler);
    }

    /**
     * Creates a client with the given service name, backed by a fresh {@link NMClientImpl}.
     *
     * @param name service name
     * @param callbackHandler receiver of the asynchronous results and errors
     */
    public NMClientAsyncImpl(String name, NMClientAsync.CallbackHandler callbackHandler) {
        this(name, new NMClientImpl(), callbackHandler);
    }

    /**
     * Creates a client around a caller-supplied {@link NMClient}; exposed for tests.
     *
     * @param name service name
     * @param client the synchronous client every request is delegated to
     * @param callbackHandler receiver of the asynchronous results and errors
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    protected NMClientAsyncImpl(String name, NMClient client, NMClientAsync.CallbackHandler callbackHandler) {
        super(name, client, callbackHandler);
        // NOTE(review): the superclass constructor already receives both values, so these
        // assignments may be redundant -- confirm against NMClientAsync before removing.
        this.client = client;
        this.callbackHandler = callbackHandler;
    }

    /**
     * Reads the upper bound of the worker pool size from the configuration (default 500),
     * then initializes the wrapped client followed by the parent service.
     *
     * @param conf configuration to read the bound from and to pass on
     * @throws Exception if the parent service fails to initialize
     */
    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        int configuredUpperBound =
                conf.getInt("yarn.client.nodemanager-client-async.thread-pool-max-size", 500);
        this.maxThreadPoolSize = configuredUpperBound;
        LOG.info("Upper bound of the thread pool size is " + configuredUpperBound);

        this.client.init(conf);
        super.serviceInit(conf);
    }

    /**
     * Starts the wrapped client, creates the worker pool and launches the dispatcher thread.
     *
     * <p>The dispatcher takes events off {@link #events} one at a time, records the node each
     * event targets, grows the pool's core size when more distinct nodes have been seen than
     * the pool currently has core threads for (never beyond {@link #maxThreadPoolSize}), and
     * submits a processor for the event to the pool.
     *
     * @throws Exception if the parent service fails to start
     */
    @Override
    protected void serviceStart() throws Exception {
        this.client.start();

        String workerNameFormat = this.getClass().getName() + " #%d";
        ThreadFactory workerFactory = new ThreadFactoryBuilder()
                .setNameFormat(workerNameFormat)
                .setDaemon(true)
                .build();
        int initialCoreSize = Math.min(INITIAL_THREAD_POOL_SIZE, this.maxThreadPoolSize);
        this.threadPool = new ThreadPoolExecutor(
                initialCoreSize, Integer.MAX_VALUE, 1L, TimeUnit.HOURS,
                new LinkedBlockingQueue<Runnable>(), workerFactory);

        this.eventDispatcherThread = new Thread() {

            @Override
            public void run() {
                // String forms of every node id seen so far; its size drives pool growth.
                HashSet<String> knownNodes = new HashSet<String>();
                while (!NMClientAsyncImpl.this.stopped.get() && !Thread.currentThread().isInterrupted()) {
                    ContainerEvent next;
                    try {
                        next = NMClientAsyncImpl.this.events.take();
                    }
                    catch (InterruptedException e) {
                        // An interrupt during shutdown is expected; anything else is logged.
                        if (!NMClientAsyncImpl.this.stopped.get()) {
                            LOG.error("Returning, thread interrupted", e);
                        }
                        return;
                    }
                    knownNodes.add(next.getNodeId().toString());

                    int currentCoreSize = NMClientAsyncImpl.this.threadPool.getCorePoolSize();
                    if (currentCoreSize != NMClientAsyncImpl.this.maxThreadPoolSize) {
                        int nodeCount = knownNodes.size();
                        int idealCoreSize = Math.min(NMClientAsyncImpl.this.maxThreadPoolSize, nodeCount);
                        if (currentCoreSize < idealCoreSize) {
                            // Grow past the ideal size by a fixed headroom, capped at the maximum.
                            int grownCoreSize = Math.min(
                                    NMClientAsyncImpl.this.maxThreadPoolSize,
                                    idealCoreSize + INITIAL_THREAD_POOL_SIZE);
                            LOG.info("Set NMClientAsync thread pool size to " + grownCoreSize + " as the number of nodes to talk to is " + nodeCount);
                            NMClientAsyncImpl.this.threadPool.setCorePoolSize(grownCoreSize);
                        }
                    }

                    NMClientAsyncImpl.this.threadPool.execute(
                            NMClientAsyncImpl.this.getContainerEventProcessor(next));
                }
            }
        };
        this.eventDispatcherThread.setName("Container  Event Dispatcher");
        this.eventDispatcherThread.setDaemon(false);
        this.eventDispatcherThread.start();

        super.serviceStart();
    }

    /**
     * Stops the dispatcher thread, the worker pool, the wrapped client and finally the parent
     * service. Only the first call does any work; later calls return immediately.
     *
     * <p>If the calling thread is interrupted while waiting for the dispatcher to finish, the
     * shutdown still runs to completion and the interrupt status is restored just before this
     * method returns, so the caller can observe it.
     *
     * @throws Exception if the parent service fails to stop
     */
    @Override
    protected void serviceStop() throws Exception {
        if (this.stopped.getAndSet(true)) {
            return;
        }
        boolean interruptedWhileJoining = false;
        try {
            if (this.eventDispatcherThread != null) {
                this.eventDispatcherThread.interrupt();
                try {
                    this.eventDispatcherThread.join();
                }
                catch (InterruptedException e) {
                    // Remember the interrupt; re-asserting it right away could disturb the
                    // remaining shutdown steps below.
                    interruptedWhileJoining = true;
                    LOG.error("The thread of " + this.eventDispatcherThread.getName() + " didn't finish normally.", e);
                }
            }
            if (this.threadPool != null) {
                this.threadPool.shutdownNow();
            }
            if (this.client != null) {
                // Tracked state is kept only when the client is an NMClientImpl whose
                // cleanup-running-containers flag is off; in every other case it is cleared.
                boolean keepTrackedState = this.client instanceof NMClientImpl
                        && !((NMClientImpl)this.client).getCleanupRunningContainers().get();
                if (!keepTrackedState && this.containers != null) {
                    this.containers.clear();
                }
                this.client.stop();
            }
            super.serviceStop();
        }
        finally {
            if (interruptedWhileJoining) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Schedules the start of a container.
     *
     * <p>If the container id is already tracked, {@code onStartContainerError} is invoked and
     * nothing is scheduled, so the rejected request can never be the one that launches the
     * container. If the calling thread is interrupted while enqueueing, the tracking entry
     * created by this call is removed again, {@code onStartContainerError} is invoked with the
     * {@link InterruptedException}, and the thread's interrupt status is restored.
     *
     * @param container the container to start
     * @param containerLaunchContext launch context to start it with
     */
    @Override
    public void startContainerAsync(Container container, ContainerLaunchContext containerLaunchContext) {
        ContainerId containerId = container.getId();
        StatefulContainer placeholder = new StatefulContainer(this, containerId);
        if (this.containers.putIfAbsent(containerId, placeholder) != null) {
            this.callbackHandler.onStartContainerError(containerId, RPCUtil.getRemoteException("Container " + containerId + " is already started or scheduled to start"));
            // The request was rejected; do not enqueue a start event for it.
            return;
        }
        try {
            this.events.put(new StartContainerEvent(container, containerLaunchContext));
        }
        catch (InterruptedException e) {
            LOG.warn("Exception when scheduling the event of starting Container " + containerId, e);
            // No event will ever reach the placeholder, so drop it (only if it is still ours)
            // to allow a later start attempt for the same id.
            this.containers.remove(containerId, placeholder);
            try {
                this.callbackHandler.onStartContainerError(containerId, e);
            }
            finally {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Schedules the stop of a container.
     *
     * <p>If the container id is not tracked, {@code onStopContainerError} is invoked and
     * nothing is scheduled; enqueueing a stop event anyway could hit a container with the same
     * id that is registered afterwards. If the calling thread is interrupted while enqueueing,
     * {@code onStopContainerError} is invoked with the {@link InterruptedException} and the
     * thread's interrupt status is restored.
     *
     * @param containerId id of the container to stop
     * @param nodeId node the container runs on
     */
    @Override
    public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
        if (this.containers.get(containerId) == null) {
            this.callbackHandler.onStopContainerError(containerId, RPCUtil.getRemoteException("Container " + containerId + " is neither started nor scheduled to start"));
            // The request was rejected; do not enqueue a stop event for it.
            return;
        }
        try {
            this.events.put(new ContainerEvent(containerId, nodeId, null, ContainerEventType.STOP_CONTAINER));
        }
        catch (InterruptedException e) {
            LOG.warn("Exception when scheduling the event of stopping Container " + containerId, e);
            try {
                this.callbackHandler.onStopContainerError(containerId, e);
            }
            finally {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Schedules a status query for a container. No check is made that the container is tracked.
     *
     * <p>If the calling thread is interrupted while enqueueing, {@code onGetContainerStatusError}
     * is invoked with the {@link InterruptedException} and the thread's interrupt status is
     * restored.
     *
     * @param containerId id of the container to query
     * @param nodeId node the container runs on
     */
    @Override
    public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
        try {
            this.events.put(new ContainerEvent(containerId, nodeId, null, ContainerEventType.QUERY_CONTAINER));
        }
        catch (InterruptedException e) {
            LOG.warn("Exception when scheduling the event of querying the status of Container " + containerId, e);
            try {
                this.callbackHandler.onGetContainerStatusError(containerId, e);
            }
            finally {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Tells whether a container has reached a terminal state.
     *
     * <p>The state is read once, so the answer reflects a single snapshot even if another
     * thread transitions the container concurrently.
     *
     * @param container the state holder to inspect
     * @return {@code true} if the container is in state DONE or FAILED
     */
    protected boolean isCompletelyDone(StatefulContainer container) {
        ContainerState snapshot = container.getState();
        return snapshot == ContainerState.DONE || snapshot == ContainerState.FAILED;
    }

    /**
     * Creates the task that processes one event; the dispatcher thread submits the result to
     * the worker pool. Being protected, it can be overridden to supply a different processor.
     *
     * @param event the event to process
     * @return a new processor bound to {@code event}
     */
    protected ContainerEventProcessor getContainerEventProcessor(ContainerEvent event) {
        return new ContainerEventProcessor(event);
    }

    /**
     * Worker task handling a single {@link ContainerEvent}.
     *
     * <p>Status queries are answered directly through the wrapped client. Every other event is
     * passed to the container's state holder, which is dropped from the tracking map once it
     * reaches a terminal state.
     */
    protected class ContainerEventProcessor
    implements Runnable {
        /** The event this task was created for. */
        protected ContainerEvent event;

        public ContainerEventProcessor(ContainerEvent event) {
            this.event = event;
        }

        @Override
        public void run() {
            ContainerId id = this.event.getContainerId();
            LOG.info("Processing Event " + this.event + " for Container " + id);

            if (this.event.getType() != ContainerEventType.QUERY_CONTAINER) {
                StatefulContainer tracked = NMClientAsyncImpl.this.containers.get(id);
                if (tracked == null) {
                    LOG.info("Container " + id + " is already stopped or failed");
                    return;
                }
                tracked.handle(this.event);
                if (NMClientAsyncImpl.this.isCompletelyDone(tracked)) {
                    NMClientAsyncImpl.this.containers.remove(id);
                }
                return;
            }

            // Status query: any failure, checked or not, is reported to the error callback.
            try {
                ContainerStatus status = NMClientAsyncImpl.this.client.getContainerStatus(id, this.event.getNodeId());
                try {
                    NMClientAsyncImpl.this.callbackHandler.onContainerStatusReceived(id, status);
                }
                catch (Throwable thr) {
                    // A misbehaving callback must not be reported as a query failure.
                    LOG.info("Unchecked exception is thrown from onContainerStatusReceived for Container " + this.event.getContainerId(), thr);
                }
            }
            catch (Throwable t) {
                this.onExceptionRaised(id, t);
            }
        }

        /** Reports a failed status query to the callback handler, logging anything the handler throws. */
        private void onExceptionRaised(ContainerId containerId, Throwable t) {
            try {
                NMClientAsyncImpl.this.callbackHandler.onGetContainerStatusError(containerId, t);
            }
            catch (Throwable thr) {
                LOG.info("Unchecked exception is thrown from onGetContainerStatusError for Container " + containerId, thr);
            }
        }
    }

    protected static class StatefulContainer
    implements EventHandler<ContainerEvent> {
        /**
         * Transition table shared by all containers; the initial state is PREP.
         *
         * <ul>
         * <li>PREP + START_CONTAINER: RUNNING or FAILED, decided by the start transition.</li>
         * <li>PREP + STOP_CONTAINER: DONE, reporting a start error (stop arrived before start).</li>
         * <li>RUNNING + STOP_CONTAINER: DONE or FAILED, decided by the stop transition.</li>
         * <li>DONE and FAILED absorb further START_CONTAINER and STOP_CONTAINER events.</li>
         * </ul>
         */
        protected static final StateMachineFactory<StatefulContainer, ContainerState, ContainerEventType, ContainerEvent> stateMachineFactory =
                new StateMachineFactory<StatefulContainer, ContainerState, ContainerEventType, ContainerEvent>(ContainerState.PREP)
                        .addTransition(ContainerState.PREP,
                                EnumSet.of(ContainerState.RUNNING, ContainerState.FAILED),
                                ContainerEventType.START_CONTAINER,
                                new StartContainerTransition())
                        .addTransition(ContainerState.PREP,
                                ContainerState.DONE,
                                ContainerEventType.STOP_CONTAINER,
                                new OutOfOrderTransition())
                        .addTransition(ContainerState.RUNNING,
                                EnumSet.of(ContainerState.DONE, ContainerState.FAILED),
                                ContainerEventType.STOP_CONTAINER,
                                new StopContainerTransition())
                        .addTransition(ContainerState.DONE,
                                ContainerState.DONE,
                                EnumSet.of(ContainerEventType.START_CONTAINER, ContainerEventType.STOP_CONTAINER))
                        .addTransition(ContainerState.FAILED,
                                ContainerState.FAILED,
                                EnumSet.of(ContainerEventType.START_CONTAINER, ContainerEventType.STOP_CONTAINER));
        /** Owning async client; gives the transitions access to the wrapped client and the callback handler. */
        private final NMClientAsync nmClientAsync;
        /** Id of the container this state holder tracks. */
        private final ContainerId containerId;
        /** Per-container state machine instance built from {@link #stateMachineFactory}. */
        private final StateMachine<ContainerState, ContainerEventType, ContainerEvent> stateMachine;
        /** Taken while reading the current state. */
        private final ReentrantReadWriteLock.ReadLock readLock;
        /** Taken while a transition is in progress. */
        private final ReentrantReadWriteLock.WriteLock writeLock;

        /**
         * Creates the state holder for one container, starting in the factory's initial state.
         *
         * @param client owning async client
         * @param containerId id of the tracked container
         */
        public StatefulContainer(NMClientAsync client, ContainerId containerId) {
            ReentrantReadWriteLock stateLock = new ReentrantReadWriteLock();
            this.readLock = stateLock.readLock();
            this.writeLock = stateLock.writeLock();
            this.nmClientAsync = client;
            this.containerId = containerId;
            this.stateMachine = stateMachineFactory.make(this);
        }

        /**
         * Feeds an event to the state machine while holding the write lock. An event that is
         * not valid in the current state is logged and otherwise ignored.
         *
         * @param event the event to apply
         */
        @Override
        public void handle(ContainerEvent event) {
            this.writeLock.lock();
            try {
                this.stateMachine.doTransition(event.getType(), event);
            }
            catch (InvalidStateTransitonException e) {
                LOG.error("Can't handle this event at current state", e);
            }
            finally {
                this.writeLock.unlock();
            }
        }

        /**
         * @return the id of the container this state holder tracks
         */
        public ContainerId getContainerId() {
            return this.containerId;
        }

        /**
         * Reads the current state under the read lock.
         *
         * @return the state the container's state machine is currently in
         */
        public ContainerState getState() {
            this.readLock.lock();
            try {
                return this.stateMachine.getCurrentState();
            }
            finally {
                this.readLock.unlock();
            }
        }

        protected static class OutOfOrderTransition
        implements SingleArcTransition<StatefulContainer, ContainerEvent> {
            protected static final String STOP_BEFORE_START_ERROR_MSG = "Container was killed before it was launched";

            protected OutOfOrderTransition() {
            }

            @Override
            public void transition(StatefulContainer container, ContainerEvent event) {
                try {
                    container.nmClientAsync.getCallbackHandler().onStartContainerError(event.getContainerId(), RPCUtil.getRemoteException(STOP_BEFORE_START_ERROR_MSG));
                }
                catch (Throwable thr) {
                    LOG.info("Unchecked exception is thrown from onStartContainerError for Container " + event.getContainerId(), thr);
                }
            }
        }

        /**
         * Transition that stops a running container through the wrapped client and reports the
         * outcome to the callback handler.
         */
        protected static class StopContainerTransition
        implements MultipleArcTransition<StatefulContainer, ContainerEvent, ContainerState> {
            protected StopContainerTransition() {
            }

            /**
             * @return DONE if the stop call succeeded, FAILED if it threw anything
             */
            @Override
            public ContainerState transition(StatefulContainer container, ContainerEvent event) {
                ContainerId containerId = event.getContainerId();
                try {
                    container.nmClientAsync.getClient().stopContainer(containerId, event.getNodeId());
                    try {
                        container.nmClientAsync.getCallbackHandler().onContainerStopped(event.getContainerId());
                    }
                    catch (Throwable thr) {
                        // The stop itself succeeded; a failing callback does not change that.
                        LOG.info("Unchecked exception is thrown from onContainerStopped for Container " + event.getContainerId(), thr);
                    }
                    return ContainerState.DONE;
                }
                catch (Throwable t) {
                    return this.onExceptionRaised(container, event, t);
                }
            }

            /** Reports a failed stop to the callback handler and yields the FAILED state. */
            private ContainerState onExceptionRaised(StatefulContainer container, ContainerEvent event, Throwable t) {
                try {
                    container.nmClientAsync.getCallbackHandler().onStopContainerError(event.getContainerId(), t);
                }
                catch (Throwable thr) {
                    LOG.info("Unchecked exception is thrown from onStopContainerError for Container " + event.getContainerId(), thr);
                }
                return ContainerState.FAILED;
            }
        }

        /**
         * Transition that starts a container through the wrapped client and reports the outcome
         * to the callback handler.
         */
        protected static class StartContainerTransition
        implements MultipleArcTransition<StatefulContainer, ContainerEvent, ContainerState> {
            protected StartContainerTransition() {
            }

            /**
             * @return RUNNING if the start call succeeded, FAILED if anything was thrown
             */
            @Override
            public ContainerState transition(StatefulContainer container, ContainerEvent event) {
                ContainerId containerId = event.getContainerId();
                try {
                    StartContainerEvent scEvent =
                            event instanceof StartContainerEvent ? (StartContainerEvent)event : null;
                    assert (scEvent != null);
                    Map<String, ByteBuffer> allServiceResponse = container.nmClientAsync.getClient()
                            .startContainer(scEvent.getContainer(), scEvent.getContainerLaunchContext());
                    try {
                        container.nmClientAsync.getCallbackHandler().onContainerStarted(containerId, allServiceResponse);
                    }
                    catch (Throwable thr) {
                        // The start itself succeeded; a failing callback does not change that.
                        LOG.info("Unchecked exception is thrown from onContainerStarted for Container " + containerId, thr);
                    }
                    return ContainerState.RUNNING;
                }
                catch (Throwable t) {
                    return this.onExceptionRaised(container, event, t);
                }
            }

            /** Reports a failed start to the callback handler and yields the FAILED state. */
            private ContainerState onExceptionRaised(StatefulContainer container, ContainerEvent event, Throwable t) {
                try {
                    container.nmClientAsync.getCallbackHandler().onStartContainerError(event.getContainerId(), t);
                }
                catch (Throwable thr) {
                    LOG.info("Unchecked exception is thrown from onStartContainerError for Container " + event.getContainerId(), thr);
                }
                return ContainerState.FAILED;
            }
        }
    }

    /**
     * START_CONTAINER event that additionally carries the container record and the launch
     * context needed to start it.
     */
    protected static class StartContainerEvent
    extends ContainerEvent {
        private final Container container;
        private final ContainerLaunchContext containerLaunchContext;

        /**
         * @param container the container to start; its id, node id and token populate the base event
         * @param containerLaunchContext launch context to start the container with
         */
        public StartContainerEvent(Container container, ContainerLaunchContext containerLaunchContext) {
            super(container.getId(), container.getNodeId(), container.getContainerToken(), ContainerEventType.START_CONTAINER);
            this.containerLaunchContext = containerLaunchContext;
            this.container = container;
        }

        /** @return the container to start */
        public Container getContainer() {
            return this.container;
        }

        /** @return the launch context to start the container with */
        public ContainerLaunchContext getContainerLaunchContext() {
            return this.containerLaunchContext;
        }
    }

    /**
     * Event addressed to one container on one node; the event type selects the operation.
     */
    protected static class ContainerEvent
    extends AbstractEvent<ContainerEventType> {
        private final ContainerId containerId;
        private final NodeId nodeId;
        private final Token containerToken;

        /**
         * @param containerId id of the target container
         * @param nodeId node the container belongs to
         * @param containerToken token of the container; stop and query events pass {@code null}
         * @param type the operation this event requests
         */
        public ContainerEvent(ContainerId containerId, NodeId nodeId, Token containerToken, ContainerEventType type) {
            super(type);
            this.containerToken = containerToken;
            this.nodeId = nodeId;
            this.containerId = containerId;
        }

        /** @return id of the target container */
        public ContainerId getContainerId() {
            return this.containerId;
        }

        /** @return node the container belongs to */
        public NodeId getNodeId() {
            return this.nodeId;
        }

        /** @return the container token, possibly {@code null} */
        public Token getContainerToken() {
            return this.containerToken;
        }
    }

    /** Operations that can be requested for a container. */
    protected static enum ContainerEventType {
        /** Start the container; carried by {@link StartContainerEvent}. */
        START_CONTAINER,
        /** Stop the container; enqueued by {@code stopContainerAsync}. */
        STOP_CONTAINER,
        /** Fetch the container's status; enqueued by {@code getContainerStatusAsync}. */
        QUERY_CONTAINER;

    }

    /** States tracked for each container by {@link StatefulContainer}. */
    protected static enum ContainerState {
        /** Initial state: registered, start not yet processed. */
        PREP,
        /** Terminal state: a start or stop attempt threw. */
        FAILED,
        /** The start call succeeded. */
        RUNNING,
        /** Terminal state: stopped successfully, or stopped before it was ever started. */
        DONE;

    }
}

