package com.playtika.test.kafka.configuration;

import com.playtika.test.common.utils.ContainerUtils;
import com.playtika.test.kafka.KafkaTopicsConfigurer;
import com.playtika.test.kafka.checks.KafkaStatusCheck;
import com.playtika.test.kafka.properties.KafkaConfigurationProperties;
import com.playtika.test.kafka.properties.ZookeeperConfigurationProperties;
import com.playtika.test.toxiproxy.EmbeddedToxiProxyBootstrapConfiguration;
import com.playtika.test.toxiproxy.condition.ConditionalOnToxiProxyEnabled;
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.PosixFileAttributeView;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Set;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.env.MapPropertySource;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;
import org.testcontainers.utility.MountableFile;

@EnableConfigurationProperties({KafkaConfigurationProperties.class, ZookeeperConfigurationProperties.class})
@Configuration
@AutoConfigureAfter({EmbeddedToxiProxyBootstrapConfiguration.class})
@ConditionalOnProperty(value = {"embedded.kafka.enabled"}, havingValue = "true", matchIfMissing = true)
/* loaded from: input_file:com/playtika/test/kafka/configuration/KafkaContainerConfiguration.class */
public class KafkaContainerConfiguration {
    private static final Logger log = LoggerFactory.getLogger(KafkaContainerConfiguration.class);
    public static final String KAFKA_HOST_NAME = "kafka-broker.testcontainer.docker";

    /**
     * Creates a fresh Docker network and logs its id.
     *
     * <p>The bean is only registered when no other {@link Network} bean exists, and
     * {@code close} is declared as its destroy method.
     *
     * @return the newly created network
     */
    @ConditionalOnMissingBean(Network.class)
    @Bean(destroyMethod = "close")
    public Network kafkaNetwork() {
        Network network = Network.newNetwork();
        log.info("Created docker Network id={}", network.getId());
        return network;
    }

    /**
     * Provides the default {@link KafkaStatusCheck}, built from the Kafka configuration
     * properties. Skipped when a bean of that type is already defined.
     *
     * @param kafkaConfigurationProperties properties handed to the status check
     * @return a new status check instance
     */
    @Bean
    @ConditionalOnMissingBean
    public KafkaStatusCheck kafkaStartupCheckStrategy(KafkaConfigurationProperties kafkaConfigurationProperties) {
        KafkaStatusCheck statusCheck = new KafkaStatusCheck(kafkaConfigurationProperties);
        return statusCheck;
    }

    /**
     * Creates the ToxiProxy proxy for the plain-text Kafka listener and publishes its
     * connection details as the {@code embeddedKafkaPlainToxiProxyInfo} property source.
     *
     * @param toxiproxyContainer           container the proxy is obtained from
     * @param kafkaConfigurationProperties supplies the proxied broker port
     * @param configurableEnvironment      environment that receives the property source (added first)
     * @return the proxy pointing at {@link #KAFKA_HOST_NAME}
     */
    @ConditionalOnToxiProxyEnabled(module = KafkaConfigurationProperties.KAFKA_BEAN_NAME)
    @Bean(name = KafkaConfigurationProperties.KAFKA_PLAIN_TEXT_TOXI_PROXY_BEAN_NAME)
    ToxiproxyContainer.ContainerProxy kafkaContainerPlainTextProxy(ToxiproxyContainer toxiproxyContainer, KafkaConfigurationProperties kafkaConfigurationProperties, ConfigurableEnvironment configurableEnvironment) {
        ToxiproxyContainer.ContainerProxy plainTextProxy = toxiproxyContainer.getProxy(
                KAFKA_HOST_NAME, kafkaConfigurationProperties.getToxiProxyContainerBrokerPort());
        String brokerList = String.format("%s:%d", plainTextProxy.getContainerIpAddress(), plainTextProxy.getProxyPort());

        LinkedHashMap<String, Object> connectionDetails = new LinkedHashMap<>();
        connectionDetails.put("embedded.kafka.toxiproxy.brokerList", brokerList);
        connectionDetails.put("embedded.kafka.toxiproxy.proxyName", plainTextProxy.getName());

        MapPropertySource propertySource = new MapPropertySource("embeddedKafkaPlainToxiProxyInfo", connectionDetails);
        configurableEnvironment.getPropertySources().addFirst(propertySource);
        log.info("Kafka ToxiProxy plain-text connection details {}", connectionDetails);
        return plainTextProxy;
    }

