/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.client;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.pinot.client.BrokerSelector;
import org.apache.pinot.client.ExternalViewReader;
import org.apache.pinot.client.utils.BrokerSelectorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link BrokerSelector} whose broker lists are refreshed from ZooKeeper.
 *
 * <p>On construction the selector connects a {@link ZkClient}, subscribes itself as an
 * {@link IZkDataListener} on the broker resource external-view node and loads the current
 * table-to-brokers map through an {@link ExternalViewReader}. Every data-change or data-deleted
 * notification for that node triggers another reload.
 *
 * <p>The table-to-brokers map and the flattened list of all brokers are each published through
 * their own {@link AtomicReference} and are replaced wholesale on refresh. The two references are
 * updated one after the other, so a reader may briefly observe a map and a list that originate
 * from different refreshes.
 */
public class DynamicBrokerSelector implements BrokerSelector, IZkDataListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(DynamicBrokerSelector.class);

    /** ZooKeeper node whose changes trigger a refresh of the broker lists. */
    private static final String BROKER_EXTERNAL_VIEW_PATH = "/EXTERNALVIEW/brokerResource";

    /** Maximum time, in seconds, to wait for the ZooKeeper connection during construction. */
    private static final long ZK_CONNECT_TIMEOUT_SECONDS = 60L;

    /** Latest table name to broker list mapping as returned by the external-view reader. */
    protected final AtomicReference<Map<String, List<String>>> _tableToBrokerListMapRef = new AtomicReference<>();

    /** Latest de-duplicated list of every broker that appears in the table-to-brokers map. */
    protected final AtomicReference<List<String>> _allBrokerListRef = new AtomicReference<>();

    protected final ZkClient _zkClient;
    protected final ExternalViewReader _evReader;

    /**
     * Creates a selector backed by the given ZooKeeper servers and performs the initial load.
     *
     * <p>If any step after the client has been created fails, the client is closed before the
     * failure is rethrown; the caller never receives the instance and so could not close it.
     *
     * @param zkServers ZooKeeper connection string handed to {@link #getZkClient(String)}
     * @param preferTlsPort forwarded unchanged to {@link #getEvReader(ZkClient, boolean)}
     */
    public DynamicBrokerSelector(String zkServers, boolean preferTlsPort) {
        _zkClient = getZkClient(zkServers);
        try {
            _zkClient.setZkSerializer(new BytesPushThroughSerializer());
            // A timeout is only reported, not treated as fatal, so that subclasses which inject a
            // stub client through getZkClient() keep working exactly as before.
            if (!_zkClient.waitUntilConnected(ZK_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                LOGGER.warn("ZooKeeper client did not report a connection within {} seconds",
                    ZK_CONNECT_TIMEOUT_SECONDS);
            }
            _zkClient.subscribeDataChanges(BROKER_EXTERNAL_VIEW_PATH, this);
            _evReader = getEvReader(_zkClient, preferTlsPort);
            refresh();
        } catch (RuntimeException | Error e) {
            try {
                _zkClient.close();
            } catch (RuntimeException closeFailure) {
                // Keep the original failure as the primary one.
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Creates a selector with {@code preferTlsPort} set to {@code false}.
     *
     * @param zkServers ZooKeeper connection string handed to {@link #getZkClient(String)}
     */
    public DynamicBrokerSelector(String zkServers) {
        this(zkServers, false);
    }

    /**
     * Builds the ZooKeeper client. Called from the constructor; overridable so tests can supply
     * their own client.
     */
    @VisibleForTesting
    protected ZkClient getZkClient(String zkServers) {
        return new ZkClient(zkServers);
    }

    /** Builds an external-view reader with {@code preferTlsPort} set to {@code false}. */
    @VisibleForTesting
    protected ExternalViewReader getEvReader(ZkClient zkClient) {
        return getEvReader(zkClient, false);
    }

    /**
     * Builds the external-view reader. Called from the constructor; overridable so tests can
     * supply their own reader.
     */
    @VisibleForTesting
    protected ExternalViewReader getEvReader(ZkClient zkClient, boolean preferTlsPort) {
        return new ExternalViewReader(zkClient, preferTlsPort);
    }

    /**
     * Reloads the table-to-brokers map from the external-view reader and rebuilds the list of all
     * distinct brokers from it.
     */
    private void refresh() {
        Map<String, List<String>> tableToBrokerListMap = _evReader.getTableToBrokersMap();
        _tableToBrokerListMapRef.set(tableToBrokerListMap);

        // A set collapses brokers that serve more than one table into a single entry.
        HashSet<String> brokerSet = new HashSet<>();
        for (List<String> brokerList : tableToBrokerListMap.values()) {
            brokerSet.addAll(brokerList);
        }
        _allBrokerListRef.set(new ArrayList<>(brokerSet));

        // Log the map that was just loaded rather than re-reading the reference, which a
        // concurrent refresh may already have replaced.
        LOGGER.info("Refreshed table to broker list map: {}", tableToBrokerListMap);
    }

    /**
     * Picks a broker for the given tables.
     *
     * <p>When at least one table name is supplied and the first one is not {@code null}, the choice
     * is delegated to {@link BrokerSelectorUtils#getRandomBroker} together with the current
     * table-to-brokers map. If that yields nothing, or no usable table name was supplied, a broker
     * is drawn uniformly at random from the list of all known brokers.
     *
     * @param tableNames tables the query touches; may be {@code null} or empty
     * @return a broker, or {@code null} when no broker is known
     */
    @Override
    @Nullable
    public String selectBroker(String ... tableNames) {
        boolean hasTableNames = tableNames != null && tableNames.length != 0 && tableNames[0] != null;
        if (hasTableNames) {
            String tableBroker =
                BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames), _tableToBrokerListMapRef.get());
            if (tableBroker != null) {
                return tableBroker;
            }
        }

        // Fall back to any known broker.
        List<String> allBrokers = _allBrokerListRef.get();
        if (allBrokers == null || allBrokers.isEmpty()) {
            return null;
        }
        return allBrokers.get(ThreadLocalRandom.current().nextInt(allBrokers.size()));
    }

    /**
     * Returns the current list of all known brokers.
     *
     * <p>The returned list is the instance held internally, not a copy, so callers should treat it
     * as read-only.
     */
    @Override
    public List<String> getBrokers() {
        return _allBrokerListRef.get();
    }

    /** Closes the underlying ZooKeeper client. */
    @Override
    public void close() {
        _zkClient.close();
    }

    /** ZooKeeper callback: the watched node's data changed, so reload the broker lists. */
    @Override
    public void handleDataChange(String dataPath, Object data) {
        refresh();
    }

    /** ZooKeeper callback: the watched node was deleted, so reload the broker lists. */
    @Override
    public void handleDataDeleted(String dataPath) {
        refresh();
    }
}

