package alluxio.membership;

import alluxio.conf.AlluxioConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.status.AlreadyExistsException;
import alluxio.exception.status.InvalidArgumentException;
import alluxio.metrics.Metric;
import alluxio.shaded.client.com.google.common.annotations.VisibleForTesting;
import alluxio.shaded.client.com.google.gson.JsonParseException;
import alluxio.shaded.client.io.etcd.jetcd.ByteSequence;
import alluxio.shaded.client.io.etcd.jetcd.KeyValue;
import alluxio.shaded.client.io.etcd.jetcd.Txn;
import alluxio.shaded.client.io.etcd.jetcd.kv.TxnResponse;
import alluxio.shaded.client.io.etcd.jetcd.op.Cmp;
import alluxio.shaded.client.io.etcd.jetcd.op.CmpTarget;
import alluxio.shaded.client.io.etcd.jetcd.op.Op;
import alluxio.shaded.client.io.etcd.jetcd.options.GetOption;
import alluxio.shaded.client.io.etcd.jetcd.options.PutOption;
import alluxio.util.CommonUtils;
import alluxio.wire.WorkerInfo;
import alluxio.wire.WorkerState;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:alluxio/membership/EtcdMembershipManager.class */
public class EtcdMembershipManager implements MembershipManager {
    private static final Logger LOG = LoggerFactory.getLogger(EtcdMembershipManager.class);
    private final AlluxioConfiguration mConf;
    private AlluxioEtcdClient mAlluxioEtcdClient;
    private String mClusterName;
    private Supplier<String> mRingPathPrefix;

    public static EtcdMembershipManager create(AlluxioConfiguration alluxioConfiguration) {
        return new EtcdMembershipManager(alluxioConfiguration);
    }

    public EtcdMembershipManager(AlluxioConfiguration alluxioConfiguration) {
        this(alluxioConfiguration, AlluxioEtcdClient.getInstance(alluxioConfiguration));
    }

    public EtcdMembershipManager(AlluxioConfiguration alluxioConfiguration, AlluxioEtcdClient alluxioEtcdClient) {
        this.mRingPathPrefix = CommonUtils.memoize(this::constructRingPathPrefix);
        this.mConf = alluxioConfiguration;
        this.mClusterName = alluxioConfiguration.getString(PropertyKey.ALLUXIO_CLUSTER_NAME);
        this.mAlluxioEtcdClient = alluxioEtcdClient;
    }

    private String constructRingPathPrefix() {
        return String.format("/DHT/%s/AUTHORIZED/", this.mClusterName);
    }

    private String getRingPathPrefix() {
        return this.mRingPathPrefix.get();
    }