    /**
     * Creates the ToxiProxy proxy for the SASL plain-text Kafka listener and publishes its
     * connection details as the {@code embeddedKafkaSaslToxiProxyInfo} property source.
     *
     * @param toxiproxyContainer           container the proxy is obtained from
     * @param kafkaConfigurationProperties supplies the proxied SASL broker port
     * @param configurableEnvironment      environment that receives the property source (added first)
     * @return the proxy pointing at {@link #KAFKA_HOST_NAME}
     */
    @ConditionalOnToxiProxyEnabled(module = KafkaConfigurationProperties.KAFKA_BEAN_NAME)
    @Bean(name = KafkaConfigurationProperties.KAFKA_SASL_TOXI_PROXY_BEAN_NAME)
    ToxiproxyContainer.ContainerProxy kafkaContainerSaslProxy(ToxiproxyContainer toxiproxyContainer, KafkaConfigurationProperties kafkaConfigurationProperties, ConfigurableEnvironment configurableEnvironment) {
        ToxiproxyContainer.ContainerProxy saslProxy = toxiproxyContainer.getProxy(
                KAFKA_HOST_NAME, kafkaConfigurationProperties.getToxiProxySaslPlaintextContainerBrokerPort());
        String brokerList = String.format("%s:%d", saslProxy.getContainerIpAddress(), saslProxy.getProxyPort());

        LinkedHashMap<String, Object> connectionDetails = new LinkedHashMap<>();
        connectionDetails.put("embedded.kafka.toxiproxy.saslPlaintext.brokerList", brokerList);
        connectionDetails.put("embedded.kafka.toxiproxy.saslPlaintext.proxyName", saslProxy.getName());

        MapPropertySource propertySource = new MapPropertySource("embeddedKafkaSaslToxiProxyInfo", connectionDetails);
        configurableEnvironment.getPropertySources().addFirst(propertySource);
        log.info("Kafka ToxiProxy SASL connection details {}", connectionDetails);
        return saslProxy;
    }

