/*
 * Decompiled with CFR 0.152.
 */
package alluxio.logserver;

import alluxio.Process;
import alluxio.conf.PropertyKey;
import alluxio.conf.ServerConfiguration;
import alluxio.logserver.AlluxioLog4jSocketNode;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.GuardedBy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link Process} that listens on a TCP port and hands every accepted connection to an
 * {@link AlluxioLog4jSocketNode}, which is run on a bounded thread pool.
 *
 * <p>{@link #start()} blocks in the accept loop on the calling thread; {@link #stop()} is expected
 * to be invoked from a different thread, so the fields shared between the two are either
 * {@code volatile} or guarded by the monitor of {@link #mClientSockets}.
 */
public class AlluxioLogServerProcess implements Process {
    /** Name of the appender used by log server clients. */
    public static final String LOGSERVER_CLIENT_LOGGER_APPENDER_NAME = "LOGSERVER_CLIENT_LOGGER";
    private static final Logger LOG = LoggerFactory.getLogger(AlluxioLogServerProcess.class);
    /** Keep-alive of idle pool threads, and the time {@link #stop()} waits for the pool to drain. */
    private static final long THREAD_KEEP_ALIVE_TIME_MS = 60000L;

    /** Directory passed to every {@link AlluxioLog4jSocketNode}. */
    private final String mBaseLogsDir;
    /** Port the server socket binds to. */
    private final int mPort;
    /** Core size of the worker thread pool. */
    private final int mMinNumberOfThreads;
    /** Maximum size of the worker thread pool. */
    private final int mMaxNumberOfThreads;
    /** Sockets of the clients currently being served; closed by {@link #stop()}. */
    @GuardedBy(value="mClientSockets")
    private final Set<Socket> mClientSockets = new HashSet<Socket>();
    // Written by the thread running start() and read by the thread running stop().
    private volatile ServerSocket mServerSocket;
    private volatile ExecutorService mThreadPool;
    /** {@code true} until the server socket is bound, and again once {@link #stop()} is called. */
    private volatile boolean mStopped;

    /**
     * Creates a log server process. Port and thread pool bounds are read from
     * {@link ServerConfiguration}.
     *
     * @param baseLogsDir directory handed to each {@link AlluxioLog4jSocketNode}
     */
    public AlluxioLogServerProcess(String baseLogsDir) {
        mPort = ServerConfiguration.getInt(PropertyKey.LOGSERVER_PORT);
        mMinNumberOfThreads = ServerConfiguration.getInt(PropertyKey.LOGSERVER_THREADS_MIN);
        mMaxNumberOfThreads = ServerConfiguration.getInt(PropertyKey.LOGSERVER_THREADS_MAX);
        mBaseLogsDir = baseLogsDir;
        mStopped = true;
    }

    /**
     * Binds the server socket and serves clients until {@link #stop()} is called or the server
     * socket is closed. This method blocks the calling thread.
     *
     * @throws RuntimeException if the port cannot be bound, or if the thread pool has no free
     *         worker for a newly accepted client
     */
    public void start() throws Exception {
        // A SynchronousQueue never buffers tasks: a submission either gets a thread or is rejected.
        mThreadPool = new ThreadPoolExecutor(mMinNumberOfThreads, mMaxNumberOfThreads,
            THREAD_KEEP_ALIVE_TIME_MS, TimeUnit.MILLISECONDS, new SynchronousQueue<Runnable>());
        try {
            mServerSocket = new ServerSocket(mPort);
        } catch (IOException bindFailure) {
            throw new RuntimeException(String.format("Failed to bind to port %d.", mPort),
                bindFailure);
        }
        // Only report readiness (see waitForReady) once the socket is actually listening.
        mStopped = false;
        while (!mStopped) {
            final Socket client;
            try {
                client = mServerSocket.accept();
            } catch (IOException acceptFailure) {
                if (mServerSocket.isClosed()) {
                    // stop() closed the socket underneath us; leave the accept loop.
                    break;
                }
                LOG.debug("Failed to accept a log client connection: {}", acceptFailure.toString());
                continue;
            }
            serve(client);
        }
    }

    /**
     * Registers {@code client} and schedules a socket node for it on the thread pool. The client
     * is unregistered again when its node finishes.
     *
     * @throws RuntimeException if the thread pool rejects the task
     */
    private void serve(Socket client) {
        String clientAddress = client.getInetAddress().getHostAddress();
        LOG.info("Starting thread to read logs from {}", clientAddress);
        AlluxioLog4jSocketNode clientSocketNode = new AlluxioLog4jSocketNode(mBaseLogsDir, client);
        synchronized (mClientSockets) {
            mClientSockets.add(client);
        }
        try {
            CompletableFuture.runAsync(clientSocketNode, mThreadPool)
                .whenComplete((result, failure) -> {
                    synchronized (mClientSockets) {
                        mClientSockets.remove(client);
                    }
                });
        } catch (RejectedExecutionException rejected) {
            // No node will ever service this connection, so release it instead of leaking it.
            synchronized (mClientSockets) {
                mClientSockets.remove(client);
            }
            closeClientSocket(client);
            String errorMessage = String.format("Log server cannot find a worker thread to service log requests from %s. Increase the number of worker threads in the thread pool by configuring alluxio.logserver.threads.max in alluxio-site.properties. Current value is %d.", clientAddress, mMaxNumberOfThreads);
            throw new RuntimeException(errorMessage, rejected);
        }
    }

    /**
     * Stops accepting clients, interrupts the workers, closes all client sockets and waits up to
     * {@link #THREAD_KEEP_ALIVE_TIME_MS} for the workers to terminate. Safe to call even if
     * {@link #start()} was never invoked.
     *
     * @throws InterruptedException if interrupted while waiting for the workers to terminate
     */
    public void stop() throws Exception {
        mStopped = true;
        ServerSocket serverSocket = mServerSocket;
        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (IOException e) {
                LOG.warn("Exception in closing server socket: {}", e.toString());
            }
        }
        ExecutorService threadPool = mThreadPool;
        if (threadPool != null) {
            threadPool.shutdownNow();
        }
        // Closing the sockets unblocks workers that are stuck reading from them.
        synchronized (mClientSockets) {
            for (Socket socket : mClientSockets) {
                closeClientSocket(socket);
            }
        }
        if (threadPool == null) {
            // start() never created a pool, so there is nothing to wait for.
            return;
        }
        if (threadPool.awaitTermination(THREAD_KEEP_ALIVE_TIME_MS, TimeUnit.MILLISECONDS)) {
            LOG.info("All worker threads have terminated.");
        } else {
            LOG.warn("Log server has timed out waiting for worker threads to terminate.");
        }
    }

    /** Closes a client socket, logging (not propagating) any {@link IOException}. */
    private void closeClientSocket(Socket socket) {
        try {
            socket.close();
        } catch (IOException e) {
            LOG.warn("Exception in closing client socket: {}", e.toString());
        }
    }

    /**
     * Waits until the server is no longer in the stopped state.
     *
     * @param timeoutMs maximum time to wait, in milliseconds
     * @return {@code true} if the server left the stopped state in time; {@code false} on timeout
     *         or if the waiting thread was interrupted (the interrupt flag is restored)
     */
    public boolean waitForReady(int timeoutMs) {
        try {
            CommonUtils.waitFor(this + " to start", () -> !mStopped,
                WaitForOptions.defaults().setTimeoutMs(timeoutMs));
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (TimeoutException e) {
            return false;
        }
    }
}

