/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.AppLogAggregator;
import org.apache.hadoop.yarn.util.ConverterUtils;

public class AppLogAggregatorImpl
implements AppLogAggregator {
    // Logger used by this class and by the inner ContainerLogAggregator.
    private static final Log LOG = LogFactory.getLog(AppLogAggregatorImpl.class);
    // NOTE(review): never referenced by name in this file; the aggregation loop waits on a literal
    // 1000L milliseconds, which presumably was this constant before inlining -- confirm.
    private static final int THREAD_SLEEP_TIME = 1000;
    // Configuration key the constructor reads to determine retentionSize.
    private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = "yarn.nodemanager.log-aggregation.num-log-files-per-app";
    // The constructor uses the same value (as a literal 30) both as the default for the key above
    // and as the fallback when the configured value is not positive.
    private static final int DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
    // Configuration key the constructor reads; when it is true, a rolling interval below the
    // minimum is accepted as configured instead of being raised.
    private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED = "yarn.nodemanager.log-aggregation.debug-enabled";
    // The constructor passes the same value (as a literal false) as the default for the key above.
    private static final boolean DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false;
    // The constructor raises positive rolling intervals below this value (as a literal 3600L)
    // to this value unless debug mode is enabled.
    private static final long NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600L;
    // Supplies the local log directories for reading container logs and for final cleanup.
    private final LocalDirsHandlerService dirsHandler;
    // Receives the APPLICATION_LOG_HANDLING_FINISHED event once aggregation is done.
    private final Dispatcher dispatcher;
    private final ApplicationId appId;
    // String form of appId (ConverterUtils.toString); used in log messages and as the
    // per-application directory name under each local log root.
    private final String applicationId;
    // Never assigned anywhere else in this file; when true, uploadLogsForContainers() returns immediately.
    private boolean logAggregationDisabled = false;
    private final Configuration conf;
    // Asked to delete local log files after upload and the local app log dirs at the end.
    private final DeletionService delService;
    // Identity used for the remote rename/delete operations and passed to the log writer.
    private final UserGroupInformation userUgi;
    // Remote destination of this node's aggregated log file for the application.
    private final Path remoteNodeLogFileForApp;
    // Same parent and name as remoteNodeLogFileForApp with ".tmp" appended; each upload cycle
    // writes here before the file is renamed or deleted.
    private final Path remoteNodeTmpLogFileForApp;
    // Decides in shouldUploadLogs() which containers have their logs uploaded.
    private final ContainerLogsRetentionPolicy retentionPolicy;
    // Containers queued by startContainerLogAggregation() and drained by uploadLogsForContainers().
    private final BlockingQueue<ContainerId> pendingContainers;
    // Set by finishLogAggregation(), and by the aggregation loop when it is interrupted.
    private final AtomicBoolean appFinishing = new AtomicBoolean();
    // Set at the end of doAppLogAggregation() and unconditionally in run()'s finally block.
    private final AtomicBoolean appAggregationFinished = new AtomicBoolean();
    // Set by abortLogAggregation(); makes the aggregation thread exit without the final upload.
    private final AtomicBoolean aborted = new AtomicBoolean();
    // Written into every aggregated log file via LogWriter.writeApplicationACLs.
    private final Map<ApplicationAccessType, String> appAcls;
    // Used to probe whether the per-application log directory exists under each local root.
    private final FileContext lfs;
    // Passed through to AggregatedLogFormat.LogValue for each container.
    private final LogAggregationContext logAggregationContext;
    // Source of the system credentials and of the application's current containers.
    private final Context context;
    // Threshold used by cleanOldLogs() when deciding how many remote files to delete.
    private final int retentionSize;
    // Seconds between periodic uploads; a value <= 0 means logs are uploaded only when the
    // application finishes.
    private final long rollingMonitorInterval;
    // Its node string (LogAggregationUtils.getNodeString) identifies this node's files in cleanOldLogs().
    private final NodeId nodeId;
    // One aggregator per container; in this file it is only touched from uploadLogsForContainers(),
    // which drops the entry once a container taken from pendingContainers has been processed.
    private final Map<ContainerId, ContainerLogAggregator> containerLogAggregators = new HashMap<ContainerId, ContainerLogAggregator>();

    /**
     * Builds the aggregator for a single application and resolves the two tunables it needs
     * from {@code conf}: how many aggregated files to retain per application and how often
     * (in seconds) logs are uploaded while the application is still running.
     *
     * <p>A non-positive retention count falls back to the default. A positive rolling interval
     * below the minimum is raised to the minimum unless debug mode is enabled; a non-positive
     * interval disables periodic uploads, so logs are uploaded once the application finishes.
     */
    public AppLogAggregatorImpl(Dispatcher dispatcher, DeletionService deletionService, Configuration conf, ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId, LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp, ContainerLogsRetentionPolicy retentionPolicy, Map<ApplicationAccessType, String> appAcls, LogAggregationContext logAggregationContext, Context context, FileContext lfs) {
        this.dispatcher = dispatcher;
        this.conf = conf;
        this.delService = deletionService;
        this.appId = appId;
        this.applicationId = ConverterUtils.toString((ApplicationId)appId);
        this.userUgi = userUgi;
        this.dirsHandler = dirsHandler;
        // The temporary path is derived from the final path, so the final path must be set first.
        this.remoteNodeLogFileForApp = remoteNodeLogFileForApp;
        this.remoteNodeTmpLogFileForApp = this.getRemoteNodeTmpLogFileForApp();
        this.retentionPolicy = retentionPolicy;
        this.pendingContainers = new LinkedBlockingQueue<ContainerId>();
        this.appAcls = appAcls;
        this.lfs = lfs;
        this.logAggregationContext = logAggregationContext;
        this.context = context;
        this.nodeId = nodeId;

        // Retention: anything that is not a positive count falls back to the default.
        int requestedRetention = conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP, DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
        if (requestedRetention > 0) {
            this.retentionSize = requestedRetention;
        } else {
            this.retentionSize = DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
        }

        // Rolling interval: start from the configured value and only override it when it is a
        // positive value below the minimum and debug mode is off.
        long requestedInterval = conf.getLong("yarn.nodemanager.log-aggregation.roll-monitoring-interval-seconds", -1L);
        boolean debugEnabled = conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED, DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);
        long effectiveInterval = requestedInterval;
        if (requestedInterval <= 0L) {
            LOG.warn((Object)("rollingMonitorInterval is set as " + requestedInterval + ". " + "The log rolling mornitoring interval is disabled. " + "The logs will be aggregated after this application is finished."));
        } else if (requestedInterval >= NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) {
            LOG.warn((Object)("rollingMonitorInterval is set as " + requestedInterval + ". " + "The logs will be aggregated every " + requestedInterval + " seconds"));
        } else if (!debugEnabled) {
            LOG.warn((Object)"rollingMonitorIntervall should be more than or equal to 3600 seconds. Using 3600 seconds instead.");
            effectiveInterval = NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS;
        }
        this.rollingMonitorInterval = effectiveInterval;
    }

    /**
     * Runs one upload cycle: writes the logs of every container that is due into the temporary
     * remote file, schedules the uploaded local files for deletion, and finally either renames
     * the temporary file to its published name (when something was uploaded) or deletes it.
     *
     * <p>Containers that are due are (a) everything drained from {@code pendingContainers} and
     * (b) every container currently registered for the application that passes
     * {@link #shouldUploadLogs}. Only containers from (a) have their aggregator dropped after
     * the cycle.
     *
     * <p>Does nothing when log aggregation is disabled. If the remote writer cannot be created
     * the cycle is skipped.
     * NOTE(review): containers already drained from {@code pendingContainers} are not re-queued
     * when the cycle is skipped -- confirm this is acceptable.
     */
    private void uploadLogsForContainers() {
        if (this.logAggregationDisabled) {
            return;
        }
        if (UserGroupInformation.isSecurityEnabled()) {
            Credentials systemCredentials = this.context.getSystemCredentialsForApps().get(this.appId);
            if (systemCredentials != null) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug((Object)("Adding new framework-token for " + this.appId + " for log-aggregation: " + systemCredentials.getAllTokens() + "; userUgi=" + this.userUgi));
                }
                this.userUgi.addCredentials(systemCredentials);
            }
        }

        Set<ContainerId> pendingContainerInThisCycle = new HashSet<ContainerId>();
        this.pendingContainers.drainTo(pendingContainerInThisCycle);
        // Snapshot taken before running containers are added: these are the ones whose
        // aggregator can be discarded after this cycle.
        Set<ContainerId> finishedContainers = new HashSet<ContainerId>(pendingContainerInThisCycle);

        // Look the application up once; a second lookup could return null if the application
        // is removed from the context between the check and the use.
        Application app = (Application)this.context.getApplications().get(this.appId);
        if (app != null) {
            for (ContainerId container : app.getContainers().keySet()) {
                if (this.shouldUploadLogs(container, true)) {
                    pendingContainerInThisCycle.add(container);
                }
            }
        }

        AggregatedLogFormat.LogWriter writer = null;
        try {
            try {
                writer = new AggregatedLogFormat.LogWriter(this.conf, this.remoteNodeTmpLogFileForApp, this.userUgi);
                writer.writeApplicationACLs(this.appAcls);
                writer.writeApplicationOwner(this.userUgi.getShortUserName());
            }
            catch (IOException e1) {
                // Keep the cause in the log; the finally block below closes a half-built writer.
                LOG.error((Object)("Cannot create writer for app " + this.applicationId + ". Skip log upload this time. "), (Throwable)e1);
                return;
            }

            boolean uploadedLogsInThisCycle = false;
            for (ContainerId container : pendingContainerInThisCycle) {
                ContainerLogAggregator aggregator = this.containerLogAggregators.get(container);
                if (aggregator == null) {
                    aggregator = new ContainerLogAggregator(container);
                    this.containerLogAggregators.put(container, aggregator);
                }
                Set<Path> uploadedFilePathsInThisCycle = aggregator.doContainerLogAggregation(writer);
                if (!uploadedFilePathsInThisCycle.isEmpty()) {
                    uploadedLogsInThisCycle = true;
                }
                this.delService.delete(this.userUgi.getShortUserName(), null, uploadedFilePathsInThisCycle.toArray(new Path[uploadedFilePathsInThisCycle.size()]));
                if (finishedContainers.contains(container)) {
                    this.containerLogAggregators.remove(container);
                }
            }

            if (uploadedLogsInThisCycle) {
                this.cleanOldLogs();
            }

            // The file must be closed before it is renamed or deleted. Clearing the reference
            // keeps the finally block from closing the same writer a second time.
            writer.close();
            writer = null;

            // Without rolling there is one final file; with rolling every cycle gets its own
            // timestamp-suffixed file.
            final Path renamedPath = this.rollingMonitorInterval <= 0L ? this.remoteNodeLogFileForApp : new Path(this.remoteNodeLogFileForApp.getParent(), this.remoteNodeLogFileForApp.getName() + "_" + System.currentTimeMillis());
            final boolean rename = uploadedLogsInThisCycle;
            try {
                this.userUgi.doAs(new PrivilegedExceptionAction<Object>(){

                    @Override
                    public Object run() throws Exception {
                        // Resolve the file system from the path itself (as cleanOldLogs does) so a
                        // remote log dir on a non-default file system is handled correctly.
                        FileSystem remoteFS = AppLogAggregatorImpl.this.remoteNodeLogFileForApp.getFileSystem(AppLogAggregatorImpl.this.conf);
                        if (remoteFS.exists(AppLogAggregatorImpl.this.remoteNodeTmpLogFileForApp)) {
                            if (rename) {
                                if (!remoteFS.rename(AppLogAggregatorImpl.this.remoteNodeTmpLogFileForApp, renamedPath)) {
                                    LOG.error((Object)("Failed to move temporary log file to final location: [" + AppLogAggregatorImpl.this.remoteNodeTmpLogFileForApp + "] to [" + renamedPath + "]"));
                                }
                            } else {
                                remoteFS.delete(AppLogAggregatorImpl.this.remoteNodeTmpLogFileForApp, false);
                            }
                        }
                        return null;
                    }
                });
            }
            catch (Exception e) {
                LOG.error((Object)("Failed to move temporary log file to final location: [" + this.remoteNodeTmpLogFileForApp + "] to [" + renamedPath + "]"), (Throwable)e);
            }
        }
        finally {
            // Only reached with a live writer on the early-return and exception paths.
            if (writer != null) {
                writer.close();
            }
        }
    }

    /**
     * Deletes this node's oldest aggregated log files in the application's remote directory
     * when their count has reached {@code retentionSize}.
     *
     * <p>Only files whose name contains this node's string and does not end in {@code ".tmp"}
     * are considered. When the threshold is reached, files are removed oldest-first (by
     * modification time) until {@code retentionSize - 1} remain. Any failure is logged and
     * otherwise ignored.
     */
    private void cleanOldLogs() {
        try {
            final FileSystem remoteFS = this.remoteNodeLogFileForApp.getFileSystem(this.conf);
            Path qualifiedAppDir = this.remoteNodeLogFileForApp.getParent().makeQualified(remoteFS.getUri(), remoteFS.getWorkingDirectory());
            Set<FileStatus> allEntries = new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(qualifiedAppDir)));

            // Published (non-temporary) files that belong to this node.
            Predicate<FileStatus> publishedByThisNode = new Predicate<FileStatus>(){

                @Override
                public boolean apply(FileStatus candidate) {
                    String fileName = candidate.getPath().getName();
                    return fileName.contains(LogAggregationUtils.getNodeString((NodeId)AppLogAggregatorImpl.this.nodeId)) && !fileName.endsWith(".tmp");
                }
            };
            Set<FileStatus> nodeFiles = Sets.newHashSet(Iterables.filter(allEntries, publishedByThisNode));
            if (nodeFiles.size() < this.retentionSize) {
                return;
            }

            ArrayList<FileStatus> oldestFirst = new ArrayList<FileStatus>(nodeFiles);
            Comparator<FileStatus> byModificationTime = new Comparator<FileStatus>(){

                @Override
                public int compare(FileStatus s1, FileStatus s2) {
                    long first = s1.getModificationTime();
                    long second = s2.getModificationTime();
                    if (first < second) {
                        return -1;
                    }
                    if (first > second) {
                        return 1;
                    }
                    return 0;
                }
            };
            Collections.sort(oldestFirst, byModificationTime);

            // Inclusive bound: one more file than the surplus is removed.
            int lastIndexToRemove = oldestFirst.size() - this.retentionSize;
            for (int index = 0; index <= lastIndexToRemove; ++index) {
                final FileStatus remove = oldestFirst.get(index);
                try {
                    this.userUgi.doAs(new PrivilegedExceptionAction<Object>(){

                        @Override
                        public Object run() throws Exception {
                            remoteFS.delete(remove.getPath(), false);
                            return null;
                        }
                    });
                }
                catch (Exception e) {
                    // A failed delete must not stop the remaining deletions.
                    LOG.error((Object)("Failed to delete " + remove.getPath()), (Throwable)e);
                }
            }
        }
        catch (Exception e) {
            LOG.error((Object)"Failed to clean old logs", (Throwable)e);
        }
    }

    /**
     * Thread entry point: performs the application's log aggregation and, however that ends,
     * marks aggregation as finished. A warning is logged when the flag had not already been
     * set by the time the aggregation call returned or threw.
     */
    @Override
    public void run() {
        try {
            doAppLogAggregation();
        }
        finally {
            boolean completed = appAggregationFinished.get();
            if (!completed) {
                LOG.warn("Aggregation did not complete for application " + appId);
            }
            appAggregationFinished.set(true);
        }
    }

    /**
     * Main aggregation routine executed by {@link #run()}.
     *
     * <p>Phase 1 waits until the application is finishing or aggregation is aborted. When a
     * positive rolling interval is configured, logs are uploaded every time the wait returns
     * (timeout or notification) while the application is still running. An interrupt is
     * treated like "application finishing".
     *
     * <p>Phase 2 (skipped entirely on abort) performs a final upload, schedules deletion of the
     * application's local log directories that exist, dispatches
     * {@code APPLICATION_LOG_HANDLING_FINISHED} and marks aggregation as finished.
     */
    private void doAppLogAggregation() {
        while (!this.appFinishing.get() && !this.aborted.get()) {
            synchronized (this) {
                // Re-check under the monitor. finishLogAggregation()/abortLogAggregation() set
                // their flag and notify while holding this monitor, so a call that landed between
                // the loop test above and this point would otherwise be missed and the thread
                // would sleep for a full interval before noticing it.
                if (this.appFinishing.get() || this.aborted.get()) {
                    break;
                }
                try {
                    if (this.rollingMonitorInterval > 0L) {
                        this.wait(this.rollingMonitorInterval * 1000L);
                        if (this.appFinishing.get() || this.aborted.get()) {
                            break;
                        }
                        this.uploadLogsForContainers();
                    } else {
                        this.wait(THREAD_SLEEP_TIME);
                    }
                }
                catch (InterruptedException e) {
                    // NOTE(review): the interrupt status is not restored here; the final upload
                    // below still has to run on this thread -- confirm this is intentional.
                    LOG.warn((Object)"PendingContainers queue is interrupted");
                    this.appFinishing.set(true);
                }
            }
        }
        if (this.aborted.get()) {
            return;
        }

        // Final upload once the application is finishing.
        this.uploadLogsForContainers();

        // Collect the per-application directory under every local log root where it exists.
        ArrayList<Path> localAppLogDirs = new ArrayList<Path>();
        for (String rootLogDir : this.dirsHandler.getLogDirsForCleanup()) {
            Path logPath = new Path(rootLogDir, this.applicationId);
            try {
                this.lfs.getFileStatus(logPath);
                localAppLogDirs.add(logPath);
            }
            catch (UnsupportedFileSystemException ue) {
                LOG.warn((Object)("Log dir " + rootLogDir + " is an unsupported file system"), (Throwable)ue);
            }
            catch (IOException ignored) {
                // Deliberately ignored: the status lookup failed (presumably the directory does
                // not exist under this root), so there is nothing to schedule for deletion here.
            }
        }
        if (!localAppLogDirs.isEmpty()) {
            this.delService.delete(this.userUgi.getShortUserName(), null, localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
        }
        this.dispatcher.getEventHandler().handle((Event)new ApplicationEvent(this.appId, ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
        this.appAggregationFinished.set(true);
    }

    /**
     * Derives the temporary upload path: same parent directory as the final remote log file,
     * same file name with {@code ".tmp"} appended.
     *
     * @return the temporary remote path for this node's aggregated log file
     */
    private Path getRemoteNodeTmpLogFileForApp() {
        Path parentDir = remoteNodeLogFileForApp.getParent();
        String tmpFileName = remoteNodeLogFileForApp.getName() + ".tmp";
        return new Path(parentDir, tmpFileName);
    }

    /**
     * Applies the retention policy to one container.
     *
     * <ul>
     *   <li>{@code ALL_CONTAINERS}: always upload.</li>
     *   <li>{@code APPLICATION_MASTER_ONLY}: upload only when the masked container id equals 1.</li>
     *   <li>{@code AM_AND_FAILED_CONTAINERS_ONLY}: upload when the masked container id equals 1
     *       or the container was not successful.</li>
     *   <li>any other policy: never upload.</li>
     * </ul>
     *
     * @param containerId the container in question
     * @param wasContainerSuccessful whether the container completed successfully
     * @return {@code true} if the container's logs should be uploaded
     */
    private boolean shouldUploadLogs(ContainerId containerId, boolean wasContainerSuccessful) {
        if (retentionPolicy.equals((Object)ContainerLogsRetentionPolicy.ALL_CONTAINERS)) {
            return true;
        }
        boolean amOnly = retentionPolicy.equals((Object)ContainerLogsRetentionPolicy.APPLICATION_MASTER_ONLY);
        boolean amAndFailed = !amOnly && retentionPolicy.equals((Object)ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY);
        if (!amOnly && !amAndFailed) {
            return false;
        }
        // The container id is only inspected for the two policies that depend on it.
        boolean maskedIdIsOne = (containerId.getContainerId() & 0xFFFFFFFFFFL) == 1L;
        if (amOnly) {
            return maskedIdIsOne;
        }
        return maskedIdIsOne || !wasContainerSuccessful;
    }

    /**
     * Queues a container for the next upload cycle if the retention policy selects it;
     * otherwise does nothing.
     *
     * @param containerId the container whose logs may be aggregated
     * @param wasContainerSuccessful whether the container completed successfully
     */
    @Override
    public void startContainerLogAggregation(ContainerId containerId, boolean wasContainerSuccessful) {
        if (!shouldUploadLogs(containerId, wasContainerSuccessful)) {
            return;
        }
        LOG.info("Considering container " + containerId + " for log-aggregation");
        pendingContainers.add(containerId);
    }

    /**
     * Marks the application as finishing and wakes every thread waiting on this object's
     * monitor, which includes the aggregation loop.
     */
    @Override
    public synchronized void finishLogAggregation() {
        LOG.info("Application just finished : " + applicationId);
        appFinishing.set(true);
        notifyAll();
    }

    /**
     * Marks aggregation as aborted and wakes every thread waiting on this object's monitor;
     * the aggregation loop then exits without performing the final upload.
     */
    @Override
    public synchronized void abortLogAggregation() {
        LOG.info("Aborting log aggregation for " + applicationId);
        aborted.set(true);
        notifyAll();
    }

    /**
     * Test hook: wakes every thread waiting on this object's monitor without changing any
     * state flag, so a waiting aggregation loop returns from its wait early.
     */
    @InterfaceAudience.Private
    @VisibleForTesting
    public synchronized void doLogAggregationOutOfBand() {
        LOG.info("Do OutOfBand log aggregation");
        notifyAll();
    }

    /**
     * Exposes the user identity this aggregator operates with; intended for tests.
     *
     * @return the {@code UserGroupInformation} passed to the constructor
     */
    @VisibleForTesting
    public UserGroupInformation getUgi() {
        return userUgi;
    }

    private class ContainerLogAggregator {
        private final ContainerId containerId;
        private Set<String> uploadedFileMeta = new HashSet<String>();

        public ContainerLogAggregator(ContainerId containerId) {
            this.containerId = containerId;
        }

        public Set<Path> doContainerLogAggregation(AggregatedLogFormat.LogWriter writer) {
            LOG.info((Object)("Uploading logs for container " + this.containerId + ". Current good log dirs are " + StringUtils.join((CharSequence)",", AppLogAggregatorImpl.this.dirsHandler.getLogDirsForRead())));
            AggregatedLogFormat.LogKey logKey = new AggregatedLogFormat.LogKey(this.containerId);
            final AggregatedLogFormat.LogValue logValue = new AggregatedLogFormat.LogValue(AppLogAggregatorImpl.this.dirsHandler.getLogDirsForRead(), this.containerId, AppLogAggregatorImpl.this.userUgi.getShortUserName(), AppLogAggregatorImpl.this.logAggregationContext, this.uploadedFileMeta);
            try {
                writer.append(logKey, logValue);
            }
            catch (Exception e) {
                LOG.error((Object)("Couldn't upload logs for " + this.containerId + ". Skipping this container."));
                return new HashSet<Path>();
            }
            this.uploadedFileMeta.addAll(logValue.getCurrentUpLoadedFileMeta());
            Iterable mask = Iterables.filter(this.uploadedFileMeta, (Predicate)new Predicate<String>(){

                public boolean apply(String next) {
                    return logValue.getAllExistingFilesMeta().contains(next);
                }
            });
            this.uploadedFileMeta = Sets.newHashSet((Iterable)mask);
            return logValue.getCurrentUpLoadedFilesPath();
        }
    }
}