    /**
     * Builds, configures and starts the embedded Kafka container, then publishes its
     * connection details to the Spring environment.
     *
     * <p>The container is an anonymous {@link KafkaContainer} subclass whose
     * {@code getBootstrapServers()} lists one entry per named listener: the two externally
     * mapped ports, the in-network port, and (only when the corresponding proxy bean was
     * injected) the ToxiProxy listeners.
     * NOTE(review): presumably Testcontainers uses this value as the advertised listeners —
     * confirm against the Testcontainers version in use.
     *
     * @param kafkaStatusCheck                 wait strategy applied to the container
     * @param kafkaConfigurationProperties     ports, docker user, image and broker tuning values
     * @param zookeeperConfigurationProperties zookeeper file-system bind settings
     * @param configurableEnvironment          environment that receives the {@code embeddedKafkaInfo} properties
     * @param network                          docker network the container joins
     * @param containerProxy                   optional plain-text ToxiProxy proxy; may be {@code null}
     * @param containerProxy2                  optional SASL ToxiProxy proxy; may be {@code null}
     * @return the started container
     */
    @Bean(name = KafkaConfigurationProperties.KAFKA_BEAN_NAME, destroyMethod = "stop")
    public GenericContainer<?> kafka(KafkaStatusCheck kafkaStatusCheck, KafkaConfigurationProperties kafkaConfigurationProperties, ZookeeperConfigurationProperties zookeeperConfigurationProperties, ConfigurableEnvironment configurableEnvironment, Network network, @Autowired(required = false) @Qualifier("kafkaPlainTextContainerProxy") final ToxiproxyContainer.ContainerProxy containerProxy, @Autowired(required = false) @Qualifier("kafkaSaslContainerProxy") final ToxiproxyContainer.ContainerProxy containerProxy2) {
        final int internalPort = kafkaConfigurationProperties.getContainerBrokerPort();
        final int plainTextPort = kafkaConfigurationProperties.getBrokerPort();
        final int saslPort = kafkaConfigurationProperties.getSaslPlaintextBrokerPort();

        KafkaContainer kafkaContainer = (KafkaContainer) new KafkaContainer(ContainerUtils.getDockerImageName(kafkaConfigurationProperties)) {
            @Override
            public String getBootstrapServers() {
                ArrayList<String> listeners = new ArrayList<>();
                listeners.add("EXTERNAL_PLAINTEXT://" + getHost() + ":" + getMappedPort(plainTextPort));
                listeners.add("EXTERNAL_SASL_PLAINTEXT://" + getHost() + ":" + getMappedPort(saslPort));
                listeners.add("INTERNAL_PLAINTEXT://" + KAFKA_HOST_NAME + ":" + internalPort);
                // The proxy beans are optional, so their listeners are only listed when present.
                if (containerProxy != null) {
                    listeners.add("TOXIPROXY_INTERNAL_PLAINTEXT://" + getHost() + ":" + containerProxy.getProxyPort());
                }
                if (containerProxy2 != null) {
                    listeners.add("TOXIPROXY_INTERNAL_SASL_PLAINTEXT://" + getHost() + ":" + containerProxy2.getProxyPort());
                }
                return String.join(",", listeners);
            }
        }
                .withCreateContainerCmdModifier(cmd -> cmd.withUser(kafkaConfigurationProperties.getDockerUser()))
                .withCreateContainerCmdModifier(cmd -> cmd.withHostName(KAFKA_HOST_NAME))
                .withEmbeddedZookeeper()
                .withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP",
                        "EXTERNAL_PLAINTEXT:PLAINTEXT,"
                                + "EXTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT,"
                                + "INTERNAL_PLAINTEXT:PLAINTEXT,"
                                + "BROKER:PLAINTEXT,"
                                + "TOXIPROXY_INTERNAL_PLAINTEXT:PLAINTEXT,"
                                + "TOXIPROXY_INTERNAL_SASL_PLAINTEXT:SASL_PLAINTEXT")
                .withEnv("KAFKA_LISTENERS",
                        "EXTERNAL_PLAINTEXT://0.0.0.0:" + plainTextPort
                                + ",EXTERNAL_SASL_PLAINTEXT://0.0.0.0:" + saslPort
                                + ",INTERNAL_PLAINTEXT://0.0.0.0:" + internalPort
                                + ",TOXIPROXY_INTERNAL_PLAINTEXT://0.0.0.0:" + kafkaConfigurationProperties.getToxiProxyContainerBrokerPort()
                                + ",TOXIPROXY_INTERNAL_SASL_PLAINTEXT://0.0.0.0:" + kafkaConfigurationProperties.getToxiProxySaslPlaintextContainerBrokerPort()
                                + ",BROKER://0.0.0.0:9092")
                .withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
                .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1")
                .withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", String.valueOf(kafkaConfigurationProperties.getOffsetsTopicReplicationFactor()))
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
                .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
                .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
                .withEnv("KAFKA_LOG_FLUSH_INTERVAL_MS", String.valueOf(kafkaConfigurationProperties.getLogFlushIntervalMs()))
                .withEnv("KAFKA_REPLICA_SOCKET_TIMEOUT_MS", String.valueOf(kafkaConfigurationProperties.getReplicaSocketTimeoutMs()))
                .withEnv("KAFKA_CONTROLLER_SOCKET_TIMEOUT_MS", String.valueOf(kafkaConfigurationProperties.getControllerSocketTimeoutMs()))
                .withEnv("KAFKA_SASL_ENABLED_MECHANISMS", "PLAIN")
                .withEnv("ZOOKEEPER_SASL_ENABLED", "false")
                .withCopyFileToContainer(MountableFile.forClasspathResource("kafka_server_jaas.conf"), "/etc/kafka/kafka_server_jaas.conf")
                .withEnv("KAFKA_OPTS", "-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf")
                .withEnv("KAFKA_GC_LOG_OPTS", "-Dnogclog")
                .withExposedPorts(internalPort, plainTextPort, saslPort)
                .withNetwork(network)
                .withNetworkAliases(KAFKA_HOST_NAME)
                .withExtraHost(KAFKA_HOST_NAME, "127.0.0.1")
                .waitingFor(kafkaStatusCheck);

        // Host folders must be bound before the container is started.
        kafkaFileSystemBind(kafkaConfigurationProperties, kafkaContainer);
        zookeperFileSystemBind(zookeeperConfigurationProperties, kafkaContainer);

