package org.testcontainers.containers;

import com.github.dockerjava.api.command.ExecCreateCmdResponse;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.core.command.ExecStartResultCallback;
import java.util.concurrent.TimeUnit;
import org.testcontainers.utility.Base58;
import org.testcontainers.utility.TestcontainersConfiguration;

/* loaded from: input_file:org/testcontainers/containers/KafkaContainer.class */
public class KafkaContainer extends GenericContainer<KafkaContainer> {
    public static final int KAFKA_PORT = 9093;
    public static final int ZOOKEEPER_PORT = 2181;
    private static final int PORT_NOT_ASSIGNED = -1;
    protected String externalZookeeperConnect;
    private int port;

    public KafkaContainer() {
        this("5.2.1");
    }

    public KafkaContainer(String str) {
        super(TestcontainersConfiguration.getInstance().getKafkaImage() + ":" + str);
        this.externalZookeeperConnect = null;
        this.port = PORT_NOT_ASSIGNED;
        withNetwork(Network.newNetwork());
        withNetworkAliases(new String[]{"kafka-" + Base58.randomString(6)});
        withExposedPorts(new Integer[]{Integer.valueOf(KAFKA_PORT)});
        withEnv("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:9093,BROKER://0.0.0.0:9092");
        withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
        withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");
        withEnv("KAFKA_BROKER_ID", "1");
        withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1");
        withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "1");
        withEnv("KAFKA_LOG_FLUSH_INTERVAL_MESSAGES", "9223372036854775807");
        withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");
    }

    public KafkaContainer withEmbeddedZookeeper() {
        this.externalZookeeperConnect = null;
        return self();
    }

    public KafkaContainer withExternalZookeeper(String str) {
        this.externalZookeeperConnect = str;
        return self();
    }

    public String getBootstrapServers() {
        if (this.port == PORT_NOT_ASSIGNED) {
            throw new IllegalStateException("You should start Kafka container first");
        }
        return String.format("PLAINTEXT://%s:%s", getContainerIpAddress(), Integer.valueOf(this.port));
    }

    protected void doStart() {
        withCommand("sleep infinity");
        if (this.externalZookeeperConnect == null) {
            addExposedPort(Integer.valueOf(ZOOKEEPER_PORT));
        }
        super.doStart();
    }

    protected void containerIsStarting(InspectContainerResponse inspectContainerResponse) {
        super.containerIsStarting(inspectContainerResponse);
        this.port = getMappedPort(KAFKA_PORT).intValue();
        this.dockerClient.execStartCmd(((ExecCreateCmdResponse) this.dockerClient.execCreateCmd(getContainerId()).withCmd(new String[]{"sh", "-c", "export KAFKA_ZOOKEEPER_CONNECT=" + (this.externalZookeeperConnect != null ? this.externalZookeeperConnect : startZookeeper()) + "\nexport KAFKA_ADVERTISED_LISTENERS=" + getBootstrapServers() + "," + String.format("BROKER://%s:9092", inspectContainerResponse.getNetworkSettings().getIpAddress()) + "\n/etc/confluent/docker/run"}).exec()).getId()).exec(new ExecStartResultCallback()).awaitStarted(10L, TimeUnit.SECONDS);
    }

    private String startZookeeper() {
        try {
            this.dockerClient.execStartCmd(((ExecCreateCmdResponse) this.dockerClient.execCreateCmd(getContainerId()).withCmd(new String[]{"sh", "-c", "printf 'clientPort=2181\ndataDir=/var/lib/zookeeper/data\ndataLogDir=/var/lib/zookeeper/log' > /zookeeper.properties\nzookeeper-server-start /zookeeper.properties\n"}).exec()).getId()).exec(new ExecStartResultCallback()).awaitStarted(10L, TimeUnit.SECONDS);
            return "localhost:2181";
        } catch (InterruptedException e) {
            throw e;
        }
    }
}
