/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache;
import org.apache.flink.shaded.guava31.com.google.common.base.Ticker;
import org.apache.flink.shaded.guava31.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification;
import org.apache.flink.util.Preconditions;

public class DefaultShuffleDescriptorsCache
implements ShuffleDescriptorsCache {
    private final Cache<PermanentBlobKey, ShuffleDescriptorCacheEntry> shuffleDescriptorsCache;
    private final Map<JobID, Set<PermanentBlobKey>> cachedBlobKeysPerJob = new HashMap<JobID, Set<PermanentBlobKey>>();

    private DefaultShuffleDescriptorsCache(Duration expireTimeout, int cacheSizeLimit, Ticker ticker) {
        this.shuffleDescriptorsCache = CacheBuilder.newBuilder().concurrencyLevel(1).maximumSize(cacheSizeLimit).expireAfterAccess(expireTimeout).ticker(ticker).removalListener(this::onCacheRemoval).build();
    }

    @Override
    public void clear() {
        this.cachedBlobKeysPerJob.clear();
        this.shuffleDescriptorsCache.cleanUp();
    }

    @Override
    public TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup get(PermanentBlobKey blobKey) {
        ShuffleDescriptorCacheEntry entry = this.shuffleDescriptorsCache.getIfPresent(blobKey);
        return entry == null ? null : entry.getShuffleDescriptorGroup();
    }

    @Override
    public void put(JobID jobId, PermanentBlobKey blobKey, TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup shuffleDescriptorGroup) {
        this.shuffleDescriptorsCache.put(blobKey, new ShuffleDescriptorCacheEntry(shuffleDescriptorGroup, jobId));
        this.cachedBlobKeysPerJob.computeIfAbsent(jobId, ignore -> new HashSet()).add(blobKey);
    }

    @Override
    public void clearCacheForJob(JobID jobId) {
        Set<PermanentBlobKey> removed = this.cachedBlobKeysPerJob.remove(jobId);
        if (removed != null) {
            this.shuffleDescriptorsCache.invalidateAll(removed);
        }
    }

    private void onCacheRemoval(RemovalNotification<PermanentBlobKey, ShuffleDescriptorCacheEntry> removalNotification) {
        PermanentBlobKey blobKey = (PermanentBlobKey)removalNotification.getKey();
        ShuffleDescriptorCacheEntry entry = (ShuffleDescriptorCacheEntry)removalNotification.getValue();
        if (blobKey != null && entry != null) {
            this.cachedBlobKeysPerJob.computeIfPresent(entry.getJobId(), (jobID, permanentBlobKeys) -> {
                permanentBlobKeys.remove(blobKey);
                if (permanentBlobKeys.isEmpty()) {
                    return null;
                }
                return permanentBlobKeys;
            });
        }
    }

    public static class Factory {
        private static final Duration DEFAULT_CACHE_EXPIRE_TIMEOUT = Duration.ofSeconds(300L);
        private static final int DEFAULT_CACHE_SIZE_LIMIT = 100;
        private static final Ticker DEFAULT_TICKER = Ticker.systemTicker();
        private final Duration cacheExpireTimeout;
        private final int cacheSizeLimit;
        private final Ticker ticker;

        public Factory() {
            this(DEFAULT_CACHE_EXPIRE_TIMEOUT, 100, DEFAULT_TICKER);
        }

        @VisibleForTesting
        public Factory(Duration cacheExpireTimeout, int cacheSizeLimit, Ticker ticker) {
            this.cacheExpireTimeout = cacheExpireTimeout;
            this.cacheSizeLimit = cacheSizeLimit;
            this.ticker = ticker;
        }

        public DefaultShuffleDescriptorsCache create() {
            return new DefaultShuffleDescriptorsCache(this.cacheExpireTimeout, this.cacheSizeLimit, this.ticker);
        }
    }

    private static class ShuffleDescriptorCacheEntry {
        private final TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup shuffleDescriptorGroup;
        private final JobID jobId;

        public ShuffleDescriptorCacheEntry(TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup shuffleDescriptorGroup, JobID jobId) {
            this.shuffleDescriptorGroup = Preconditions.checkNotNull(shuffleDescriptorGroup);
            this.jobId = Preconditions.checkNotNull(jobId);
        }

        public TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup getShuffleDescriptorGroup() {
            return this.shuffleDescriptorGroup;
        }

        public JobID getJobId() {
            return this.jobId;
        }
    }
}