        KafkaContainer startedContainer = ContainerUtils.configureCommonsAndStart(kafkaContainer, kafkaConfigurationProperties, log);
        registerKafkaEnvironment(startedContainer, configurableEnvironment, kafkaConfigurationProperties);
        return startedContainer;
    }

    /**
     * When the Kafka file-system bind is enabled, prepares a time-stamped sub-folder of the
     * configured data folder on the host and binds it read-write to
     * {@code /var/lib/kafka/data} in the container. Does nothing when the bind is disabled.
     *
     * @param kafkaConfigurationProperties source of the file-system bind settings
     * @param kafkaContainer               container that receives the bind
     */
    private void kafkaFileSystemBind(KafkaConfigurationProperties kafkaConfigurationProperties, KafkaContainer kafkaContainer) {
        KafkaConfigurationProperties.FileSystemBind bindSettings = kafkaConfigurationProperties.getFileSystemBind();
        if (!bindSettings.isEnabled()) {
            return;
        }

        String dataFolder = bindSettings.getDataFolder();
        // Time-of-day down to nanoseconds is used as the per-run folder name.
        String runFolder = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn"));
        Path hostDataPath = Paths.get(dataFolder, runFolder).toAbsolutePath();

        log.info("Writing kafka data to: {}", hostDataPath);
        createPathAndParentOrMakeWritable(hostDataPath);
        kafkaContainer.addFileSystemBind(hostDataPath.toString(), "/var/lib/kafka/data", BindMode.READ_WRITE);
    }

    /**
     * When the Zookeeper file-system bind is enabled, prepares time-stamped sub-folders of the
     * configured data and transaction-log folders on the host and binds them read-write to
     * {@code /var/lib/zookeeper/data} and {@code /var/lib/zookeeper/log} in the container.
     * Both folders share the same time-stamp. Does nothing when the bind is disabled.
     *
     * @param zookeeperConfigurationProperties source of the file-system bind settings
     * @param kafkaContainer                   container that receives the binds
     */
    private void zookeperFileSystemBind(ZookeeperConfigurationProperties zookeeperConfigurationProperties, KafkaContainer kafkaContainer) {
        ZookeeperConfigurationProperties.FileSystemBind bindSettings = zookeeperConfigurationProperties.getFileSystemBind();
        if (!bindSettings.isEnabled()) {
            return;
        }

        // Time-of-day down to nanoseconds is used as the per-run folder name.
        String runFolder = LocalDateTime.now().format(DateTimeFormatter.ofPattern("HH-mm-ss-nnnnnnnnn"));

        Path hostDataPath = Paths.get(bindSettings.getDataFolder(), runFolder).toAbsolutePath();
        log.info("Writing zookeeper data to: {}", hostDataPath);

        Path hostTxnLogPath = Paths.get(bindSettings.getTxnLogsFolder(), runFolder).toAbsolutePath();
        log.info("Writing zookeeper transaction logs to: {}", hostTxnLogPath);

        createPathAndParentOrMakeWritable(hostDataPath);
        kafkaContainer.addFileSystemBind(hostDataPath.toString(), "/var/lib/zookeeper/data", BindMode.READ_WRITE);

        createPathAndParentOrMakeWritable(hostTxnLogPath);
        kafkaContainer.addFileSystemBind(hostTxnLogPath.toString(), "/var/lib/zookeeper/log", BindMode.READ_WRITE);
    }

    /**
     * Publishes the running broker's connection details as the {@code embeddedKafkaInfo}
     * property source, added first in the environment, and logs them.
     *
     * @param genericContainer             the started Kafka container
     * @param configurableEnvironment      environment that receives the property source
     * @param kafkaConfigurationProperties supplies the container-side port numbers
     */
    private void registerKafkaEnvironment(GenericContainer<?> genericContainer, ConfigurableEnvironment configurableEnvironment, KafkaConfigurationProperties kafkaConfigurationProperties) {
        LinkedHashMap<String, Object> connectionDetails = new LinkedHashMap<>();
        String containerHost = genericContainer.getHost();

        // Externally reachable addresses use the host-mapped ports.
        Integer mappedPlainTextPort = genericContainer.getMappedPort(kafkaConfigurationProperties.getBrokerPort());
        connectionDetails.put("embedded.kafka.brokerList", String.format("%s:%d", containerHost, mappedPlainTextPort));

        Integer mappedSaslPort = genericContainer.getMappedPort(kafkaConfigurationProperties.getSaslPlaintextBrokerPort());
        connectionDetails.put("embedded.kafka.saslPlaintext.brokerList", String.format("%s:%d", containerHost, mappedSaslPort));

        connectionDetails.put("embedded.kafka.saslPlaintext.user", KafkaConfigurationProperties.KAFKA_USER);
        connectionDetails.put("embedded.kafka.saslPlaintext.password", KafkaConfigurationProperties.KAFKA_PASSWORD);

        // The container broker list uses the fixed host name and the unmapped container port.
        connectionDetails.put("embedded.kafka.containerBrokerList",
                String.format("%s:%d", KAFKA_HOST_NAME, kafkaConfigurationProperties.getContainerBrokerPort()));

        MapPropertySource kafkaInfo = new MapPropertySource("embeddedKafkaInfo", connectionDetails);
        log.info("Started kafka broker. Connection details: {}", connectionDetails);
        configurableEnvironment.getPropertySources().addFirst(kafkaInfo);
    }

    /**
     * Creates the {@link KafkaTopicsConfigurer} for the embedded Kafka container.
     *
     * <p>The container parameter is explicitly qualified with the Kafka bean name. Injecting
     * {@code GenericContainer<?>} by type alone is ambiguous as soon as another container
     * bean is present (for example the {@link ToxiproxyContainer} used by the proxy beans
     * of this configuration), and the parameter name does not match any bean name.
     *
     * @param genericContainer                 the Kafka container bean
     * @param kafkaConfigurationProperties     Kafka settings passed to the configurer
     * @param zookeeperConfigurationProperties Zookeeper settings passed to the configurer
     * @return a new topics configurer
     */
    @Bean
    public KafkaTopicsConfigurer kafkaConfigurer(@Qualifier(KafkaConfigurationProperties.KAFKA_BEAN_NAME) GenericContainer<?> genericContainer, KafkaConfigurationProperties kafkaConfigurationProperties, ZookeeperConfigurationProperties zookeeperConfigurationProperties) {
        return new KafkaTopicsConfigurer(genericContainer, zookeeperConfigurationProperties, kafkaConfigurationProperties);
    }

    /**
     * Ensures that {@code path} and its direct parent exist as folders that are writable by
     * others, processing the parent first. Folders that already exist are only made writable.
     *
     * @param path folder to prepare; a missing parent component is skipped
     * @throws RuntimeException if a folder cannot be created or its permissions cannot be changed
     */
    private void createPathAndParentOrMakeWritable(Path path) {
        Stream.of(path.getParent(), path)
                .filter(candidate -> candidate != null)
                .forEach(this::createFolderOrMakeWritable);
    }

    /**
     * Creates a single folder if it is missing and makes it writable by others either way.
     */
    private void createFolderOrMakeWritable(Path folder) {
        if (Files.isDirectory(folder)) {
            makeWritable(folder);
            return;
        }
        try {
            log.info("Create writable folder: {}", folder);
            if (folder.getFileSystem().supportedFileAttributeViews().contains("posix")) {
                Files.createDirectory(folder, PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rwxrwxrwx")));
            } else {
                // A POSIX permission attribute is rejected with UnsupportedOperationException
                // on file systems without POSIX support, so create the folder without it.
                Files.createDirectory(folder);
            }
        } catch (FileAlreadyExistsException e) {
            // Something else created the path in the meantime; only fix up permissions below.
            log.debug("Folder already exists: {}", folder);
        } catch (IOException e) {
            throw new RuntimeException("Failed to create folder " + folder, e);
        }
        // Permissions requested at creation time are masked by the process umask, so the
        // write bit for others has to be (re)applied explicitly after the folder exists.
        makeWritable(folder);
    }

    /**
     * Adds the "others write" permission to {@code path} if it is not already set.
     * When the file system offers no POSIX attribute view, a warning is logged and nothing
     * is changed.
     *
     * @param path file or folder whose permissions are adjusted
     * @throws RuntimeException wrapping the {@link IOException} if permissions cannot be read or written
     */
    private void makeWritable(Path path) {
        PosixFileAttributeView posixView = Files.getFileAttributeView(path, PosixFileAttributeView.class);
        if (posixView == null) {
            log.warn("Couldn't get file permissions: {}", path);
            return;
        }
        try {
            Set<PosixFilePermission> currentPermissions = posixView.readAttributes().permissions();
            if (currentPermissions.contains(PosixFilePermission.OTHERS_WRITE)) {
                return;
            }
            currentPermissions.add(PosixFilePermission.OTHERS_WRITE);
            log.info("Make writable to others: {}", path);
            posixView.setPermissions(currentPermissions);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
