/*
 * Decompiled with CFR 0.152.
 */
package org.voltdb.client.topics;

import com.google_voltpatches.common.collect.ImmutableMap;
import com.google_voltpatches.common.collect.Maps;
import com.google_voltpatches.common.net.HostAndPort;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.logging.Logger;
import javax.security.auth.login.AppConfigurationEntry;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.JaasContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.voltdb.VoltTable;
import org.voltdb.VoltType;
import org.voltdb.VoltTypeException;
import org.voltdb.client.ClientConfig;
import org.voltdb.client.ClientFactory;
import org.voltdb.client.ClientImpl;

public class VoltDBKafkaPartitioner
extends DefaultPartitioner {
    public static final String BOOTSTRAP_SERVERS_VOLTDB = "bootstrap.servers.voltdb";
    private static final String PLAIN_SASL_MECHANISM = "PLAIN";
    static final Logger LOG = Logger.getLogger(VoltDBKafkaPartitioner.class.getName());
    protected ClientImpl m_client;
    private ImmutableMap<String, Boolean> m_topics = ImmutableMap.of();

    public void configure(Map<String, ?> original) {
        PartitionConfig configs = new PartitionConfig(original);
        this.m_client = (ClientImpl)ClientFactory.createClient(this.createClientConfig(configs));
        List urls = configs.getList(BOOTSTRAP_SERVERS_VOLTDB);
        boolean useDefault = false;
        if (urls.isEmpty()) {
            urls = configs.getList("bootstrap.servers");
            useDefault = true;
        }
        boolean connected = false;
        for (String connection : urls) {
            if (useDefault) {
                HostAndPort url = HostAndPort.fromString(connection);
                connection = HostAndPort.fromParts(url.getHost(), 21212).toString();
            }
            try {
                this.m_client.createConnection(connection);
                connected = true;
            }
            catch (IOException e) {
                String fmt = "Failed to open connection to the database host at %s: %s";
                LOG.warning(String.format(fmt, connection, e.getMessage()));
            }
        }
        if (!connected) {
            throw new KafkaException(String.format("Failed to connect to any database host in %s", urls));
        }
    }

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Boolean entry = this.m_topics.get(topic);
        if (entry == null) {
            this.loadTopics();
            entry = this.m_topics.get(topic);
            if (entry == null) {
                throw new KafkaException(String.format("Topic %s is not found.", topic));
            }
        }
        if (entry.booleanValue()) {
            return super.partition(topic, key, keyBytes, value, valueBytes, cluster);
        }
        int partition = -1;
        VoltType keyType = null;
        if (key != null) {
            try {
                keyType = VoltType.typeFromObject(key);
            }
            catch (VoltTypeException voltTypeException) {
                // empty catch block
            }
        }
        if (keyType != null) {
            partition = (int)this.m_client.getPartitionForParameter(keyType.getValue(), key);
        } else if (keyBytes != null) {
            partition = (int)this.m_client.getPartitionForParameter(keyBytes);
        }
        return partition > -1 ? partition : super.partition(topic, key, keyBytes, value, valueBytes, cluster);
    }

    public void close() {
        super.close();
        if (this.m_client != null) {
            try {
                this.m_client.close();
            }
            catch (Exception e) {
                LOG.warning("Failed to close connections:" + e.getMessage());
            }
        }
    }

    protected void loadTopics() {
        try {
            VoltTable topics = this.m_client.callProcedure("@SystemCatalog", "TOPICS").getResults()[0];
            HashMap<String, Boolean> topicsMap = Maps.newHashMap();
            while (topics.advanceRow()) {
                topicsMap.put(topics.getString("TOPIC_NAME"), Boolean.parseBoolean(topics.getString("IS_OPAQUE")));
            }
            this.m_topics = ImmutableMap.copyOf(topicsMap);
        }
        catch (Exception e) {
            throw new KafkaException("Failed to get topics from the database cluster", (Throwable)e);
        }
    }

    protected ClientConfig createClientConfig(PartitionConfig configs) {
        boolean sslEnabled;
        SecurityProtocol protocol = SecurityProtocol.forName((String)configs.getString("security.protocol"));
        String userName = null;
        String password = null;
        if (protocol == SecurityProtocol.SASL_PLAINTEXT || protocol == SecurityProtocol.SASL_SSL) {
            if (!PLAIN_SASL_MECHANISM.equals(configs.getString("sasl.mechanism"))) {
                throw new IllegalArgumentException("Only PLAIN is supported for sasl.mechanism");
            }
            Password jaasConfigString = configs.getPassword("sasl.jaas.config");
            if (jaasConfigString == null) {
                throw new IllegalArgumentException("SASL JAAS configuration not supplied when SASL was specified");
            }
            JaasContext context = JaasContext.loadClientContext((Map)configs.values());
            String moduleName = PlainLoginModule.class.getName();
            for (AppConfigurationEntry ace : context.configurationEntries()) {
                if (!moduleName.equals(ace.getLoginModuleName())) continue;
                Map<String, ?> options = ace.getOptions();
                userName = (String)options.get("username");
                password = (String)options.get("password");
                break;
            }
        }
        ClientConfig clientConfig = new ClientConfig(userName, password);
        clientConfig.setTopologyChangeAware(true);
        boolean bl = sslEnabled = protocol == SecurityProtocol.SSL || protocol == SecurityProtocol.SASL_SSL;
        if (sslEnabled) {
            clientConfig.enableSSL();
            String sslStore = configs.getString("ssl.truststore.location");
            if (!StringUtils.isEmpty((CharSequence)sslStore)) {
                Password storePassword = configs.getPassword("ssl.truststore.password");
                clientConfig.setTrustStore(sslStore, storePassword == null ? null : storePassword.value());
            }
        }
        return clientConfig;
    }

    private static final class PartitionConfig
    extends AbstractConfig {
        private static final ConfigDef s_config = ProducerConfig.configDef().define("bootstrap.servers.voltdb", ConfigDef.Type.LIST, Collections.emptyList(), (ConfigDef.Validator)new ConfigDef.NonNullValidator(), ConfigDef.Importance.MEDIUM, "List of database servers to connect to. Defaults to bootstrap.servers using the deafult port  of 21212");

        public PartitionConfig(Map<?, ?> originals) {
            super(s_config, originals, false);
        }
    }
}

