package alluxio.worker.block;

import alluxio.conf.Configuration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.CancelledException;
import alluxio.exception.status.DeadlineExceededException;
import alluxio.exception.status.InternalException;
import alluxio.grpc.BlockMasterWorkerServiceGrpc;
import alluxio.grpc.BuildVersion;
import alluxio.grpc.ConfigProperty;
import alluxio.grpc.RegisterWorkerPOptions;
import alluxio.grpc.RegisterWorkerPRequest;
import alluxio.grpc.RegisterWorkerPResponse;
import alluxio.grpc.StorageList;
import com.google.common.annotations.VisibleForTesting;
import io.grpc.stub.StreamObserver;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/worker/block/RegisterStreamer.class */
public class RegisterStreamer implements Iterator<RegisterWorkerPRequest> {
    private static final Logger LOG = LoggerFactory.getLogger(RegisterStreamer.class);
    private static final int MAX_BATCHES_IN_FLIGHT = 2;
    private final long mWorkerId;
    private final List<String> mStorageTierAliases;
    private final Map<String, Long> mTotalBytesOnTiers;
    private final Map<String, Long> mUsedBytesOnTiers;
    private final RegisterWorkerPOptions mOptions;
    private final Map<String, StorageList> mLostStorageMap;
    private int mBatchNumber;
    private final BlockMapIterator mBlockMapIterator;
    private final CountDownLatch mAckLatch;
    private final CountDownLatch mFinishLatch;
    private final Semaphore mBucket;
    private final AtomicReference<Throwable> mError;
    private final int mResponseTimeoutMs;
    private final int mDeadlineMs;
    private final int mCompleteTimeoutMs;
    private final BlockMasterWorkerServiceGrpc.BlockMasterWorkerServiceStub mAsyncClient;
    private final StreamObserver<RegisterWorkerPResponse> mMasterResponseObserver;
    private StreamObserver<RegisterWorkerPRequest> mWorkerRequestObserver;

    @VisibleForTesting
    public RegisterStreamer(BlockMasterWorkerServiceGrpc.BlockMasterWorkerServiceStub blockMasterWorkerServiceStub, long j, List<String> list, Map<String, Long> map, Map<String, Long> map2, Map<BlockStoreLocation, List<Long>> map3, Map<String, List<String>> map4, List<ConfigProperty> list2) {
        this(blockMasterWorkerServiceStub, j, list, map, map2, map4, list2, new BlockMapIterator(map3));
    }

