package alluxio.worker.block;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.underfs.SeekableUnderFileInputStream;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils;
import alluxio.util.executor.ExecutorServiceFactories;
import alluxio.util.logging.SamplingLogger;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListeners;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:alluxio/worker/block/UfsInputStreamCache.class */
public final class UfsInputStreamCache {
    private static final Logger LOG = LoggerFactory.getLogger(UfsInputStreamCache.class);

    /**
     * Sampling wrapper around {@link #LOG}, used for the warning that may repeat on every acquire.
     * NOTE(review): 600000 is presumably the sampling interval in milliseconds (10 minutes) --
     * confirm against SamplingLogger.
     */
    private static final Logger SAMPLING_LOG = new SamplingLogger(LOG, 600000);

    /** Whether UFS input stream caching is enabled; read once when this class is initialized. */
    private static final boolean CACHE_ENABLED = Configuration.getBoolean(PropertyKey.WORKER_UFS_INSTREAM_CACHE_ENABLED);

    /** Maps a file id to the bookkeeping of the stream (resource) ids that exist for that file. */
    private final Map<Long, StreamIdSet> mFileIdToStreamIds = new ConcurrentHashMap<>();

    /**
     * Cache from stream (resource) id to the cached stream. Entries are bounded in number and
     * expire after a period without access. The removal listener runs asynchronously on a
     * dedicated two-thread pool and keeps {@link #mFileIdToStreamIds} consistent with the cache.
     */
    private final Cache<Long, CachedSeekableInputStream> mStreamCache = CacheBuilder.newBuilder()
        .maximumSize(Configuration.getInt(PropertyKey.WORKER_UFS_INSTREAM_CACHE_MAX_SIZE))
        .expireAfterAccess(Configuration.getMs(PropertyKey.WORKER_UFS_INSTREAM_CACHE_EXPIRARTION_TIME), TimeUnit.MILLISECONDS)
        .removalListener(RemovalListeners.asynchronous(removalNotification -> {
            // The builder is still typed on Object at this point, hence the explicit casts.
            CachedSeekableInputStream removedStream = (CachedSeekableInputStream) removalNotification.getValue();
            long fileId = removedStream.getFileId().longValue();
            long resourceId = ((Long) removalNotification.getKey()).longValue();
            boolean shouldClose = false;
            StreamIdSet streamIds = this.mFileIdToStreamIds.get(Long.valueOf(fileId));
            if (streamIds == null) {
                LOG.warn("Removed UFS input stream (fileId: {} resourceId: {}) but does not exist", Long.valueOf(fileId), Long.valueOf(resourceId));
            } else {
                synchronized (streamIds) {
                    // An in-use stream is only forgotten here, not closed: its holder still
                    // reads from it, and release() closes it once the id is no longer tracked.
                    if (streamIds.removeInUse(resourceId)) {
                        LOG.warn("Removed in-use UFS input stream (fileId: {} resourceId: {})", Long.valueOf(fileId), Long.valueOf(resourceId));
                    }
                    // An idle stream has no owner any more, so it is closed below.
                    if (streamIds.removeAvailable(resourceId)) {
                        LOG.debug("Removed available UFS input stream (fileId: {} resourceId: {})", Long.valueOf(fileId), Long.valueOf(resourceId));
                        shouldClose = true;
                    }
                    // Drop the per-file bookkeeping once no stream id is left for the file.
                    if (streamIds.isEmpty()) {
                        this.mFileIdToStreamIds.remove(Long.valueOf(fileId));
                    }
                }
            }
            if (shouldClose) {
                try {
                    removedStream.close();
                } catch (IOException e) {
                    // The trailing throwable is logged with its stack trace so the cause of the
                    // failed close is not lost.
                    LOG.warn("Failed to close UFS input stream resource of file {} with file id {} and resource id {}", removedStream.getFilePath(), removedStream.getFileId(), Long.valueOf(resourceId), e);
                }
            }
        }, ExecutorServiceFactories.fixedThreadPool("UfsInputStreamCacheExpiration", 2).create()))
        .build();

    /**
     * Bookkeeping of the stream ids that exist for one file, split into ids currently handed out
     * ("in use") and ids that are idle ("available"). Every method locks on this instance.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally declared private.
     */
    @ThreadSafe
    public static class StreamIdSet {
        private final Set<Long> mInUseStreamIds = new HashSet<>();
        private final Set<Long> mAvailableStreamIds = new HashSet<>();

