/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.statefun.sdk.kafka;

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import org.apache.flink.statefun.sdk.annotations.ForRuntime;
import org.apache.flink.statefun.sdk.core.OptionalProperty;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressAutoResetPosition;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressSpec;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;

/**
 * Fluent builder for a {@link KafkaIngressSpec}.
 *
 * <p>Instances are created through {@link #forIdentifier(IngressIdentifier)}; every {@code with*}
 * / {@code add*} method mutates this builder and returns it for chaining. {@link #build()}
 * assembles the final Kafka client properties and hands everything to the spec constructor.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <T> the type of the records produced by the ingress
 */
public final class KafkaIngressBuilder<T> {
    private final IngressIdentifier<T> id;
    private final List<String> topics = new ArrayList<>();
    private final Properties properties = new Properties();
    private final OptionalProperty<String> consumerGroupId = OptionalProperty.withoutDefault();
    private final OptionalProperty<KafkaIngressDeserializer<T>> deserializer = OptionalProperty.withoutDefault();
    private final OptionalProperty<String> kafkaAddress = OptionalProperty.withoutDefault();
    // Defaults: reset to LATEST, start from the latest position.
    private final OptionalProperty<KafkaIngressAutoResetPosition> autoResetPosition = OptionalProperty.withDefault(KafkaIngressAutoResetPosition.LATEST);
    private final OptionalProperty<KafkaIngressStartupPosition> startupPosition = OptionalProperty.withDefault(KafkaIngressStartupPosition.fromLatest());

    private KafkaIngressBuilder(IngressIdentifier<T> id) {
        this.id = Objects.requireNonNull(id);
    }

    /**
     * Creates a builder for the ingress with the given identifier.
     *
     * @param id the ingress identifier; must not be {@code null}
     * @param <T> the type of the records produced by the ingress
     * @return a new builder
     * @throws NullPointerException if {@code id} is {@code null}
     */
    public static <T> KafkaIngressBuilder<T> forIdentifier(IngressIdentifier<T> id) {
        return new KafkaIngressBuilder<>(id);
    }

    /**
     * Sets the consumer group id; {@link #build()} applies it under the {@code group.id} key.
     *
     * @param consumerGroupId the consumer group id
     * @return this builder
     */
    public KafkaIngressBuilder<T> withConsumerGroupId(String consumerGroupId) {
        this.consumerGroupId.set(consumerGroupId);
        return this;
    }

    /**
     * Sets the Kafka address; {@link #build()} applies it under the {@code bootstrap.servers} key.
     *
     * @param kafkaAddress the Kafka bootstrap address
     * @return this builder
     */
    public KafkaIngressBuilder<T> withKafkaAddress(String kafkaAddress) {
        this.kafkaAddress.set(kafkaAddress);
        return this;
    }

    /**
     * Adds a single topic to consume from.
     *
     * @param topic the topic name; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code topic} is {@code null}
     */
    public KafkaIngressBuilder<T> withTopic(String topic) {
        // Fail fast instead of storing a null entry that would only surface later.
        this.topics.add(Objects.requireNonNull(topic, "topic"));
        return this;
    }

    /**
     * Adds all given topics to the topics to consume from.
     *
     * @param topics the topic names; neither the list nor its elements may be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code topics} or any of its elements is {@code null}
     */
    public KafkaIngressBuilder<T> addTopics(List<String> topics) {
        Objects.requireNonNull(topics, "topics");
        // Validate everything first so a bad element does not leave a partial addition behind.
        for (String topic : topics) {
            Objects.requireNonNull(topic, "topic");
        }
        this.topics.addAll(topics);
        return this;
    }

    /**
     * Copies all entries of the given properties into this builder's properties.
     *
     * @param properties the properties to add; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code properties} is {@code null}
     */
    public KafkaIngressBuilder<T> withProperties(Properties properties) {
        Objects.requireNonNull(properties, "properties");
        this.properties.putAll(properties);
        return this;
    }

    /**
     * Sets a single property on this builder's properties.
     *
     * @param name the property key; must not be {@code null}
     * @param value the property value; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code name} or {@code value} is {@code null}
     */
    public KafkaIngressBuilder<T> withProperty(String name, String value) {
        Objects.requireNonNull(name);
        Objects.requireNonNull(value);
        this.properties.setProperty(name, value);
        return this;
    }

