/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pinot.client;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import io.netty.handler.ssl.SslContext;
import java.io.InputStream;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.pinot.client.BrokerData;
import org.apache.pinot.client.TlsProtocols;
import org.apache.pinot.client.utils.BrokerSelectorUtils;
import org.apache.pinot.client.utils.ConnectionUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.AsyncHttpClientConfig;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;

public class BrokerCache {
    private static final TypeReference<Map<String, List<BrokerInstance>>> RESPONSE_TYPE_REF = new TypeReference<Map<String, List<BrokerInstance>>>(){};
    private static final String DEFAULT_CONTROLLER_READ_TIMEOUT_MS = "60000";
    private static final String DEFAULT_CONTROLLER_CONNECT_TIMEOUT_MS = "2000";
    private static final String DEFAULT_CONTROLLER_HANDSHAKE_TIMEOUT_MS = "2000";
    private static final String DEFAULT_CONTROLLER_TLS_V10_ENABLED = "false";
    private static final String SCHEME = "scheme";
    private final AsyncHttpClient _client;
    private final String _address;
    private final Map<String, String> _headers;
    private final Properties _properties;
    private volatile BrokerData _brokerData;

    public BrokerCache(Properties properties, String controllerUrl) {
        String scheme = properties.getProperty(SCHEME, "http");
        DefaultAsyncHttpClientConfig.Builder builder = Dsl.config();
        if (scheme.contentEquals("https")) {
            SSLContext sslContext = ConnectionUtils.getSSLContextFromProperties(properties);
            builder.setSslContext((SslContext)new JdkSslContext(sslContext, true, ClientAuth.OPTIONAL));
        }
        int readTimeoutMs = Integer.parseInt(properties.getProperty("controllerReadTimeoutMs", DEFAULT_CONTROLLER_READ_TIMEOUT_MS));
        int connectTimeoutMs = Integer.parseInt(properties.getProperty("controllerConnectTimeoutMs", "2000"));
        int handshakeTimeoutMs = Integer.parseInt(properties.getProperty("controllerHandshakeTimeoutMs", "2000"));
        String appId = properties.getProperty("appId");
        boolean tlsV10Enabled = Boolean.parseBoolean(properties.getProperty("controllerTlsV10Enabled", DEFAULT_CONTROLLER_TLS_V10_ENABLED)) || Boolean.parseBoolean(System.getProperties().getProperty("controller.tlsV10Enabled", DEFAULT_CONTROLLER_TLS_V10_ENABLED));
        TlsProtocols tlsProtocols = TlsProtocols.defaultProtocols(tlsV10Enabled);
        builder.setReadTimeout(Duration.ofMillis(readTimeoutMs)).setConnectTimeout(Duration.ofMillis(connectTimeoutMs)).setHandshakeTimeout(handshakeTimeoutMs).setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua_broker_cache", appId)).setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));
        this._client = Dsl.asyncHttpClient((AsyncHttpClientConfig)builder.build());
        ControllerRequestURLBuilder controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl((String)(scheme + "://" + controllerUrl));
        this._address = controllerRequestURLBuilder.forLiveBrokerTablesGet();
        this._headers = ConnectionUtils.getHeadersFromProperties(properties);
        this._properties = properties;
    }

    private Map<String, List<BrokerInstance>> getTableToBrokersData() throws Exception {
        BoundRequestBuilder getRequest = this._client.prepareGet(this._address);
        if (this._headers != null) {
            this._headers.forEach((k, v) -> getRequest.addHeader((CharSequence)k, v));
        }
        ListenableFuture responseFuture = ((BoundRequestBuilder)getRequest.addHeader((CharSequence)"accept", "application/json")).execute();
        Response response = (Response)responseFuture.get();
        return (Map)JsonUtils.inputStreamToObject((InputStream)response.getResponseBodyAsStream(), RESPONSE_TYPE_REF);
    }

    private BrokerData getBrokerData(Map<String, List<BrokerInstance>> responses) {
        HashSet brokers = new HashSet();
        HashMap<String, List<String>> tableToBrokersMap = new HashMap<String, List<String>>();
        HashSet<String> uniqueTableNames = new HashSet<String>();
        for (Map.Entry<String, List<BrokerInstance>> tableToBrokers : responses.entrySet()) {
            ArrayList brokersForTable = new ArrayList();
            tableToBrokers.getValue().forEach(br -> {
                String brokerHostPort = br.getHost() + ":" + br.getPort();
                brokersForTable.add(brokerHostPort);
                brokers.add(brokerHostPort);
            });
            String tableName2 = tableToBrokers.getKey();
            tableToBrokersMap.put(tableName2, brokersForTable);
            String rawTableName = TableNameBuilder.extractRawTableName((String)tableName2);
            uniqueTableNames.add(rawTableName);
        }
        uniqueTableNames.forEach(tableName -> {
            if (!tableToBrokersMap.containsKey(tableName)) {
                String offlineTable = tableName + "_OFFLINE";
                String realtimeTable = tableName + "_REALTIME";
                if (tableToBrokersMap.containsKey(offlineTable) && tableToBrokersMap.containsKey(realtimeTable)) {
                    List realtimeBrokers = (List)tableToBrokersMap.get(realtimeTable);
                    List offlineBrokers = (List)tableToBrokersMap.get(offlineTable);
                    List tableBrokers = realtimeBrokers.stream().filter(offlineBrokers::contains).collect(Collectors.toList());
                    tableToBrokersMap.put((String)tableName, tableBrokers);
                } else {
                    tableToBrokersMap.put((String)tableName, tableToBrokersMap.getOrDefault(offlineTable, tableToBrokersMap.getOrDefault(realtimeTable, new ArrayList())));
                }
            }
        });
        return new BrokerData(tableToBrokersMap, new ArrayList<String>(brokers));
    }

    protected void updateBrokerData() throws Exception {
        Map<String, List<BrokerInstance>> responses = this.getTableToBrokersData();
        this._brokerData = this.getBrokerData(responses);
    }

    public String getBroker(String ... tableNames) {
        String[] stringArray = tableNames = tableNames == null ? tableNames : (String[])Arrays.stream(tableNames).filter(Objects::nonNull).toArray(String[]::new);
        if (tableNames == null || tableNames.length == 0) {
            List<String> brokers = this._brokerData.getBrokers();
            return brokers.get(ThreadLocalRandom.current().nextInt(brokers.size()));
        }
        return BrokerSelectorUtils.getRandomBroker(Arrays.asList(tableNames), this._brokerData.getTableToBrokerMap());
    }

    public List<String> getBrokers() {
        return this._brokerData.getBrokers();
    }

    @JsonIgnoreProperties(ignoreUnknown=true)
    private static class BrokerInstance {
        private String _host;
        private Integer _port;

        private BrokerInstance() {
        }

        public String getHost() {
            return this._host;
        }

        public void setHost(String host) {
            this._host = host;
        }

        public Integer getPort() {
            return this._port;
        }

        public void setPort(Integer port) {
            this._port = port;
        }
    }
}