    public RegisterStreamer(BlockMasterWorkerServiceGrpc.BlockMasterWorkerServiceStub blockMasterWorkerServiceStub, long j, List<String> list, Map<String, Long> map, Map<String, Long> map2, Map<String, List<String>> map3, List<ConfigProperty> list2, BlockMapIterator blockMapIterator) {
        this.mBucket = new Semaphore(MAX_BATCHES_IN_FLIGHT);
        this.mError = new AtomicReference<>();
        this.mAsyncClient = blockMasterWorkerServiceStub;
        this.mWorkerId = j;
        this.mStorageTierAliases = list;
        this.mTotalBytesOnTiers = map;
        this.mUsedBytesOnTiers = map2;
        this.mOptions = RegisterWorkerPOptions.newBuilder().addAllConfigs(list2).setBuildVersion(BuildVersion.newBuilder().setVersion("2.9.3").setRevision("9eae6eff2c3f553ed4e68373958288291edf97e3").build()).build();
        this.mLostStorageMap = (Map) map3.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return StorageList.newBuilder().addAllStorage((Iterable) entry.getValue()).build();
        }));
        this.mBatchNumber = 0;
        this.mBlockMapIterator = blockMapIterator;
        this.mAckLatch = new CountDownLatch(this.mBlockMapIterator.getBatchCount());
        this.mFinishLatch = new CountDownLatch(1);
        this.mResponseTimeoutMs = (int) Configuration.getMs(PropertyKey.WORKER_REGISTER_STREAM_RESPONSE_TIMEOUT);
        this.mDeadlineMs = (int) Configuration.getMs(PropertyKey.WORKER_REGISTER_STREAM_DEADLINE);
        this.mCompleteTimeoutMs = (int) Configuration.getMs(PropertyKey.WORKER_REGISTER_STREAM_COMPLETE_TIMEOUT);
        this.mMasterResponseObserver = new StreamObserver<RegisterWorkerPResponse>() { // from class: alluxio.worker.block.RegisterStreamer.1
            public void onNext(RegisterWorkerPResponse registerWorkerPResponse) {
                RegisterStreamer.LOG.debug("Worker {} - Received ACK {}", Long.valueOf(RegisterStreamer.this.mWorkerId), registerWorkerPResponse);
                RegisterStreamer.this.mBucket.release();
                RegisterStreamer.this.mAckLatch.countDown();
            }

            public void onError(Throwable th) {
                RegisterStreamer.LOG.error("Worker {} - received error from server, marking the stream as closed: ", Long.valueOf(RegisterStreamer.this.mWorkerId), th);
                RegisterStreamer.this.mError.set(th);
                RegisterStreamer.this.mFinishLatch.countDown();
            }

            public void onCompleted() {
                RegisterStreamer.LOG.info("{} - Complete message received from the server. Closing stream", Long.valueOf(RegisterStreamer.this.mWorkerId));
                RegisterStreamer.this.mFinishLatch.countDown();
            }
        };
    }

    public void registerWithMaster() throws CancelledException, InternalException, DeadlineExceededException, InterruptedException {
        this.mWorkerRequestObserver = this.mAsyncClient.withDeadlineAfter(this.mDeadlineMs, TimeUnit.MILLISECONDS).registerWorkerStream(this.mMasterResponseObserver);
        try {
            registerInternal();
        } catch (DeadlineExceededException | InterruptedException | CancelledException e) {
            LOG.error("Worker {} - Error during the register stream, aborting now.", Long.valueOf(this.mWorkerId), e);
            this.mWorkerRequestObserver.onError(e);
            throw e;
        }
    }

    private void registerInternal() throws InterruptedException, DeadlineExceededException, CancelledException, InternalException {
        int i = 0;
        while (hasNext()) {
            LOG.debug("Worker {} - Acquiring one token to send the next batch", Long.valueOf(this.mWorkerId));
            Instant now = Instant.now();
            if (!this.mBucket.tryAcquire(this.mResponseTimeoutMs, TimeUnit.MILLISECONDS)) {
                throw new DeadlineExceededException(String.format("No response from master for more than %dms during the stream!", Integer.valueOf(this.mResponseTimeoutMs)));
            }
            LOG.debug("Worker {} - master ACK received in {}ms, sending the next batch {}", new Object[]{Long.valueOf(this.mWorkerId), Long.valueOf(Duration.between(now, Instant.now()).toMillis()), Integer.valueOf(i)});
            this.mWorkerRequestObserver.onNext(next());
            if (this.mFinishLatch.getCount() == 0) {
                abort();
            }
            i++;
        }
        if (!this.mAckLatch.await(this.mResponseTimeoutMs * MAX_BATCHES_IN_FLIGHT, TimeUnit.MILLISECONDS)) {
            if (this.mFinishLatch.getCount() == 0) {
                abort();
            }
            throw new DeadlineExceededException(String.format("All batches have been sent to the master but only received %d ACKs!", Long.valueOf(this.mBlockMapIterator.getBatchCount() - this.mAckLatch.getCount())));
        }
        LOG.info("Worker {} - All requests have been sent. Completing the client side.", Long.valueOf(this.mWorkerId));
        this.mWorkerRequestObserver.onCompleted();
        LOG.info("Worker {} - Waiting on the master side to complete", Long.valueOf(this.mWorkerId));
        if (!this.mFinishLatch.await(this.mCompleteTimeoutMs, TimeUnit.MILLISECONDS)) {
            throw new DeadlineExceededException(String.format("All batches have been received by the master but the master failed to complete the registration in %dms!", Integer.valueOf(this.mCompleteTimeoutMs)));
        }
        if (this.mError.get() == null) {
            LOG.info("Worker {} - Finished registration with a stream", Long.valueOf(this.mWorkerId));
        } else {
            Throwable th = this.mError.get();
            LOG.error("Worker {} - Received an error from the master on completion", Long.valueOf(this.mWorkerId), th);
            throw new InternalException(th);
        }
    }

    private void abort() throws InternalException, CancelledException {
        if (this.mError.get() != null) {
            Throwable th = this.mError.get();
            LOG.error("Worker {} - Received an error from the master", Long.valueOf(this.mWorkerId), th);
            throw new InternalException(th);
        }
        String format = String.format("Worker %s - The server side has been closed before all the batches are sent from the worker!", Long.valueOf(this.mWorkerId));
        LOG.error(format);
        throw new CancelledException(format);
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        return this.mBlockMapIterator.hasNext() || this.mBatchNumber == 0;
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.Iterator
    public RegisterWorkerPRequest next() {
        RegisterWorkerPRequest build;
        if (this.mBatchNumber == 0) {
            build = RegisterWorkerPRequest.newBuilder().setWorkerId(this.mWorkerId).addAllStorageTiers(this.mStorageTierAliases).putAllTotalBytesOnTiers(this.mTotalBytesOnTiers).putAllUsedBytesOnTiers(this.mUsedBytesOnTiers).putAllLostStorage(this.mLostStorageMap).setOptions(this.mOptions).addAllCurrentBlocks(this.mBlockMapIterator.hasNext() ? this.mBlockMapIterator.next() : Collections.emptyList()).build();
        } else {
            build = RegisterWorkerPRequest.newBuilder().setWorkerId(this.mWorkerId).addAllCurrentBlocks(this.mBlockMapIterator.next()).build();
        }
        this.mBatchNumber++;
        return build;
    }
}
