/*
 * Decompiled with CFR 0.152.
 */
package org.citrusframework.kafka.endpoint;

import java.time.Duration;
import java.util.HashMap;
import java.util.Optional;
import java.util.UUID;
import org.citrusframework.context.TestContext;
import org.citrusframework.endpoint.EndpointConfiguration;
import org.citrusframework.kafka.endpoint.KafkaEndpointConfiguration;
import org.citrusframework.kafka.endpoint.KafkaMessageFilteringConsumer;
import org.citrusframework.kafka.endpoint.KafkaMessageSingleConsumer;
import org.citrusframework.message.Message;
import org.citrusframework.messaging.AbstractSelectiveMessageConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaConsumer
extends AbstractSelectiveMessageConsumer {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer = this.createConsumer();

    public KafkaConsumer(String name, KafkaEndpointConfiguration endpointConfiguration) {
        super(name, (EndpointConfiguration)endpointConfiguration);
    }

    public org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> getConsumer() {
        return this.consumer;
    }

    public void setConsumer(org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> consumer) {
        this.consumer = consumer;
    }

    public Message receive(TestContext testContext, long timeout) {
        logger.debug("Receiving single message");
        return KafkaMessageSingleConsumer.builder().consumer(this.consumer).endpointConfiguration(this.getEndpointConfiguration()).build().receive(testContext, timeout);
    }

    public Message receive(String selector, TestContext testContext, long timeout) {
        logger.debug("Receiving selected message: {}", (Object)selector);
        return KafkaMessageFilteringConsumer.builder().consumer(this.consumer).endpointConfiguration(this.getEndpointConfiguration()).build().receive(selector, testContext, timeout);
    }

    protected KafkaEndpointConfiguration getEndpointConfiguration() {
        return (KafkaEndpointConfiguration)super.getEndpointConfiguration();
    }

    public void stop() {
        try {
            if (this.consumer.subscription() != null && !this.consumer.subscription().isEmpty()) {
                this.consumer.unsubscribe();
            }
        }
        finally {
            this.consumer.close(Duration.ofSeconds(10L));
        }
    }

    private org.apache.kafka.clients.consumer.KafkaConsumer<Object, Object> createConsumer() {
        HashMap<String, Object> consumerProps = new HashMap<String, Object>();
        consumerProps.put("client.id", Optional.ofNullable(this.getEndpointConfiguration().getClientId()).orElseGet(() -> "citrus_kafka_consumer_" + UUID.randomUUID()));
        consumerProps.put("group.id", this.getEndpointConfiguration().getConsumerGroup());
        consumerProps.put("bootstrap.servers", Optional.ofNullable(this.getEndpointConfiguration().getServer()).orElse("localhost:9092"));
        consumerProps.put("max.poll.records", 1);
        consumerProps.put("enable.auto.commit", this.getEndpointConfiguration().isAutoCommit());
        consumerProps.put("auto.commit.interval.ms", this.getEndpointConfiguration().getAutoCommitInterval());
        consumerProps.put("auto.offset.reset", this.getEndpointConfiguration().getOffsetReset());
        consumerProps.put("key.deserializer", this.getEndpointConfiguration().getKeyDeserializer());
        consumerProps.put("value.deserializer", this.getEndpointConfiguration().getValueDeserializer());
        consumerProps.putAll(this.getEndpointConfiguration().getConsumerProperties());
        return new org.apache.kafka.clients.consumer.KafkaConsumer(consumerProps);
    }
}