        StreamIdSet() {
        }

        /**
         * @return a read-only view (not a copy) of the idle stream ids
         */
        synchronized Set<Long> availableIds() {
            return Collections.unmodifiableSet(this.mAvailableStreamIds);
        }

        /**
         * Marks an id as in use, removing it from the idle ids if it was there.
         *
         * @param j the stream id to hand out
         * @throws IllegalArgumentException if the id is already in use
         */
        synchronized void acquire(long j) {
            Long boxedId = Long.valueOf(j);
            boolean alreadyInUse = this.mInUseStreamIds.contains(boxedId);
            Preconditions.checkArgument(!alreadyInUse, "%s is already in use", j);
            this.mAvailableStreamIds.remove(boxedId);
            this.mInUseStreamIds.add(boxedId);
        }

        /**
         * Draws random non-negative ids until one is found that this set does not track yet,
         * then marks it as in use.
         *
         * @return the newly acquired stream id
         */
        synchronized long acquireNewId() {
            long candidate;
            do {
                candidate = IdUtils.getRandomNonNegativeLong();
            } while (this.mAvailableStreamIds.contains(Long.valueOf(candidate))
                || this.mInUseStreamIds.contains(Long.valueOf(candidate)));
            acquire(candidate);
            return candidate;
        }

        /**
         * @return true if neither in-use nor idle ids are tracked
         */
        synchronized boolean isEmpty() {
            if (!this.mInUseStreamIds.isEmpty()) {
                return false;
            }
            return this.mAvailableStreamIds.isEmpty();
        }

        /**
         * @param j the stream id to forget
         * @return true if the id was tracked as in use
         */
        synchronized boolean removeInUse(long j) {
            return this.mInUseStreamIds.remove(Long.valueOf(j));
        }

        /**
         * @param j the stream id to forget
         * @return true if the id was tracked as idle
         */
        synchronized boolean removeAvailable(long j) {
            return this.mAvailableStreamIds.remove(Long.valueOf(j));
        }

        /**
         * Moves an id from in use back to idle.
         *
         * @param j the stream id being returned
         * @return true if the id was in use and is now idle; false if it was not tracked as in use
         * @throws IllegalArgumentException if the id is already idle
         */
        synchronized boolean release(long j) {
            Long boxedId = Long.valueOf(j);
            Preconditions.checkArgument(!this.mAvailableStreamIds.contains(boxedId));
            boolean wasInUse = this.mInUseStreamIds.remove(boxedId);
            if (wasInUse) {
                this.mAvailableStreamIds.add(boxedId);
            }
            return wasInUse;
        }
    }

    /**
     * Returns a stream obtained from {@code acquire}. A cached stream whose id is still tracked
     * as in use becomes available for reuse; any other stream is closed.
     *
     * @param inputStream the stream to give back
     */
    public void release(InputStream inputStream) {
        boolean cacheManaged = CACHE_ENABLED && inputStream instanceof CachedSeekableInputStream;
        if (!cacheManaged) {
            close(inputStream);
            return;
        }
        CachedSeekableInputStream cachedStream = (CachedSeekableInputStream) inputStream;
        long fileId = cachedStream.getFileId().longValue();
        long resourceId = cachedStream.getResourceId();
        StreamIdSet streamIds = this.mFileIdToStreamIds.get(Long.valueOf(fileId));
        if (streamIds != null && streamIds.release(resourceId)) {
            // Still tracked: the stream stays open and is idle again.
            return;
        }
        // Either the whole file entry or just this stream id is gone, so nothing tracks the
        // stream any more and it has to be closed here.
        if (streamIds == null) {
            LOG.debug("UFS input stream (fileId: {} resourceId: {}) is already expired", Long.valueOf(fileId), Long.valueOf(resourceId));
        } else {
            LOG.debug("Close the expired UFS input stream (fileId: {} resourceId: {})", Long.valueOf(fileId), Long.valueOf(resourceId));
        }
        close(inputStream);
    }

