/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.sdk.kafka;

import java.util.List;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.statefun.sdk.IngressType;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kafka.Constants;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;

public class KafkaIngressSpec<T>
implements IngressSpec<T> {
    private final Properties properties;
    private final List<String> topics;
    private final KafkaIngressDeserializer<T> deserializer;
    private final KafkaIngressStartupPosition startupPosition;
    private final IngressIdentifier<T> ingressIdentifier;

    KafkaIngressSpec(IngressIdentifier<T> id, Properties properties, List<String> topics, KafkaIngressDeserializer<T> deserializer, KafkaIngressStartupPosition startupPosition) {
        this.properties = KafkaIngressSpec.requireValidProperties(properties);
        this.topics = KafkaIngressSpec.requireValidTopics(topics);
        this.startupPosition = KafkaIngressSpec.requireValidStartupPosition(startupPosition, properties);
        this.deserializer = Objects.requireNonNull(deserializer);
        this.ingressIdentifier = Objects.requireNonNull(id);
    }

    public IngressIdentifier<T> id() {
        return this.ingressIdentifier;
    }

    public IngressType type() {
        return Constants.KAFKA_INGRESS_TYPE;
    }

    public Properties properties() {
        return this.properties;
    }

    public List<String> topics() {
        return this.topics;
    }

    public KafkaIngressDeserializer<T> deserializer() {
        return this.deserializer;
    }

    public KafkaIngressStartupPosition startupPosition() {
        return this.startupPosition;
    }

    private static Properties requireValidProperties(Properties properties) {
        Objects.requireNonNull(properties);
        if (!properties.containsKey("bootstrap.servers")) {
            throw new IllegalArgumentException("Missing setting for Kafka address.");
        }
        if (!properties.containsKey("group.id")) {
            throw new IllegalArgumentException("Missing setting for consumer group id.");
        }
        return properties;
    }

    private static List<String> requireValidTopics(List<String> topics) {
        Objects.requireNonNull(topics);
        if (topics.isEmpty()) {
            throw new IllegalArgumentException("Must define at least one Kafka topic to consume from.");
        }
        return topics;
    }

    private static KafkaIngressStartupPosition requireValidStartupPosition(KafkaIngressStartupPosition startupPosition, Properties properties) {
        if (startupPosition.isGroupOffsets() && !properties.containsKey("group.id")) {
            throw new IllegalStateException("The ingress is configured to start from committed consumer group offsets in Kafka, but no consumer group id was set.\nPlease set the group id with the withConsumerGroupId(String) method.");
        }
        return startupPosition;
    }
}