    @Override // alluxio.membership.MembershipManager
    public void join(WorkerInfo workerInfo) throws IOException {
        LOG.info("Try joining on etcd for worker:{} ", workerInfo);
        WorkerServiceEntity workerServiceEntity = new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress());
        workerServiceEntity.setLeaseTTLInSec(this.mConf.getDuration(PropertyKey.WORKER_FAILURE_DETECTION_TIMEOUT).getSeconds());
        String stringBuffer = new StringBuffer().append(getRingPathPrefix()).append(workerServiceEntity.getServiceEntityName()).toString();
        byte[] serialize = workerServiceEntity.serialize();
        try {
            boolean z = this.mConf.isSet(PropertyKey.K8S_ENV_DEPLOYMENT) && this.mConf.getBoolean(PropertyKey.K8S_ENV_DEPLOYMENT);
            Txn txn = this.mAlluxioEtcdClient.getEtcdClient().getKVClient().txn();
            ByteSequence from = ByteSequence.from(stringBuffer, StandardCharsets.UTF_8);
            ByteSequence from2 = ByteSequence.from(serialize);
            Txn Then = txn.If(new Cmp(from, Cmp.Op.EQUAL, CmpTarget.version(0L))).Then(Op.put(from, from2, PutOption.newBuilder().build()));
            Op[] opArr = new Op[1];
            opArr[0] = z ? Op.put(from, from2, PutOption.newBuilder().build()) : Op.get(from, GetOption.DEFAULT);
            TxnResponse txnResponse = Then.Else(opArr).commit().get();
            if (!z && !txnResponse.isSucceeded()) {
                ArrayList arrayList = new ArrayList();
                txnResponse.getGetResponses().stream().map(getResponse -> {
                    return Boolean.valueOf(arrayList.addAll(getResponse.getKvs()));
                }).collect(Collectors.toList());
                Optional max = arrayList.stream().max((keyValue, keyValue2) -> {
                    return (int) (keyValue.getModRevision() - keyValue2.getModRevision());
                });
                if (max.isPresent() && !Arrays.equals(((KeyValue) max.get()).getValue().getBytes(), serialize)) {
                    Optional<WorkerServiceEntity> parseWorkerServiceEntity = parseWorkerServiceEntity((KeyValue) max.get());
                    if (!parseWorkerServiceEntity.isPresent()) {
                        throw new IOException(String.format("Existing WorkerServiceEntity for path:%s corrupted", stringBuffer));
                    }
                    if (!parseWorkerServiceEntity.get().equalsIgnoringOptionalFields(workerServiceEntity)) {
                        throw new AlreadyExistsException(String.format("Some other member with same id registered on the ring, bail.Conflicting worker addr:%s, worker identity:%s.Different workers can't assume same worker identity in non-k8s env,clean local worker identity settings to continue.", parseWorkerServiceEntity.get().getWorkerNetAddress().toString(), parseWorkerServiceEntity.get().getIdentity()));
                    }
                    this.mAlluxioEtcdClient.createForPath(stringBuffer, Optional.of(serialize));
                }
            }
            this.mAlluxioEtcdClient.mServiceDiscovery.registerAndStartSync(workerServiceEntity);
            LOG.info("Joined on etcd for worker:{} ", workerInfo);
        } catch (InterruptedException | ExecutionException e) {
            throw new IOException(e);
        }
    }

    @Override // alluxio.membership.MembershipManager
    public WorkerClusterView getAllMembers() throws IOException {
        Set set = (Set) parseWorkersFromEtcdKvPairs(this.mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices()).map((v0) -> {
            return v0.getIdentity();
        }).collect(Collectors.toSet());
        Predicate predicate = workerInfo -> {
            return set.contains(workerInfo.getIdentity());
        };
        Stream map = parseWorkersFromEtcdKvPairs(this.mAlluxioEtcdClient.getChildren(getRingPathPrefix())).map(workerServiceEntity -> {
            return new WorkerInfo().setIdentity(workerServiceEntity.getIdentity()).setAddress(workerServiceEntity.getWorkerNetAddress());
        }).map(workerInfo2 -> {
            return workerInfo2.setState(predicate.test(workerInfo2) ? WorkerState.LIVE : WorkerState.LOST);
        });
        map.getClass();
        return new WorkerClusterView(map::iterator);
    }

    @Override // alluxio.membership.MembershipManager
    public WorkerClusterView getLiveMembers() throws IOException {
        Stream<R> map = parseWorkersFromEtcdKvPairs(this.mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices()).map(workerServiceEntity -> {
            return new WorkerInfo().setIdentity(workerServiceEntity.getIdentity()).setAddress(workerServiceEntity.getWorkerNetAddress()).setState(WorkerState.LIVE);
        });
        map.getClass();
        return new WorkerClusterView(map::iterator);
    }

    @Override // alluxio.membership.MembershipManager
    public WorkerClusterView getFailedMembers() throws IOException {
        Set set = (Set) parseWorkersFromEtcdKvPairs(this.mAlluxioEtcdClient.mServiceDiscovery.getAllLiveServices()).map((v0) -> {
            return v0.getIdentity();
        }).collect(Collectors.toSet());
        Stream<R> map = parseWorkersFromEtcdKvPairs(this.mAlluxioEtcdClient.getChildren(getRingPathPrefix())).filter(workerServiceEntity -> {
            return !set.contains(workerServiceEntity.getIdentity());
        }).map(workerServiceEntity2 -> {
            return new WorkerInfo().setIdentity(workerServiceEntity2.getIdentity()).setAddress(workerServiceEntity2.getWorkerNetAddress()).setState(WorkerState.LOST);
        });
        map.getClass();
        return new WorkerClusterView(map::iterator);
    }

    private Stream<WorkerServiceEntity> parseWorkersFromEtcdKvPairs(List<KeyValue> list) {
        return list.stream().map(this::parseWorkerServiceEntity).filter((v0) -> {
            return v0.isPresent();
        }).map((v0) -> {
            return v0.get();
        });
    }

    private Optional<WorkerServiceEntity> parseWorkerServiceEntity(KeyValue keyValue) {
        try {
            WorkerServiceEntity workerServiceEntity = new WorkerServiceEntity();
            workerServiceEntity.deserialize(keyValue.getValue().getBytes());
            return Optional.of(workerServiceEntity);
        } catch (JsonParseException e) {
            return Optional.empty();
        }
    }

    @Override // alluxio.membership.MembershipManager
    @VisibleForTesting
    public String showAllMembers() {
        try {
            WorkerClusterView allMembers = getAllMembers();
            WorkerClusterView liveMembers = getLiveMembers();
            StringBuilder sb = new StringBuilder(String.format("%s\t%s\t%s%n", "WorkerId", "Address", "Status"));
            Iterator<WorkerInfo> it = allMembers.iterator();
            while (it.hasNext()) {
                WorkerInfo next = it.next();
                Object[] objArr = new Object[3];
                objArr[0] = next.getIdentity();
                objArr[1] = next.getAddress().getHost() + Metric.TAG_SEPARATOR + next.getAddress().getRpcPort();
                objArr[2] = liveMembers.getWorkerById(next.getIdentity()).isPresent() ? "ONLINE" : "OFFLINE";
                sb.append(String.format("%s\t%s\t%s%n", objArr));
            }
            return sb.toString();
        } catch (IOException e) {
            return String.format("Exception happened:%s", e.getMessage());
        }
    }

    @Override // alluxio.membership.MembershipManager
    public void stopHeartBeat(WorkerInfo workerInfo) throws IOException {
        this.mAlluxioEtcdClient.mServiceDiscovery.unregisterService(new WorkerServiceEntity(workerInfo.getIdentity(), workerInfo.getAddress()).getServiceEntityName());
    }

    @Override // alluxio.membership.MembershipManager
    public void decommission(WorkerInfo workerInfo) throws IOException {
        Optional<WorkerInfo> workerById = getAllMembers().getWorkerById(workerInfo.getIdentity());
        if (!workerById.isPresent()) {
            throw new InvalidArgumentException(String.format("Unrecognized or non-existing worker: %s", workerInfo.getIdentity()));
        }
        if (workerById.get().getState() != WorkerState.LOST) {
            throw new InvalidArgumentException(String.format("Can't remove running worker: %s, stop the worker before removing", workerInfo.getIdentity()));
        }
        stopHeartBeat(workerInfo);
        this.mAlluxioEtcdClient.deleteForPath(new StringBuffer().append(getRingPathPrefix()).append(workerInfo.getIdentity()).toString(), false);
        LOG.info("Successfully removed worker:{}", workerInfo.getIdentity());
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
    }
}