    /**
     * Closes the given stream, rethrowing a failure as an unchecked exception so that callers
     * need not declare {@link IOException}.
     *
     * @param inputStream the stream to close
     */
    private void close(InputStream inputStream) {
        try {
            inputStream.close();
        } catch (IOException e) {
            // Converted through AlluxioRuntimeException.from, which receives the original exception.
            throw AlluxioRuntimeException.from(e);
        }
    }

    /**
     * Opens an input stream on a UFS file, reusing an idle cached stream when one exists.
     *
     * <p>If the UFS is not seekable or caching is disabled, a plain stream is opened with the
     * given options. Otherwise an idle cached stream of the file is reused and seeked to the
     * requested offset, or a new stream is opened and put into the cache. Streams returned by
     * this method should be handed back through {@link #release(InputStream)}.
     *
     * @param underFileSystem the under file system to read from
     * @param str the path of the file in the UFS
     * @param j the id of the file, used to group its cached streams
     * @param openOptions the open options; offset and position-short are honored for cached streams
     * @return the input stream, positioned at the requested offset
     * @throws IOException if seeking a reused stream or opening an uncached stream fails
     */
    public InputStream acquire(UnderFileSystem underFileSystem, String str, long j, OpenOptions openOptions) throws IOException {
        if (!underFileSystem.isSeekable() || !CACHE_ENABLED) {
            return underFileSystem.openExistingFile(str, openOptions);
        }
        // Best-effort eviction of expired entries before looking for a reusable stream.
        try {
            this.mStreamCache.cleanUp();
        } catch (Throwable th) {
            SAMPLING_LOG.warn("Explicit cache removal failed.", th);
        }
        StreamIdSet streamIds = this.mFileIdToStreamIds.computeIfAbsent(Long.valueOf(j), fileId -> new StreamIdSet());
        CachedSeekableInputStream reused = null;
        synchronized (streamIds) {
            for (Long availableId : streamIds.availableIds()) {
                reused = this.mStreamCache.getIfPresent(availableId);
                if (reused != null) {
                    // acquire() mutates the set being iterated, so the loop must stop right here.
                    streamIds.acquire(availableId.longValue());
                    break;
                }
            }
        }
        if (reused != null) {
            LOG.debug("Reused the under file input stream resource of {}", Long.valueOf(reused.getResourceId()));
            reused.seek(openOptions.getOffset());
            return reused;
        }
        final long newId = streamIds.acquireNewId();
        boolean cachedNewStream = false;
        try {
            CachedSeekableInputStream created = this.mStreamCache.get(Long.valueOf(newId), () -> {
                // openExistingFile is also returned as a plain InputStream above, so the seekable
                // type is recovered with an explicit downcast.
                SeekableUnderFileInputStream ufsStream = (SeekableUnderFileInputStream) underFileSystem.openExistingFile(str, OpenOptions.defaults().setPositionShort(openOptions.getPositionShort()).setOffset(openOptions.getOffset()));
                LOG.debug("Created the under file input stream resource of {}", Long.valueOf(newId));
                return new CachedSeekableInputStream(ufsStream, newId, j, str);
            });
            cachedNewStream = true;
            return created;
        } catch (ExecutionException e) {
            LOG.warn("Failed to create a new cached ufs instream of file id {} and path {}", Long.valueOf(j), str, e);
            // Fall back to an uncached stream.
            // NOTE(review): unlike the cached path this does not carry over position-short --
            // confirm whether that is intended.
            return underFileSystem.openExistingFile(str, OpenOptions.defaults().setOffset(openOptions.getOffset()));
        } catch (UncheckedExecutionException e) {
            throw AlluxioRuntimeException.from(e.getCause());
        } finally {
            if (!cachedNewStream) {
                // No cache entry exists for newId, so neither the removal listener nor release()
                // would ever clear it; without this the id would stay "in use" forever.
                forgetStreamId(j, streamIds, newId);
            }
        }
    }

    /**
     * Stops tracking an in-use stream id that never made it into the cache, and drops the
     * per-file bookkeeping if that leaves it empty.
     */
    private void forgetStreamId(long fileId, StreamIdSet streamIds, long streamId) {
        synchronized (streamIds) {
            streamIds.removeInUse(streamId);
            if (streamIds.isEmpty()) {
                // Only remove the mapping if it still points at this very set.
                this.mFileIdToStreamIds.remove(Long.valueOf(fileId), streamIds);
            }
        }
    }
}
