package alluxio.worker.block;

import alluxio.AlluxioURI;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.runtime.AlluxioRuntimeException;
import alluxio.exception.runtime.OutOfRangeRuntimeException;
import alluxio.exception.runtime.ResourceExhaustedRuntimeException;
import alluxio.grpc.UfsReadOptions;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.resource.CloseableResource;
import alluxio.security.authentication.AuthenticatedClientUser;
import alluxio.underfs.UfsManager;
import alluxio.underfs.UnderFileSystem;
import alluxio.underfs.options.OpenOptions;
import alluxio.util.IdUtils;
import alluxio.util.ThreadFactoryUtils;
import com.codahale.metrics.Meter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.Closeable;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;

/* loaded from: input_file:alluxio/worker/block/UfsIOManager.class */
public class UfsIOManager implements Closeable {
    private static final int READ_CAPACITY = 1024;
    private final UfsManager.UfsClient mUfsClient;
    private final ConcurrentMap<String, Long> mThroughputQuota = new ConcurrentHashMap();
    private final UfsInputStreamCache mUfsInstreamCache = new UfsInputStreamCache();
    private final LinkedBlockingQueue<ReadTask> mReadQueue = new LinkedBlockingQueue<>(READ_CAPACITY);
    private final ConcurrentMap<AlluxioURI, Meter> mUfsBytesReadThroughputMetrics = new ConcurrentHashMap();
    private final ExecutorService mUfsIoExecutor = Executors.newFixedThreadPool(Configuration.getInt(PropertyKey.UNDERFS_IO_THREADS), ThreadFactoryUtils.build("UfsIOManager-IO-%d", false));
    private final ExecutorService mScheduleExecutor = Executors.newSingleThreadExecutor(ThreadFactoryUtils.build("UfsIOManager-Scheduler-%d", true));

    /* loaded from: input_file:alluxio/worker/block/UfsIOManager$ReadTask.class */
    private class ReadTask implements Runnable {
        private final long mOffset;
        private final long mLength;
        private final CompletableFuture<Integer> mFuture;
        private final String mUfsPath;
        private final UfsReadOptions mOptions;
        private final Meter mMeter;
        private final long mFileId;
        private final ByteBuffer mBuffuer;

        private ReadTask(ByteBuffer byteBuffer, String str, long j, long j2, long j3, UfsReadOptions ufsReadOptions, CompletableFuture<Integer> completableFuture, Meter meter) {
            this.mOptions = ufsReadOptions;
            this.mUfsPath = str;
            this.mFileId = j;
            this.mOffset = j2;
            this.mLength = j3;
            this.mFuture = completableFuture;
            this.mMeter = meter;
            this.mBuffuer = byteBuffer;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.mFuture.complete(Integer.valueOf(readInternal()));
            } catch (RuntimeException e) {
                this.mFuture.completeExceptionally(e);
            }
        }

        private int readInternal() {
            int read;
            int i = 0;
            try {
                try {
                    CloseableResource acquireUfsResource = UfsIOManager.this.mUfsClient.acquireUfsResource();
                    Throwable th = null;
                    try {
                        try {
                            if (this.mOptions.hasUser()) {
                                AuthenticatedClientUser.set(this.mOptions.getUser());
                            }
                            InputStream acquire = UfsIOManager.this.mUfsInstreamCache.acquire((UnderFileSystem) acquireUfsResource.get(), this.mUfsPath, this.mFileId, OpenOptions.defaults().setOffset(this.mOffset).setPositionShort(this.mOptions.getPositionShort()));
                            while (i < this.mLength && (read = Channels.newChannel(acquire).read(this.mBuffuer)) != -1) {
                                i += read;
                            }
                            if (acquireUfsResource != null) {
                                if (0 != 0) {
                                    try {
                                        acquireUfsResource.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    acquireUfsResource.close();
                                }
                            }
                            if (acquire != null) {
                                UfsIOManager.this.mUfsInstreamCache.release(acquire);
                            }
                            this.mMeter.mark(i);
                            return i;
                        } finally {
                        }
                    } catch (Throwable th3) {
                        if (acquireUfsResource != null) {
                            if (th != null) {
                                try {
                                    acquireUfsResource.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                acquireUfsResource.close();
                            }
                        }
                        throw th3;
                    }
                } catch (Throwable th5) {
                    if (0 != 0) {
                        UfsIOManager.this.mUfsInstreamCache.release(null);
                    }
                    throw th5;
                }
            } catch (Exception e) {
                throw AlluxioRuntimeException.from(e);
            }
        }
    }

    public UfsIOManager(UfsManager.UfsClient ufsClient) {
        this.mUfsClient = ufsClient;
    }

    public void start() {
        this.mScheduleExecutor.submit(this::schedule);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.mScheduleExecutor.shutdownNow();
        this.mUfsIoExecutor.shutdownNow();
    }

    public void setQuota(String str, long j) {
        Preconditions.checkArgument(j > 0, "throughput should be positive");
        this.mThroughputQuota.put(str, Long.valueOf(j));
    }

    private void schedule() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                ReadTask take = this.mReadQueue.take();
                if (!this.mThroughputQuota.containsKey(take.mOptions.getTag()) || this.mThroughputQuota.get(take.mOptions.getTag()).longValue() >= getUsedThroughput(take.mMeter)) {
                    try {
                        this.mUfsIoExecutor.submit(take);
                    } catch (RejectedExecutionException e) {
                        this.mReadQueue.put(take);
                    }
                } else {
                    this.mReadQueue.put(take);
                }
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
        }
    }

    @VisibleForTesting
    public double getUsedThroughput(Meter meter) {
        return meter.getOneMinuteRate();
    }

    public CompletableFuture<Integer> read(ByteBuffer byteBuffer, long j, long j2, long j3, String str, UfsReadOptions ufsReadOptions) {
        Objects.requireNonNull(byteBuffer);
        if (j < 0 || j2 < 0 || j2 > byteBuffer.remaining()) {
            throw new OutOfRangeRuntimeException(String.format("offset is negative, len is negative, or len is greater than buf remaining. offset: %s, len: %s, buf remaining: %s", Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(byteBuffer.remaining())));
        }
        if (this.mReadQueue.size() >= READ_CAPACITY) {
            throw new ResourceExhaustedRuntimeException("UFS read at capacity", true);
        }
        CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
        if (j2 == 0) {
            completableFuture.complete(0);
            return completableFuture;
        }
        this.mReadQueue.add(new ReadTask(byteBuffer, str, IdUtils.fileIdFromBlockId(j3), j, j2, ufsReadOptions, completableFuture, this.mUfsBytesReadThroughputMetrics.computeIfAbsent(this.mUfsClient.getUfsMountPointUri(), alluxioURI -> {
            return MetricsSystem.meterWithTags(MetricKey.WORKER_BYTES_READ_UFS_THROUGHPUT.getName(), MetricKey.WORKER_BYTES_READ_UFS_THROUGHPUT.isClusterAggregated(), new String[]{"UFS", MetricsSystem.escape(this.mUfsClient.getUfsMountPointUri()), "User", ufsReadOptions.getTag()});
        })));
        return completableFuture;
    }
}
