/*
 * Decompiled with CFR 0.152.
 */
package org.apache.atlas.hook;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.Properties;
import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.ZkConnection;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.utils.AuthenticationUtil;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

public class AtlasTopicCreator {
    private static final Logger LOG = LoggerFactory.getLogger(AtlasTopicCreator.class);
    public static final String ATLAS_NOTIFICATION_CREATE_TOPICS_KEY = "atlas.notification.create.topics";

    public void createAtlasTopic(Configuration atlasProperties, String ... topicNames) {
        if (atlasProperties.getBoolean(ATLAS_NOTIFICATION_CREATE_TOPICS_KEY, true)) {
            if (!this.handleSecurity(atlasProperties)) {
                return;
            }
            ZkUtils zkUtils = this.createZkUtils(atlasProperties);
            for (String topicName : topicNames) {
                try {
                    LOG.warn("Attempting to create topic {}", (Object)topicName);
                    if (!this.ifTopicExists(topicName, zkUtils)) {
                        this.createTopic(atlasProperties, topicName, zkUtils);
                        continue;
                    }
                    LOG.warn("Ignoring call to create topic {}, as it already exists.", (Object)topicName);
                }
                catch (Throwable t) {
                    LOG.error("Failed while creating topic {}", (Object)topicName, (Object)t);
                }
            }
            zkUtils.close();
        } else {
            LOG.info("Not creating topics {} as {} is false", (Object)StringUtils.join((Object[])topicNames, (String)","), (Object)ATLAS_NOTIFICATION_CREATE_TOPICS_KEY);
        }
    }

    @VisibleForTesting
    protected boolean handleSecurity(Configuration atlasProperties) {
        if (AuthenticationUtil.isKerberosAuthenticationEnabled((Configuration)atlasProperties)) {
            String kafkaPrincipal = atlasProperties.getString("atlas.notification.kafka.service.principal");
            String kafkaKeyTab = atlasProperties.getString("atlas.notification.kafka.keytab.location");
            org.apache.hadoop.conf.Configuration hadoopConf = new org.apache.hadoop.conf.Configuration();
            SecurityUtil.setAuthenticationMethod((UserGroupInformation.AuthenticationMethod)UserGroupInformation.AuthenticationMethod.KERBEROS, (org.apache.hadoop.conf.Configuration)hadoopConf);
            try {
                String serverPrincipal = SecurityUtil.getServerPrincipal((String)kafkaPrincipal, (String)null);
                UserGroupInformation.setConfiguration((org.apache.hadoop.conf.Configuration)hadoopConf);
                UserGroupInformation.loginUserFromKeytab((String)serverPrincipal, (String)kafkaKeyTab);
            }
            catch (IOException e) {
                LOG.warn("Could not login as {} from keytab file {}", new Object[]{kafkaPrincipal, kafkaKeyTab, e});
                return false;
            }
        }
        return true;
    }

    @VisibleForTesting
    protected boolean ifTopicExists(String topicName, ZkUtils zkUtils) {
        return AdminUtils.topicExists((ZkUtils)zkUtils, (String)topicName);
    }

    @VisibleForTesting
    protected void createTopic(Configuration atlasProperties, String topicName, ZkUtils zkUtils) {
        int numPartitions = atlasProperties.getInt("atlas.notification.hook.numthreads", 1);
        int numReplicas = atlasProperties.getInt("atlas.notification.replicas", 1);
        AdminUtils.createTopic((ZkUtils)zkUtils, (String)topicName, (int)numPartitions, (int)numReplicas, (Properties)new Properties(), (RackAwareMode)RackAwareMode.Enforced$.MODULE$);
        LOG.warn("Created topic {} with partitions {} and replicas {}", new Object[]{topicName, numPartitions, numReplicas});
    }

    @VisibleForTesting
    protected ZkUtils createZkUtils(Configuration atlasProperties) {
        String zkConnect = atlasProperties.getString("atlas.kafka.zookeeper.connect");
        int sessionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.session.timeout.ms", 400);
        int connectionTimeout = atlasProperties.getInt("atlas.kafka.zookeeper.connection.timeout.ms", 200);
        Tuple2 zkClientAndConnection = ZkUtils.createZkClientAndConnection((String)zkConnect, (int)sessionTimeout, (int)connectionTimeout);
        return new ZkUtils((ZkClient)zkClientAndConnection._1(), (ZkConnection)zkClientAndConnection._2(), false);
    }

    public static void main(String[] args) throws AtlasException {
        Configuration configuration = ApplicationProperties.get();
        AtlasTopicCreator atlasTopicCreator = new AtlasTopicCreator();
        atlasTopicCreator.createAtlasTopic(configuration, args);
    }
}

