/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.cloud.sleuth.instrument.kafka;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.BDDAssertions;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Answers;
import org.mockito.BDDMockito;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.Span;
import org.springframework.cloud.sleuth.Tracer;
import org.springframework.cloud.sleuth.exporter.FinishedSpan;
import org.springframework.cloud.sleuth.instrument.kafka.KafkaTestUtils;
import org.springframework.cloud.sleuth.instrument.kafka.TracingKafkaProducer;
import org.springframework.cloud.sleuth.instrument.kafka.TracingKafkaPropagatorSetter;
import org.springframework.cloud.sleuth.propagation.Propagator;
import org.springframework.cloud.sleuth.test.TestSpanHandler;
import org.springframework.cloud.sleuth.test.TestTracingAwareSupplier;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.core.ResolvableType;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@ExtendWith(value={MockitoExtension.class})
@Tag(value="DockerRequired")
public abstract class KafkaProducerTest
implements TestTracingAwareSupplier {
    protected String testTopic;
    protected Tracer tracer = this.tracerTest().tracing().tracer();
    protected Propagator propagator = this.tracerTest().tracing().propagator();
    protected TestSpanHandler spans = this.tracerTest().handler();
    protected TracingKafkaProducer<String, String> kafkaProducer;
    private final AtomicBoolean consumerRun = new AtomicBoolean();
    protected final BlockingQueue<ConsumerRecord<String, String>> consumerRecords = new LinkedBlockingQueue<ConsumerRecord<String, String>>();
    @Mock(answer=Answers.RETURNS_DEEP_STUBS)
    BeanFactory beanFactory;
    @Container
    protected static final KafkaContainer kafkaContainer = (KafkaContainer)((KafkaContainer)new KafkaContainer(DockerImageName.parse((String)"confluentinc/cp-kafka:6.1.1")).withExposedPorts(new Integer[]{9093})).waitingFor((WaitStrategy)Wait.forListeningPort());

    @BeforeAll
    static void setupAll() {
        kafkaContainer.start();
    }

    @AfterAll
    static void destroyAll() {
        kafkaContainer.stop();
    }

    @BeforeEach
    void setup() {
        BDDMockito.given((Object)this.beanFactory.getBean(Tracer.class)).willReturn((Object)this.tracer);
        BDDMockito.given((Object)this.beanFactory.getBean(Propagator.class)).willReturn((Object)this.propagator);
        BDDMockito.given((Object)this.beanFactory.getBeanProvider(ResolvableType.forClassWithGenerics(Propagator.Setter.class, (ResolvableType[])new ResolvableType[]{ResolvableType.forType((ParameterizedTypeReference)new ParameterizedTypeReference<ProducerRecord<?, ?>>(){})})).getIfAvailable()).willReturn((Object)new TracingKafkaPropagatorSetter());
        this.testTopic = UUID.randomUUID().toString();
        HashMap<String, Object> producerProperties = new HashMap<String, Object>();
        producerProperties.put("bootstrap.servers", kafkaContainer.getBootstrapServers());
        producerProperties.put("key.serializer", StringSerializer.class);
        producerProperties.put("value.serializer", StringSerializer.class);
        this.kafkaProducer = new TracingKafkaProducer((Producer)new KafkaProducer(producerProperties), this.beanFactory);
        this.consumerRun.set(true);
        this.consumerRecords.clear();
    }

    @AfterEach
    void destroy() {
        this.kafkaProducer.close();
        this.consumerRun.set(false);
    }

    @Test
    public void should_create_and_finish_producer_span() {
        AtomicBoolean acknowledged = new AtomicBoolean(false);
        Callback callback = (metadata, ex) -> acknowledged.set(true);
        ProducerRecord producerRecord = new ProducerRecord(this.testTopic, (Object)"test", (Object)"test");
        this.kafkaProducer.send(producerRecord, callback);
        Awaitility.await().atMost(Duration.ofSeconds(15L)).until(acknowledged::get);
        BDDAssertions.then((Object)this.tracer.currentSpan()).isNull();
        BDDAssertions.then((Iterable)this.spans).hasSize(1);
        FinishedSpan span = this.spans.get(0);
        BDDAssertions.then((Comparable)span.getKind()).isEqualTo((Object)Span.Kind.PRODUCER);
        BDDAssertions.then((String)((String)span.getTags().get("kafka.topic"))).isEqualTo(this.testTopic);
    }

    protected void startKafkaConsumer() {
        Executors.newSingleThreadExecutor().execute(this::doStartKafkaConsumer);
    }

    private void doStartKafkaConsumer() {
        KafkaConsumer<String, String> kafkaConsumer = KafkaTestUtils.buildTestKafkaConsumer(kafkaContainer.getBootstrapServers());
        kafkaConsumer.subscribe(Pattern.compile(this.testTopic));
        while (this.consumerRun.get()) {
            ConsumerRecords records = kafkaConsumer.poll(Duration.ofSeconds(1L));
            for (ConsumerRecord record : records) {
                this.consumerRecords.offer((ConsumerRecord<String, String>)record);
            }
        }
        kafkaConsumer.close();
    }

    @Override
    public void cleanUpTracing() {
        this.spans.clear();
    }
}

