package alluxio.worker.block;

import alluxio.AbstractMasterClient;
import alluxio.conf.PropertyKey;
import alluxio.exception.FailedToAcquireRegisterLeaseException;
import alluxio.exception.status.AlluxioStatusException;
import alluxio.grpc.BlockHeartbeatPOptions;
import alluxio.grpc.BlockHeartbeatPRequest;
import alluxio.grpc.BlockIdList;
import alluxio.grpc.BlockMasterWorkerServiceGrpc;
import alluxio.grpc.BlockStoreLocationProto;
import alluxio.grpc.BuildVersion;
import alluxio.grpc.Command;
import alluxio.grpc.CommitBlockInUfsPRequest;
import alluxio.grpc.CommitBlockPRequest;
import alluxio.grpc.ConfigProperty;
import alluxio.grpc.GetRegisterLeasePRequest;
import alluxio.grpc.GetRegisterLeasePResponse;
import alluxio.grpc.GetWorkerIdPRequest;
import alluxio.grpc.GrpcUtils;
import alluxio.grpc.LocationBlockIdListEntry;
import alluxio.grpc.Metric;
import alluxio.grpc.NotifyWorkerIdPRequest;
import alluxio.grpc.RegisterWorkerPOptions;
import alluxio.grpc.RegisterWorkerPRequest;
import alluxio.grpc.ServiceType;
import alluxio.grpc.StorageList;
import alluxio.master.MasterClientContext;
import alluxio.master.selectionpolicy.MasterSelectionPolicy;
import alluxio.retry.RetryPolicy;
import alluxio.wire.WorkerNetAddress;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client used to talk to the block master over the {@code BlockMasterWorker} gRPC service.
 *
 * <p>Every RPC is issued through {@code retryRPC}, which is inherited from the parent client.
 * The gRPC stubs are created in {@link #afterConnect()} and are {@code null} until then.
 */
@ThreadSafe
public class BlockMasterClient extends AbstractMasterClient {
    private static final Logger LOG = LoggerFactory.getLogger(BlockMasterClient.class);

    // Build identity reported to the master when a worker registers.
    // NOTE(review): these look like compile-time constants inlined from a shared definition;
    // confirm, and reference that definition instead of duplicating the literals here.
    private static final String BUILD_VERSION = "2.9.3";
    private static final String BUILD_REVISION = "9eae6eff2c3f553ed4e68373958288291edf97e3";

    /** Blocking stub; {@code null} until {@link #afterConnect()} has run. */
    public BlockMasterWorkerServiceGrpc.BlockMasterWorkerServiceBlockingStub mClient;
    /** Asynchronous stub; {@code null} until {@link #afterConnect()} has run. */
    public BlockMasterWorkerServiceGrpc.BlockMasterWorkerServiceStub mAsyncClient;

    /**
     * Creates a client for the given master client context.
     *
     * @param masterClientContext the context handed to the parent client
     */
    public BlockMasterClient(MasterClientContext masterClientContext) {
        super(masterClientContext);
    }

    /**
     * Creates a client that is pinned to one specific master address.
     *
     * @param masterClientContext the context handed to the parent client
     * @param inetSocketAddress the address of the master to talk to
     */
    public BlockMasterClient(MasterClientContext masterClientContext,
            InetSocketAddress inetSocketAddress) {
        super(masterClientContext, MasterSelectionPolicy.Factory.specifiedMaster(inetSocketAddress));
    }

    /**
     * @return the remote service type, {@link ServiceType#BLOCK_MASTER_WORKER_SERVICE}
     */
    protected ServiceType getRemoteServiceType() {
        return ServiceType.BLOCK_MASTER_WORKER_SERVICE;
    }

    /**
     * @return the name of the remote service
     */
    protected String getServiceName() {
        return "BlockMasterWorker";
    }

    /**
     * @return the version of the remote service this client speaks
     */
    protected long getServiceVersion() {
        return 2L;
    }

    /**
     * Builds the blocking and asynchronous gRPC stubs on top of the connected channel.
     */
    protected void afterConnect() {
        this.mClient = BlockMasterWorkerServiceGrpc.newBlockingStub(this.mChannel);
        this.mAsyncClient = BlockMasterWorkerServiceGrpc.newStub(this.mChannel);
    }

    /**
     * Sends a {@code commitBlock} request to the master.
     *
     * @param workerId the worker id
     * @param usedBytesOnTier the used bytes on the tier holding the block
     * @param tierAlias the alias of the tier holding the block
     * @param mediumType the medium type holding the block
     * @param blockId the id of the block being committed
     * @param length the length of the block being committed
     * @throws AlluxioStatusException if the RPC fails
     */
    public void commitBlock(long workerId, long usedBytesOnTier, String tierAlias,
            String mediumType, long blockId, long length) throws AlluxioStatusException {
        retryRPC(() -> {
            CommitBlockPRequest request = CommitBlockPRequest.newBuilder()
                    .setWorkerId(workerId)
                    .setUsedBytesOnTier(usedBytesOnTier)
                    .setTierAlias(tierAlias)
                    .setMediumType(mediumType)
                    .setBlockId(blockId)
                    .setLength(length)
                    .build();
            this.mClient.commitBlock(request);
            return null;
        }, LOG, "CommitBlock",
                "workerId=%d,usedBytesOnTier=%d,tierAlias=%s,mediumType=%s,blockId=%d,length=%d",
                new Object[] {workerId, usedBytesOnTier, tierAlias, mediumType, blockId, length});
    }

    /**
     * Sends a {@code commitBlockInUfs} request to the master.
     *
     * @param blockId the id of the block being committed
     * @param length the length of the block being committed
     * @throws AlluxioStatusException if the RPC fails
     */
    public void commitBlockInUfs(long blockId, long length) throws AlluxioStatusException {
        retryRPC(() -> {
            CommitBlockInUfsPRequest request = CommitBlockInUfsPRequest.newBuilder()
                    .setBlockId(blockId)
                    .setLength(length)
                    .build();
            this.mClient.commitBlockInUfs(request);
            return null;
        }, LOG, "CommitBlockInUfs", "blockId=%d,length=%d", new Object[] {blockId, length});
    }

    /**
     * Asks the master for the worker id that corresponds to a worker address.
     *
     * @param workerNetAddress the network address of the worker
     * @return the worker id returned by the master
     * @throws IOException if the RPC fails
     */
    public long getId(WorkerNetAddress workerNetAddress) throws IOException {
        return ((Long) retryRPC(() -> {
            GetWorkerIdPRequest request = GetWorkerIdPRequest.newBuilder()
                    .setWorkerNetAddress(GrpcUtils.toProto(workerNetAddress))
                    .build();
            return Long.valueOf(this.mClient.getWorkerId(request).getWorkerId());
        }, LOG, "GetId", "address=%s", new Object[] {workerNetAddress})).longValue();
    }

    /**
     * Converts a location-to-block-ids map into its proto representation.
     *
     * <p>The proto location key only carries the tier alias and the medium type, so block lists
     * of locations that share both values are merged into a single entry.
     *
     * @param blockListOnLocation block ids keyed by the location that stores them
     * @return one proto entry per distinct (tier alias, medium type) pair
     */
    @VisibleForTesting
    public List<LocationBlockIdListEntry> convertBlockListMapToProto(
            Map<BlockStoreLocation, List<Long>> blockListOnLocation) {
        Map<BlockStoreLocationProto, List<Long>> blocksByLocationProto = new HashMap<>();
        for (Map.Entry<BlockStoreLocation, List<Long>> entry : blockListOnLocation.entrySet()) {
            BlockStoreLocation location = entry.getKey();
            BlockStoreLocationProto locationProto = BlockStoreLocationProto.newBuilder()
                    .setTierAlias(location.tierAlias())
                    .setMediumType(location.mediumType())
                    .build();
            blocksByLocationProto
                    .computeIfAbsent(locationProto, key -> new ArrayList<>())
                    .addAll(entry.getValue());
        }

        List<LocationBlockIdListEntry> protoEntries =
                new ArrayList<>(blocksByLocationProto.size());
        for (Map.Entry<BlockStoreLocationProto, List<Long>> entry
                : blocksByLocationProto.entrySet()) {
            BlockIdList blockIds = BlockIdList.newBuilder().addAllBlockId(entry.getValue()).build();
            protoEntries.add(LocationBlockIdListEntry.newBuilder()
                    .setKey(entry.getKey())
                    .setValue(blockIds)
                    .build());
        }
        return protoEntries;
    }

    /**
     * Sends a block heartbeat to the master and returns the command it answers with.
     *
     * <p>The RPC is issued with a deadline taken from
     * {@link PropertyKey#WORKER_MASTER_PERIODICAL_RPC_TIMEOUT}.
     *
     * @param workerId the worker id
     * @param capacityBytesOnTiers capacity in bytes keyed by tier alias
     * @param usedBytesOnTiers used bytes keyed by tier alias
     * @param removedBlocks ids of the blocks reported as removed
     * @param addedBlocks ids of the blocks reported as added, keyed by location
     * @param lostStorage lost storage paths keyed by tier alias
     * @param metrics the metrics attached to the heartbeat options
     * @return the command returned by the master
     * @throws IOException if the RPC fails
     */
    public synchronized Command heartbeat(long workerId, Map<String, Long> capacityBytesOnTiers,
            Map<String, Long> usedBytesOnTiers, List<Long> removedBlocks,
            Map<BlockStoreLocation, List<Long>> addedBlocks,
            Map<String, List<String>> lostStorage, List<Metric> metrics) throws IOException {
        BlockHeartbeatPOptions options = BlockHeartbeatPOptions.newBuilder()
                .addAllMetrics(metrics)
                .putAllCapacityBytesOnTiers(capacityBytesOnTiers)
                .build();
        BlockHeartbeatPRequest request = BlockHeartbeatPRequest.newBuilder()
                .setWorkerId(workerId)
                .putAllUsedBytesOnTiers(usedBytesOnTiers)
                .addAllRemovedBlockIds(removedBlocks)
                .addAllAddedBlocks(convertBlockListMapToProto(addedBlocks))
                .setOptions(options)
                .putAllLostStorage(toLostStorageProto(lostStorage))
                .build();
        return (Command) retryRPC(() -> {
            long timeoutMs = this.mContext.getClusterConf()
                    .getMs(PropertyKey.WORKER_MASTER_PERIODICAL_RPC_TIMEOUT);
            return this.mClient.withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS)
                    .blockHeartbeat(request)
                    .getCommand();
        }, LOG, "Heartbeat", "workerId=%d", new Object[] {workerId});
    }

    /**
     * Issues a single register-lease request to the master.
     *
     * @param workerId the worker id
     * @param estimatedBlockCount the block count sent along with the request
     * @return the master's response
     * @throws IOException if the RPC fails
     */
    private GetRegisterLeasePResponse acquireRegisterLease(long workerId, int estimatedBlockCount)
            throws IOException {
        return (GetRegisterLeasePResponse) retryRPC(() -> {
            LOG.info("Requesting lease with workerId {}, blockCount {}",
                    workerId, estimatedBlockCount);
            GetRegisterLeasePRequest request = GetRegisterLeasePRequest.newBuilder()
                    .setWorkerId(workerId)
                    .setBlockCount(estimatedBlockCount)
                    .build();
            return this.mClient.requestRegisterLease(request);
        }, LOG, "GetRegisterLease", "workerId=%d, estimatedBlockCount=%d",
                new Object[] {workerId, estimatedBlockCount});
    }

    /**
     * Keeps requesting a register lease until the master allows it or the retry policy gives up.
     *
     * @param workerId the worker id
     * @param estimatedBlockCount the block count sent along with each request
     * @param retryPolicy decides whether another attempt may be made
     * @throws IOException if a lease RPC fails
     * @throws FailedToAcquireRegisterLeaseException if no lease was granted before the retry
     *         policy was exhausted
     */
    public void acquireRegisterLeaseWithBackoff(long workerId, int estimatedBlockCount,
            RetryPolicy retryPolicy) throws IOException, FailedToAcquireRegisterLeaseException {
        boolean leaseGranted = false;
        GetRegisterLeasePResponse response = null;
        // The granted check comes first so that no extra attempt is consumed after success.
        while (!leaseGranted && retryPolicy.attempt()) {
            LOG.debug("Worker {} attempting to grant registration lease from the master, iter {}",
                    workerId, retryPolicy.getAttemptCount());
            response = acquireRegisterLease(workerId, estimatedBlockCount);
            LOG.debug("Worker {} lease response: {}", workerId, response);
            leaseGranted = response.getAllowed();
        }
        // response stays null when the policy did not permit even a single attempt.
        if (response == null || !response.getAllowed()) {
            throw new FailedToAcquireRegisterLeaseException(String.format(
                    "Failed to acquire a register lease from master after %d attempts",
                    retryPolicy.getAttemptCount()));
        }
        LOG.info("Worker {} acquired lease after {} attempts: {}",
                workerId, retryPolicy.getAttemptCount(), response);
    }

    /**
     * Registers the worker with the master using a single unary RPC.
     *
     * @param workerId the worker id
     * @param storageTierAliases the aliases of the worker's storage tiers
     * @param totalBytesOnTiers total bytes keyed by tier alias
     * @param usedBytesOnTiers used bytes keyed by tier alias
     * @param currentBlocksOnLocation ids of the blocks currently held, keyed by location
     * @param lostStorage lost storage paths keyed by tier alias
     * @param configList the configuration properties attached to the register options
     * @throws IOException if the RPC fails
     */
    public void register(long workerId, List<String> storageTierAliases,
            Map<String, Long> totalBytesOnTiers, Map<String, Long> usedBytesOnTiers,
            Map<BlockStoreLocation, List<Long>> currentBlocksOnLocation,
            Map<String, List<String>> lostStorage, List<ConfigProperty> configList)
            throws IOException {
        BuildVersion buildVersion = BuildVersion.newBuilder()
                .setVersion(BUILD_VERSION)
                .setRevision(BUILD_REVISION)
                .build();
        RegisterWorkerPOptions options = RegisterWorkerPOptions.newBuilder()
                .addAllConfigs(configList)
                .setBuildVersion(buildVersion)
                .build();
        RegisterWorkerPRequest request = RegisterWorkerPRequest.newBuilder()
                .setWorkerId(workerId)
                .addAllStorageTiers(storageTierAliases)
                .putAllTotalBytesOnTiers(totalBytesOnTiers)
                .putAllUsedBytesOnTiers(usedBytesOnTiers)
                .addAllCurrentBlocks(convertBlockListMapToProto(currentBlocksOnLocation))
                .putAllLostStorage(toLostStorageProto(lostStorage))
                .setOptions(options)
                .build();
        retryRPC(() -> {
            this.mClient.registerWorker(request);
            return null;
        }, LOG, "Register", "workerId=%d", new Object[] {workerId});
    }

    /**
     * Registers the worker with the master through a {@link RegisterStreamer}.
     *
     * <p>A failure raised by the streamer is captured inside the RPC callable and rethrown only
     * after {@code retryRPC} has returned, so the callable itself always completes normally.
     *
     * @param workerId the worker id
     * @param storageTierAliases the aliases of the worker's storage tiers
     * @param totalBytesOnTiers total bytes keyed by tier alias
     * @param usedBytesOnTiers used bytes keyed by tier alias
     * @param currentBlocksOnLocation ids of the blocks currently held, keyed by location
     * @param lostStorage lost storage paths keyed by tier alias
     * @param configList the configuration properties sent along with the registration
     * @throws IOException if the streamer fails or the calling thread is interrupted; in the
     *         latter case the thread's interrupt status is restored before throwing
     */
    public void registerWithStream(long workerId, List<String> storageTierAliases,
            Map<String, Long> totalBytesOnTiers, Map<String, Long> usedBytesOnTiers,
            Map<BlockStoreLocation, List<Long>> currentBlocksOnLocation,
            Map<String, List<String>> lostStorage, List<ConfigProperty> configList)
            throws IOException {
        AtomicReference<IOException> failure = new AtomicReference<>();
        retryRPC(() -> {
            try {
                new RegisterStreamer(this.mAsyncClient, workerId, storageTierAliases,
                        totalBytesOnTiers, usedBytesOnTiers, currentBlocksOnLocation, lostStorage,
                        configList).registerWithMaster();
            } catch (IOException e) {
                failure.set(e);
            } catch (InterruptedException e) {
                // Restore the interrupt status so callers can still observe the interruption
                // after it has been translated into an IOException.
                Thread.currentThread().interrupt();
                failure.set(new IOException(e));
            }
            return null;
        }, LOG, "Register", "workerId=%d", new Object[] {workerId});

        IOException registrationFailure = failure.get();
        if (registrationFailure != null) {
            throw registrationFailure;
        }
    }

    /**
     * Notifies the master of the id assigned to the worker at the given address.
     *
     * @param workerId the worker id
     * @param workerNetAddress the network address of the worker
     * @throws IOException if the RPC fails
     */
    public void notifyWorkerId(long workerId, WorkerNetAddress workerNetAddress)
            throws IOException {
        retryRPC(() -> {
            LOG.info("Notifying workerID to master {} with workerId {}, workerAddress {}",
                    this.mServerAddress, workerId, workerNetAddress);
            NotifyWorkerIdPRequest request = NotifyWorkerIdPRequest.newBuilder()
                    .setWorkerId(workerId)
                    .setWorkerNetAddress(GrpcUtils.toProto(workerNetAddress))
                    .build();
            return this.mClient.notifyWorkerId(request);
        }, LOG, "NotifyWorkerId", "workerId=%d, workerAddress=%s",
                new Object[] {workerId, workerNetAddress});
    }

    /**
     * Converts the lost-storage map into the proto map expected by the request builders.
     *
     * @param lostStorage lost storage paths keyed by tier alias
     * @return the same keys, each mapped to a {@link StorageList} holding its paths
     */
    private static Map<String, StorageList> toLostStorageProto(
            Map<String, List<String>> lostStorage) {
        return lostStorage.entrySet().stream().collect(Collectors.toMap(
                Map.Entry::getKey,
                entry -> StorageList.newBuilder().addAllStorage(entry.getValue()).build()));
    }
}
