package alluxio.worker.block;

import alluxio.client.file.FileSystemContext;
import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.AlluxioException;
import alluxio.exception.status.CancelledException;
import alluxio.grpc.CacheRequest;
import alluxio.metrics.MetricKey;
import alluxio.metrics.MetricsSystem;
import alluxio.proto.dataserver.Protocol;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;
import alluxio.util.io.BufferUtils;
import alluxio.util.logging.SamplingLogger;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.worker.block.io.BlockReader;
import alluxio.worker.block.io.BlockWriter;
import com.codahale.metrics.Counter;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:alluxio/worker/block/CacheRequestManager.class */
public class CacheRequestManager {
    private final ExecutorService mCacheExecutor;
    private final DefaultBlockWorker mBlockWorker;
    private final FileSystemContext mFsContext;
    private static final Logger LOG = LoggerFactory.getLogger(CacheRequestManager.class);
    private static final Logger SAMPLING_LOG = new SamplingLogger(LOG, 600000);
    private static final int NETWORK_HOST_RESOLUTION_TIMEOUT = (int) Configuration.getMs(PropertyKey.NETWORK_HOST_RESOLUTION_TIMEOUT_MS);
    private static final Counter CACHE_REQUESTS = MetricsSystem.counter(MetricKey.WORKER_CACHE_REQUESTS.getName());
    private static final Counter CACHE_REQUESTS_ASYNC = MetricsSystem.counter(MetricKey.WORKER_CACHE_REQUESTS_ASYNC.getName());
    private static final Counter CACHE_REQUESTS_SYNC = MetricsSystem.counter(MetricKey.WORKER_CACHE_REQUESTS_SYNC.getName());
    private static final Counter CACHE_FAILED_BLOCKS = MetricsSystem.counter(MetricKey.WORKER_CACHE_FAILED_BLOCKS.getName());
    private static final Counter CACHE_REMOTE_BLOCKS = MetricsSystem.counter(MetricKey.WORKER_CACHE_REMOTE_BLOCKS.getName());
    private static final Counter CACHE_SUCCEEDED_BLOCKS = MetricsSystem.counter(MetricKey.WORKER_CACHE_SUCCEEDED_BLOCKS.getName());
    private static final Counter CACHE_UFS_BLOCKS = MetricsSystem.counter(MetricKey.WORKER_CACHE_UFS_BLOCKS.getName());
    private static final Counter CACHE_BLOCKS_SIZE = MetricsSystem.counter(MetricKey.WORKER_CACHE_BLOCKS_SIZE.getName());
    private final ConcurrentHashMap<Long, CacheRequest> mActiveCacheRequests = new ConcurrentHashMap<>();
    private final AtomicLong mNumRejected = new AtomicLong(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:alluxio/worker/block/CacheRequestManager$CacheResult.class */
    public enum CacheResult {
        SUCCEED,
        FAILED,
        ALREADY_CACHED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:alluxio/worker/block/CacheRequestManager$CacheTask.class */
    public class CacheTask implements Callable<Void> {
        private final CacheRequest mRequest;

        CacheTask(CacheRequest cacheRequest) {
            this.mRequest = cacheRequest;
        }

        public int hashCode() {
            return Objects.hash(Long.valueOf(this.mRequest.getBlockId()));
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (!(obj instanceof CacheTask)) {
                return false;
            }
            CacheTask cacheTask = (CacheTask) obj;
            if (this.mRequest == cacheTask.mRequest) {
                return true;
            }
            return (this.mRequest == null || cacheTask.mRequest == null || this.mRequest.getBlockId() != cacheTask.mRequest.getBlockId()) ? false : true;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public Void call() throws IOException, AlluxioException {
            long blockId = this.mRequest.getBlockId();
            long length = this.mRequest.getLength();
            CacheResult cacheResult = CacheResult.FAILED;
            try {
                cacheResult = CacheRequestManager.this.cacheBlock(this.mRequest);
                switch (cacheResult) {
                    case SUCCEED:
                        CacheRequestManager.CACHE_BLOCKS_SIZE.inc(length);
                        CacheRequestManager.CACHE_SUCCEEDED_BLOCKS.inc();
                        break;
                    case FAILED:
                        CacheRequestManager.CACHE_FAILED_BLOCKS.inc();
                        break;
                }
                CacheRequestManager.this.mActiveCacheRequests.remove(Long.valueOf(blockId));
                return null;
            } catch (Throwable th) {
                switch (cacheResult) {
                    case SUCCEED:
                        CacheRequestManager.CACHE_BLOCKS_SIZE.inc(length);
                        CacheRequestManager.CACHE_SUCCEEDED_BLOCKS.inc();
                        break;
                    case FAILED:
                        CacheRequestManager.CACHE_FAILED_BLOCKS.inc();
                        break;
                }
                CacheRequestManager.this.mActiveCacheRequests.remove(Long.valueOf(blockId));
                throw th;
            }
        }
    }

    public CacheRequestManager(ExecutorService executorService, DefaultBlockWorker defaultBlockWorker, FileSystemContext fileSystemContext) {
        this.mCacheExecutor = executorService;
        this.mBlockWorker = defaultBlockWorker;
        this.mFsContext = fileSystemContext;
    }

    public void submitRequest(CacheRequest cacheRequest) throws AlluxioException, IOException {
        CACHE_REQUESTS.inc();
        long blockId = cacheRequest.getBlockId();
        boolean async = cacheRequest.getAsync();
        if (this.mActiveCacheRequests.putIfAbsent(Long.valueOf(blockId), cacheRequest) != null) {
            if (async) {
                LOG.debug("request already planned: {}", cacheRequest);
                return;
            }
            try {
                CommonUtils.waitFor("block to be loaded", () -> {
                    return Boolean.valueOf(!this.mActiveCacheRequests.containsKey(Long.valueOf(blockId)));
                }, WaitForOptions.defaults().setTimeoutMs(30000L));
                return;
            } catch (InterruptedException e) {
                throw new CancelledException("Fail to finish cache request synchronously. Interrupted while waiting for block to be loaded by another request.", e);
            } catch (TimeoutException e2) {
                throw new CancelledException("Fail to finish cache request synchronously due to timeout", e2);
            }
        }
        if (async) {
            CACHE_REQUESTS_ASYNC.inc();
        } else {
            CACHE_REQUESTS_SYNC.inc();
        }
        Future future = null;
        try {
            future = this.mCacheExecutor.submit(new CacheTask(cacheRequest));
        } catch (RejectedExecutionException e3) {
            this.mNumRejected.incrementAndGet();
            SAMPLING_LOG.warn(String.format("Failed to cache block locally as the thread pool is at capacity. To increase, update the parameter '%s'. numRejected: {} error: {}", "alluxio.worker.network.async.cache.manager.threads.max"), Long.valueOf(this.mNumRejected.get()), e3.toString());
            this.mActiveCacheRequests.remove(Long.valueOf(blockId));
            if (!async) {
                throw new CancelledException(String.format("Fail to finish cache request synchronously as the thread pool is at capacity. To increase the capacity, set the parameter '%s' and '%s' higher. ", "alluxio.worker.network.async.cache.manager.threads.max", "alluxio.worker.network.async.cache.manager.queue.max"), e3);
            }
        }
        if (future == null || async) {
            return;
        }
        try {
            future.get();
        } catch (InterruptedException e4) {
            throw new CancelledException("Fail to finish cache request synchronously. Interrupted while waiting for response.", e4);
        } catch (ExecutionException e5) {
            CACHE_FAILED_BLOCKS.inc();
            Throwable cause = e5.getCause();
            if (!(cause instanceof AlluxioException)) {
                throw new IOException(cause);
            }
            throw new AlluxioException(cause.getMessage(), cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public CacheResult cacheBlock(CacheRequest cacheRequest) throws IOException, AlluxioException {
        CacheResult cacheBlockFromRemoteWorker;
        boolean isLocalAddress = NetworkAddressUtils.isLocalAddress(cacheRequest.getSourceHost(), NETWORK_HOST_RESOLUTION_TIMEOUT);
        long blockId = cacheRequest.getBlockId();
        long length = cacheRequest.getLength();
        if (this.mBlockWorker.getBlockStore().hasBlockMeta(blockId)) {
            LOG.debug("block already cached: {}", Long.valueOf(blockId));
            return CacheResult.ALREADY_CACHED;
        }
        Protocol.OpenUfsBlockOptions openUfsBlockOptions = cacheRequest.getOpenUfsBlockOptions();
        if (isLocalAddress) {
            CACHE_UFS_BLOCKS.inc();
            cacheBlockFromRemoteWorker = cacheBlockFromUfs(blockId, length, openUfsBlockOptions);
        } else {
            CACHE_REMOTE_BLOCKS.inc();
            cacheBlockFromRemoteWorker = cacheBlockFromRemoteWorker(blockId, length, new InetSocketAddress(cacheRequest.getSourceHost(), cacheRequest.getSourcePort()), openUfsBlockOptions);
        }
        LOG.debug("Result of caching block {}: {}", Long.valueOf(blockId), cacheBlockFromRemoteWorker);
        return cacheBlockFromRemoteWorker;
    }

    private CacheResult cacheBlockFromUfs(long j, long j2, Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException {
        BlockReader createUfsBlockReader = this.mBlockWorker.createUfsBlockReader(-8L, j, 0L, false, openUfsBlockOptions);
        Throwable th = null;
        long j3 = 0;
        while (j3 < j2) {
            try {
                try {
                    long min = Math.min(8388608L, j2 - j3);
                    createUfsBlockReader.read(j3, min);
                    j3 += min;
                } finally {
                }
            } catch (Throwable th2) {
                if (createUfsBlockReader != null) {
                    if (th != null) {
                        try {
                            createUfsBlockReader.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        createUfsBlockReader.close();
                    }
                }
                throw th2;
            }
        }
        if (createUfsBlockReader != null) {
            if (0 != 0) {
                try {
                    createUfsBlockReader.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                createUfsBlockReader.close();
            }
        }
        return CacheResult.SUCCEED;
    }

    /* JADX WARN: Failed to calculate best type for var: r20v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r20v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Failed to calculate best type for var: r21v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Failed to calculate best type for var: r21v1 ??
    java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
    	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
    	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
     */
    /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
    	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
    	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
    	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
    	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
    	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
     */
    /* JADX WARN: Not initialized variable reg: 20, insn: 0x00fe: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r20 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:56:0x00fe */
    /* JADX WARN: Not initialized variable reg: 21, insn: 0x0103: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r21 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]), block:B:58:0x0103 */
    /* JADX WARN: Type inference failed for: r20v1, types: [alluxio.worker.block.io.BlockReader] */
    /* JADX WARN: Type inference failed for: r21v1, types: [java.lang.Throwable] */
    private CacheResult cacheBlockFromRemoteWorker(long j, long j2, InetSocketAddress inetSocketAddress, Protocol.OpenUfsBlockOptions openUfsBlockOptions) throws IOException {
        ?? r20;
        ?? r21;
        if (this.mBlockWorker.getBlockStore().hasBlockMeta(j) || this.mBlockWorker.getBlockStore().hasTempBlockMeta(j)) {
            return CacheResult.ALREADY_CACHED;
        }
        this.mBlockWorker.createBlock(-7L, j, 0, new CreateBlockOptions((String) null, "", j2));
        try {
            try {
                RemoteBlockReader remoteBlockReader = getRemoteBlockReader(j, j2, inetSocketAddress, openUfsBlockOptions);
                Throwable th = null;
                BlockWriter createBlockWriter = this.mBlockWorker.createBlockWriter(-7L, j);
                Throwable th2 = null;
                try {
                    try {
                        BufferUtils.transfer(remoteBlockReader.getChannel(), createBlockWriter.getChannel());
                        this.mBlockWorker.commitBlock(-7L, j, false);
                        CacheResult cacheResult = CacheResult.SUCCEED;
                        if (createBlockWriter != null) {
                            if (0 != 0) {
                                try {
                                    createBlockWriter.close();
                                } catch (Throwable th3) {
                                    th2.addSuppressed(th3);
                                }
                            } else {
                                createBlockWriter.close();
                            }
                        }
                        if (remoteBlockReader != null) {
                            if (0 != 0) {
                                try {
                                    remoteBlockReader.close();
                                } catch (Throwable th4) {
                                    th.addSuppressed(th4);
                                }
                            } else {
                                remoteBlockReader.close();
                            }
                        }
                        return cacheResult;
                    } finally {
                    }
                } catch (Throwable th5) {
                    if (createBlockWriter != null) {
                        if (th2 != null) {
                            try {
                                createBlockWriter.close();
                            } catch (Throwable th6) {
                                th2.addSuppressed(th6);
                            }
                        } else {
                            createBlockWriter.close();
                        }
                    }
                    throw th5;
                }
            } catch (Throwable th7) {
                if (r20 != 0) {
                    if (r21 != 0) {
                        try {
                            r20.close();
                        } catch (Throwable th8) {
                            r21.addSuppressed(th8);
                        }
                    } else {
                        r20.close();
                    }
                }
                throw th7;
            }
        } catch (IOException | IllegalStateException e) {
            LOG.warn("Failed to async cache block {} from remote worker ({}) on copying the block: {}", new Object[]{Long.valueOf(j), inetSocketAddress, e.toString()});
            try {
                this.mBlockWorker.abortBlock(-7L, j);
            } catch (IOException e2) {
                LOG.warn("Failed to abort block {}: {}", Long.valueOf(j), e2.toString());
            }
            throw e;
        }
    }

    @VisibleForTesting
    public RemoteBlockReader getRemoteBlockReader(long j, long j2, InetSocketAddress inetSocketAddress, Protocol.OpenUfsBlockOptions openUfsBlockOptions) {
        return new RemoteBlockReader(this.mFsContext, j, j2, inetSocketAddress, openUfsBlockOptions);
    }
}