    /**
     * Sets the deserializer by instantiating the given class through its no-argument constructor.
     * The constructor is made accessible reflectively, so it does not need to be public.
     *
     * @param deserializerClass the deserializer class; must not be {@code null}
     * @return this builder
     * @throws NullPointerException if {@code deserializerClass} is {@code null}
     * @throws IllegalStateException if the class has no no-argument constructor or instantiation
     *     fails
     */
    public KafkaIngressBuilder<T> withDeserializer(Class<? extends KafkaIngressDeserializer<T>> deserializerClass) {
        Objects.requireNonNull(deserializerClass);
        this.deserializer.set(KafkaIngressBuilder.instantiateDeserializer(deserializerClass));
        return this;
    }

    /**
     * Sets the auto reset position; {@link #build()} applies it under the
     * {@code auto.offset.reset} key. Defaults to {@link KafkaIngressAutoResetPosition#LATEST}.
     *
     * @param autoResetPosition the auto reset position
     * @return this builder
     */
    public KafkaIngressBuilder<T> withAutoResetPosition(KafkaIngressAutoResetPosition autoResetPosition) {
        this.autoResetPosition.set(autoResetPosition);
        return this;
    }

    /**
     * Sets the startup position passed to the spec. Defaults to
     * {@link KafkaIngressStartupPosition#fromLatest()}.
     *
     * @param startupPosition the startup position
     * @return this builder
     */
    public KafkaIngressBuilder<T> withStartupPosition(KafkaIngressStartupPosition startupPosition) {
        this.startupPosition.set(startupPosition);
        return this;
    }

    /**
     * Builds the ingress spec from the current state of this builder.
     *
     * <p>NOTE(review): the behavior when no deserializer was set depends on
     * {@code OptionalProperty.get()} and the {@code KafkaIngressSpec} constructor, neither of
     * which is visible here.
     *
     * @return a new {@link KafkaIngressSpec}
     */
    public KafkaIngressSpec<T> build() {
        Properties resolvedProperties = this.resolveKafkaProperties();
        // Hand over a snapshot so later mutations of this builder cannot alias into the spec.
        List<String> topicsSnapshot = new ArrayList<>(this.topics);
        return new KafkaIngressSpec<>(this.id, resolvedProperties, topicsSnapshot, this.deserializer.get(), this.startupPosition.get());
    }

    /**
     * Copies the user-supplied properties into a fresh {@link Properties} instance and then lets
     * the dedicated builder settings write their Kafka keys on top of it.
     */
    private Properties resolveKafkaProperties() {
        Properties resultProps = new Properties();
        resultProps.putAll(this.properties);
        // Applied after the copy; presumably the dedicated settings take precedence over
        // same-named user properties when present (see OptionalProperty to confirm).
        this.kafkaAddress.overwritePropertiesIfPresent(resultProps, "bootstrap.servers");
        this.autoResetPosition.overwritePropertiesIfPresent(resultProps, "auto.offset.reset");
        this.consumerGroupId.overwritePropertiesIfPresent(resultProps, "group.id");
        return resultProps;
    }

    /**
     * Reflectively creates a deserializer through its declared no-argument constructor.
     *
     * @throws IllegalStateException if no such constructor exists or invoking it fails
     */
    private static <T extends KafkaIngressDeserializer<?>> T instantiateDeserializer(Class<T> deserializerClass) {
        try {
            Constructor<T> defaultConstructor = deserializerClass.getDeclaredConstructor();
            // Allow non-public constructors (e.g. package-private or private deserializers).
            defaultConstructor.setAccessible(true);
            return defaultConstructor.newInstance();
        }
        catch (NoSuchMethodException e) {
            throw new IllegalStateException("Unable to create an instance of deserializer " + deserializerClass.getName() + "; has no default constructor", e);
        }
        catch (IllegalAccessException | InstantiationException | InvocationTargetException e) {
            throw new IllegalStateException("Unable to create an instance of deserializer " + deserializerClass.getName(), e);
        }
    }

    /**
     * Sets an already-instantiated deserializer. Package-private and marked {@link ForRuntime}.
     *
     * @param deserializer the deserializer instance
     * @return this builder
     */
    @ForRuntime
    KafkaIngressBuilder<T> withDeserializer(KafkaIngressDeserializer<T> deserializer) {
        this.deserializer.set(deserializer);
        return this;
    }
}

