/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.yarn.server.sharedcachemanager.store;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SCMStore;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResource;
import org.apache.hadoop.yarn.server.sharedcachemanager.store.SharedCacheResourceReference;

/**
 * An {@link SCMStore} implementation that keeps all shared cache resource
 * metadata in a map held in memory.
 *
 * <p>Per-key operations synchronize on the interned key string, so operations
 * on the same key are serialized while operations on different keys can
 * proceed concurrently. The list of applications that were active when the
 * store started is guarded by a separate lock and is pruned periodically by
 * {@link AppCheckTask}; no resource is reported as evictable until that list
 * is empty.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class InMemorySCMStore
extends SCMStore {
    private static final Log LOG = LogFactory.getLog(InMemorySCMStore.class);

    /** Resource metadata, keyed by the interned resource key. */
    private final Map<String, SharedCacheResource> cachedResources = new ConcurrentHashMap<String, SharedCacheResource>();
    /** Applications reported as active at start-up; guarded by {@link #initialAppsLock}. */
    private Collection<ApplicationId> initialApps = new ArrayList<ApplicationId>();
    private final Object initialAppsLock = new Object();
    /** Wall-clock time (ms) captured in {@link #serviceInit(Configuration)}. */
    private long startTime;
    private int stalenessMinutes;
    private ScheduledExecutorService scheduler;
    private int initialDelayMin;
    private int checkPeriodMin;

    public InMemorySCMStore() {
        super(InMemorySCMStore.class.getName());
    }

    @VisibleForTesting
    public InMemorySCMStore(AppChecker appChecker) {
        super(InMemorySCMStore.class.getName(), appChecker);
    }

    /**
     * Returns the interned form of {@code key}. The result is used both as the
     * map key and as the monitor for the per-key {@code synchronized} blocks.
     */
    private String intern(String key) {
        return StringInterner.weakIntern(key);
    }

    /**
     * Reads the timing configuration, bootstraps the in-memory map from the
     * file system, and creates (but does not yet schedule) the single-threaded
     * scheduler used for the app check task.
     *
     * @param conf the service configuration
     * @throws Exception if a configured value is invalid, bootstrapping fails,
     *         or the superclass initialization fails
     */
    @Override
    protected void serviceInit(Configuration conf) throws Exception {
        this.startTime = System.currentTimeMillis();
        this.initialDelayMin = getInitialDelay(conf);
        this.checkPeriodMin = getCheckPeriod(conf);
        this.stalenessMinutes = getStalenessPeriod(conf);
        this.bootstrap(conf);
        ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore").build();
        this.scheduler = Executors.newSingleThreadScheduledExecutor(tf);
        super.serviceInit(conf);
    }

    /**
     * Records the currently active applications and schedules the periodic
     * {@link AppCheckTask}.
     *
     * @throws Exception if the superclass start or the active-app query fails
     */
    @Override
    protected void serviceStart() throws Exception {
        super.serviceStart();
        LOG.info("Getting the active app list to initialize the in-memory scm store");
        final int activeAppCount;
        synchronized (this.initialAppsLock) {
            this.initialApps = this.appChecker.getActiveApplications();
            // Read the size while still holding the lock, like every other
            // access to initialApps.
            activeAppCount = this.initialApps.size();
        }
        LOG.info(activeAppCount + " apps recorded as active at this time");
        AppCheckTask task = new AppCheckTask(this.appChecker);
        this.scheduler.scheduleAtFixedRate(task, this.initialDelayMin, this.checkPeriodMin, TimeUnit.MINUTES);
        LOG.info("Scheduled the in-memory scm store app check task to run every " + this.checkPeriodMin + " minutes.");
    }

    /**
     * Shuts down the scheduler (waiting up to 10 seconds for it to terminate)
     * and then stops the superclass service.
     *
     * <p>If the calling thread is interrupted while waiting, the interrupt is
     * remembered and re-asserted after the superclass has been stopped, so the
     * caller can still observe it.
     *
     * @throws Exception if the superclass stop fails
     */
    @Override
    protected void serviceStop() throws Exception {
        LOG.info("Stopping the " + InMemorySCMStore.class.getSimpleName() + " service.");
        boolean interrupted = false;
        if (this.scheduler != null) {
            LOG.info("Shutting down the background thread.");
            this.scheduler.shutdownNow();
            try {
                if (!this.scheduler.awaitTermination(10L, TimeUnit.SECONDS)) {
                    LOG.warn("Gave up waiting for the app check task to shutdown.");
                }
            }
            catch (InterruptedException e) {
                LOG.warn("The InMemorySCMStore was interrupted while shutting down the app check task.", e);
                interrupted = true;
            }
            LOG.info("The background thread stopped.");
        }
        try {
            super.serviceStop();
        }
        finally {
            if (interrupted) {
                // Restore the interrupt status only after the superclass has
                // stopped, so its shutdown runs without a pending interrupt.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Populates {@link #cachedResources} from the resources already present in
     * the shared cache directory of the file system.
     *
     * @param conf the service configuration
     * @throws IOException if the file system cannot be queried
     */
    private void bootstrap(Configuration conf) throws IOException {
        Map<String, String> initialCachedResources = this.getInitialCachedResources(FileSystem.get(conf), conf);
        LOG.info("Bootstrapping from " + initialCachedResources.size() + " cache resources located in the file system");
        Iterator<Map.Entry<String, String>> it = initialCachedResources.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, String> e = it.next();
            String key = this.intern(e.getKey());
            String fileName = e.getValue();
            SharedCacheResource resource = new SharedCacheResource(fileName);
            this.cachedResources.put(key, resource);
            // Drop each entry from the temporary map as soon as it is copied.
            it.remove();
        }
        LOG.info("Bootstrapping complete");
    }

    /**
     * Scans the shared cache root directory and builds a map from resource key
     * (the name of the file's parent directory) to file name. Only regular
     * files are considered, and the first file seen for a key wins; later
     * files for the same key are logged and skipped.
     *
     * @param fs the file system holding the shared cache
     * @param conf the service configuration
     * @return a mutable map of resource key to file name; possibly empty
     * @throws IOException if the root directory does not exist or the file
     *         system query fails
     */
    @VisibleForTesting
    Map<String, String> getInitialCachedResources(FileSystem fs, Configuration conf) throws IOException {
        String location = conf.get("yarn.sharedcache.root-dir", "/sharedcache");
        Path root = new Path(location);
        if (!fs.exists(root)) {
            String message = "The shared cache root directory " + location + " was not found";
            LOG.error(message);
            throw new IOException(message);
        }
        int nestedLevel = SharedCacheUtil.getCacheDepth(conf);
        // One level deeper than the cache depth, so the glob matches the files
        // themselves rather than their key directories.
        String pattern = SharedCacheUtil.getCacheEntryGlobPattern(nestedLevel + 1);
        LOG.info("Querying for all individual cached resource files");
        FileStatus[] entries = fs.globStatus(new Path(root, pattern));
        int numEntries = entries == null ? 0 : entries.length;
        LOG.info("Found " + numEntries + " files: processing for one resource per key");
        Map<String, String> initialCachedEntries = new HashMap<String, String>();
        if (entries != null) {
            for (FileStatus entry : entries) {
                if (!entry.isFile()) {
                    continue;
                }
                Path file = entry.getPath();
                Path parent = file.getParent();
                if (parent == null) {
                    continue;
                }
                String fileName = file.getName();
                String key = parent.getName();
                if (initialCachedEntries.containsKey(key)) {
                    LOG.warn("Key " + key + " is already mapped to file " + initialCachedEntries.get(key) + "; file " + fileName + " will not be added");
                    continue;
                }
                initialCachedEntries.put(key, fileName);
            }
        }
        LOG.info("A total of " + initialCachedEntries.size() + " files are now mapped");
        return initialCachedEntries;
    }

    /**
     * Adds a resource under {@code key} unless one is already present.
     *
     * @param key the resource key
     * @param fileName the file name to record if the key is new
     * @return the file name of the resource stored under the key: the existing
     *         one if the key was already present, otherwise {@code fileName}
     */
    @Override
    public String addResource(String key, String fileName) {
        String interned = this.intern(key);
        synchronized (interned) {
            SharedCacheResource resource = this.cachedResources.get(interned);
            if (resource == null) {
                resource = new SharedCacheResource(fileName);
                this.cachedResources.put(interned, resource);
            }
            return resource.getFileName();
        }
    }

    /**
     * Adds {@code ref} to the resource stored under {@code key} and updates
     * the resource's access time.
     *
     * @param key the resource key
     * @param ref the reference to add
     * @return the resource's file name, or {@code null} if no resource is
     *         stored under the key
     */
    @Override
    public String addResourceReference(String key, SharedCacheResourceReference ref) {
        String interned = this.intern(key);
        synchronized (interned) {
            SharedCacheResource resource = this.cachedResources.get(interned);
            if (resource == null) {
                return null;
            }
            resource.addReference(ref);
            resource.updateAccessTime();
            return resource.getFileName();
        }
    }

    /**
     * Returns a snapshot of the references held on the resource under
     * {@code key}.
     *
     * @param key the resource key
     * @return an unmodifiable copy of the references, or an empty set if no
     *         resource is stored under the key
     */
    @Override
    public Collection<SharedCacheResourceReference> getResourceReferences(String key) {
        String interned = this.intern(key);
        synchronized (interned) {
            SharedCacheResource resource = this.cachedResources.get(interned);
            if (resource == null) {
                return Collections.emptySet();
            }
            Set<SharedCacheResourceReference> refs = new HashSet<SharedCacheResourceReference>(resource.getResourceReferences());
            return Collections.unmodifiableSet(refs);
        }
    }

    /**
     * Removes a single reference from the resource under {@code key}.
     *
     * @param key the resource key
     * @param ref the reference to remove
     * @param updateAccessTime whether to update the resource's access time
     *        (applied whenever the resource exists, even if {@code ref} was
     *        not present)
     * @return {@code true} if the reference was present and removed;
     *         {@code false} otherwise, including when the key is unknown
     */
    @Override
    public boolean removeResourceReference(String key, SharedCacheResourceReference ref, boolean updateAccessTime) {
        String interned = this.intern(key);
        synchronized (interned) {
            boolean removed = false;
            SharedCacheResource resource = this.cachedResources.get(interned);
            if (resource != null) {
                Set<SharedCacheResourceReference> resourceRefs = resource.getResourceReferences();
                removed = resourceRefs.remove(ref);
                if (updateAccessTime) {
                    resource.updateAccessTime();
                }
            }
            return removed;
        }
    }

    /**
     * Removes all of {@code refs} from the resource under {@code key}. Does
     * nothing if no resource is stored under the key.
     *
     * @param key the resource key
     * @param refs the references to remove
     * @param updateAccessTime whether to update the resource's access time
     */
    @Override
    public void removeResourceReferences(String key, Collection<SharedCacheResourceReference> refs, boolean updateAccessTime) {
        String interned = this.intern(key);
        synchronized (interned) {
            SharedCacheResource resource = this.cachedResources.get(interned);
            if (resource != null) {
                Set<SharedCacheResourceReference> resourceRefs = resource.getResourceReferences();
                resourceRefs.removeAll(refs);
                if (updateAccessTime) {
                    resource.updateAccessTime();
                }
            }
        }
    }

    /**
     * Delegates to the superclass implementation while holding the per-key
     * lock, so the clean-up is serialized with the other operations on the
     * same key.
     *
     * @param key the resource key
     * @throws YarnException if the superclass implementation throws it
     */
    @Override
    public void cleanResourceReferences(String key) throws YarnException {
        String interned = this.intern(key);
        synchronized (interned) {
            super.cleanResourceReferences(key);
        }
    }

    /**
     * Removes the resource under {@code key} if it has no references.
     *
     * @param key the resource key
     * @return {@code true} if the resource was removed or did not exist;
     *         {@code false} if it still has references and was kept
     */
    @Override
    public boolean removeResource(String key) {
        String interned = this.intern(key);
        synchronized (interned) {
            SharedCacheResource resource = this.cachedResources.get(interned);
            if (resource == null) {
                return true;
            }
            if (!resource.getResourceReferences().isEmpty()) {
                return false;
            }
            this.cachedResources.remove(interned);
            return true;
        }
    }

    /**
     * Returns the access time of the resource under {@code key}.
     *
     * @param key the resource key
     * @return the resource's access time, or {@code -1} if no resource is
     *         stored under the key
     */
    @VisibleForTesting
    long getAccessTime(String key) {
        String interned = this.intern(key);
        synchronized (interned) {
            SharedCacheResource resource = this.cachedResources.get(interned);
            return resource == null ? -1L : resource.getAccessTime();
        }
    }

    /**
     * Decides whether the resource under {@code key} may be evicted.
     *
     * <p>Nothing is evictable while any application from the start-up list is
     * still recorded as active. After that, a resource is evictable when its
     * last use is older than the configured staleness period. For a resource
     * with no in-memory entry, the last use is taken to be the later of the
     * file's modification time and the store's start time.
     *
     * @param key the resource key
     * @param file the file status of the resource on the file system
     * @return {@code true} if the resource is considered stale
     */
    @Override
    public boolean isResourceEvictable(String key, FileStatus file) {
        synchronized (this.initialAppsLock) {
            if (this.initialApps.size() > 0) {
                return false;
            }
        }
        long staleTime = System.currentTimeMillis() - TimeUnit.MINUTES.toMillis(this.stalenessMinutes);
        long accessTime = this.getAccessTime(key);
        if (accessTime == -1L) {
            long modTime = file.getModificationTime();
            long lastUse = modTime < this.startTime ? this.startTime : modTime;
            return lastUse < staleTime;
        }
        return accessTime < staleTime;
    }

    /**
     * Reads the staleness period in minutes (default 10080).
     *
     * @throws HadoopIllegalArgumentException if the value is not positive
     */
    private static int getStalenessPeriod(Configuration conf) {
        int stalenessMinutes = conf.getInt("yarn.sharedcache.store.in-memory.staleness-period-mins", 10080);
        if (stalenessMinutes <= 0) {
            throw new HadoopIllegalArgumentException("Non-positive staleness value: " + stalenessMinutes + ". The staleness value must be greater than zero.");
        }
        return stalenessMinutes;
    }

    /**
     * Reads the initial delay of the app check task in minutes (default 10).
     *
     * @throws HadoopIllegalArgumentException if the value is not positive
     */
    private static int getInitialDelay(Configuration conf) {
        int initialMinutes = conf.getInt("yarn.sharedcache.store.in-memory.initial-delay-mins", 10);
        if (initialMinutes <= 0) {
            throw new HadoopIllegalArgumentException("Non-positive initial delay value: " + initialMinutes + ". The initial delay value must be greater than zero.");
        }
        return initialMinutes;
    }

    /**
     * Reads the period of the app check task in minutes (default 720).
     *
     * @throws HadoopIllegalArgumentException if the value is not positive
     */
    private static int getCheckPeriod(Configuration conf) {
        int checkMinutes = conf.getInt("yarn.sharedcache.store.in-memory.check-period-mins", 720);
        if (checkMinutes <= 0) {
            throw new HadoopIllegalArgumentException("Non-positive check period value: " + checkMinutes + ". The check period value must be greater than zero.");
        }
        return checkMinutes;
    }

    /**
     * Periodic task that removes applications that are no longer active from
     * the start-up application list of the enclosing store.
     */
    @InterfaceAudience.Private
    @InterfaceStability.Evolving
    class AppCheckTask
    implements Runnable {
        private final AppChecker taskAppChecker;

        public AppCheckTask(AppChecker appChecker) {
            this.taskAppChecker = appChecker;
        }

        /**
         * Checks every application still in the start-up list and drops the
         * ones reported as inactive. An application whose status check fails
         * with a {@link YarnException} is left in the list. Any other
         * throwable is logged and not propagated out of the task.
         */
        @Override
        public void run() {
            try {
                LOG.info("Checking the initial app list for finished applications.");
                synchronized (InMemorySCMStore.this.initialAppsLock) {
                    if (!InMemorySCMStore.this.initialApps.isEmpty()) {
                        LOG.info("Looking into " + InMemorySCMStore.this.initialApps.size() + " apps to see if they are still active");
                        Iterator<ApplicationId> it = InMemorySCMStore.this.initialApps.iterator();
                        while (it.hasNext()) {
                            ApplicationId id = it.next();
                            try {
                                if (!this.taskAppChecker.isApplicationActive(id)) {
                                    it.remove();
                                }
                            }
                            catch (YarnException e) {
                                LOG.warn("Exception while checking the app status; will leave the entry in the list", e);
                            }
                        }
                    }
                    LOG.info("There are now " + InMemorySCMStore.this.initialApps.size() + " entries in the list");
                }
            }
            catch (Throwable e) {
                // Deliberately broad: the exception is logged here rather than
                // allowed to escape from the scheduled task.
                LOG.error("Unexpected exception thrown during in-memory store app check task. Rescheduling task.", e);
            }
        }
    }
}

