/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.edc.connector.dataplane.kafka.pipeline;

import java.time.Clock;
import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Optional;
import java.util.Properties;
import org.eclipse.edc.connector.dataplane.kafka.config.KafkaPropertiesFactory;
import org.eclipse.edc.connector.dataplane.kafka.pipeline.KafkaDataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSourceFactory;
import org.eclipse.edc.spi.EdcException;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.result.Result;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.transfer.DataFlowStartMessage;
import org.eclipse.edc.validator.dataaddress.kafka.KafkaDataAddressValidator;
import org.eclipse.edc.validator.spi.ValidationResult;
import org.eclipse.edc.validator.spi.Validator;
import org.jetbrains.annotations.NotNull;

public class KafkaDataSourceFactory
implements DataSourceFactory {
    private static final Duration DEFAULT_POLL_DURATION = Duration.ofSeconds(1L);
    private final Monitor monitor;
    private final Validator<DataAddress> validation;
    private final KafkaPropertiesFactory propertiesFactory;
    private final Clock clock;

    public KafkaDataSourceFactory(Monitor monitor, KafkaPropertiesFactory propertiesFactory, Clock clock) {
        this.monitor = monitor;
        this.propertiesFactory = propertiesFactory;
        this.validation = new KafkaDataAddressValidator();
        this.clock = clock;
    }

    public String supportedType() {
        return "Kafka";
    }

    @NotNull
    public Result<Void> validateRequest(DataFlowStartMessage request) {
        DataAddress source = request.getSourceDataAddress();
        return (Result)this.validation.validate((Object)source).flatMap(ValidationResult::toResult);
    }

    public DataSource createSource(DataFlowStartMessage request) {
        Result<Void> validationResult = this.validateRequest(request);
        if (validationResult.failed()) {
            throw new EdcException(validationResult.getFailureDetail());
        }
        DataAddress source = request.getSourceDataAddress();
        String groupId = request.getProcessId() + ":" + request.getId();
        Properties consumerProps = (Properties)this.propertiesFactory.getConsumerProperties(source.getProperties()).orElseThrow(failure -> new IllegalArgumentException(failure.getFailureDetail()));
        consumerProps.put("group.id", groupId);
        String topic = source.getStringProperty("https://w3id.org/edc/v0.0.1/ns/topic");
        String name = source.getStringProperty("https://w3id.org/edc/v0.0.1/ns/name");
        Duration maxDuration = Optional.ofNullable(source.getStringProperty("https://w3id.org/edc/v0.0.1/ns/maxDuration")).map(Duration::parse).orElse(null);
        Duration pollDuration = Optional.ofNullable(source.getStringProperty("https://w3id.org/edc/v0.0.1/ns/pollDuration")).map(Duration::parse).orElse(DEFAULT_POLL_DURATION);
        return KafkaDataSource.Builder.newInstance().monitor(this.monitor).clock(this.clock).topic(topic).name(name).pollDuration(pollDuration).maxDuration(maxDuration).consumerProperties(consumerProps).build();
    }
}

