/*
 * Decompiled with CFR 0.152.
 */
package org.eclipse.edc.connector.dataplane.kafka.pipeline;

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Spliterators;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.eclipse.edc.connector.dataplane.spi.pipeline.DataSource;
import org.eclipse.edc.connector.dataplane.spi.pipeline.StreamResult;
import org.eclipse.edc.spi.monitor.Monitor;
import org.jetbrains.annotations.NotNull;

class KafkaDataSource
implements DataSource {
    private String name;
    private Monitor monitor;
    private Duration pollDuration;
    private Duration maxDuration;
    private Consumer<String, byte[]> consumer;
    private Clock clock;
    private final AtomicBoolean active = new AtomicBoolean(true);

    private KafkaDataSource() {
    }

    public void close() {
        this.active.set(false);
    }

    public StreamResult<Stream<DataSource.Part>> openPartStream() {
        Stream stream = (Stream)this.openRecordsStream().flatMap(consumerRecords -> consumerRecords.partitions().stream().flatMap(p -> consumerRecords.records(p).stream()).map(x$0 -> new KafkaPart((ConsumerRecord<String, byte[]>)x$0)).map(DataSource.Part.class::cast)).onClose(() -> this.consumer.close());
        return StreamResult.success((Object)stream);
    }

    @NotNull
    private Stream<ConsumerRecords<String, byte[]>> openRecordsStream() {
        return StreamSupport.stream(Spliterators.spliteratorUnknownSize(new ConsumerRecordsIterator(), 0), false);
    }

    private class ConsumerRecordsIterator
    implements Iterator<ConsumerRecords<String, byte[]>> {
        private final Instant streamEnd;

        ConsumerRecordsIterator() {
            this.streamEnd = KafkaDataSource.this.maxDuration == null ? Instant.MAX : KafkaDataSource.this.clock.instant().plus(KafkaDataSource.this.maxDuration);
            this.debug("starts consuming events until: " + this.streamEnd);
        }

        @Override
        public boolean hasNext() {
            boolean isMaxDurationReached = KafkaDataSource.this.clock.instant().isAfter(this.streamEnd);
            if (isMaxDurationReached) {
                this.debug("max duration reached");
            }
            return KafkaDataSource.this.active.get() && !isMaxDurationReached;
        }

        @Override
        public ConsumerRecords<String, byte[]> next() {
            ConsumerRecords records;
            do {
                records = KafkaDataSource.this.consumer.poll(KafkaDataSource.this.pollDuration);
            } while (KafkaDataSource.this.active.get() && records.isEmpty());
            return records;
        }

        private void debug(String message) {
            KafkaDataSource.this.monitor.debug(String.format("KafkaDataSource %s %s", KafkaDataSource.this.name, message), new Throwable[0]);
        }
    }

    private class KafkaPart
    implements DataSource.Part {
        private final ConsumerRecord<String, byte[]> consumerRecord;

        private KafkaPart(ConsumerRecord<String, byte[]> consumerRecord) {
            this.consumerRecord = consumerRecord;
        }

        public String name() {
            return KafkaDataSource.this.name;
        }

        public InputStream openStream() {
            return new ByteArrayInputStream((byte[])this.consumerRecord.value());
        }
    }

    public static class Builder {
        private Properties consumerProperties;
        private String topic;
        private final KafkaDataSource dataSource = new KafkaDataSource();

        public static Builder newInstance() {
            return new Builder();
        }

        public Builder name(String name) {
            this.dataSource.name = name;
            return this;
        }

        public Builder monitor(Monitor monitor) {
            this.dataSource.monitor = monitor;
            return this;
        }

        public Builder clock(Clock clock) {
            this.dataSource.clock = clock;
            return this;
        }

        public Builder topic(String topic) {
            this.topic = topic;
            return this;
        }

        public Builder pollDuration(Duration pollDuration) {
            this.dataSource.pollDuration = pollDuration;
            return this;
        }

        public Builder maxDuration(Duration maxDuration) {
            this.dataSource.maxDuration = maxDuration;
            return this;
        }

        public Builder consumerProperties(Properties consumerProperties) {
            this.consumerProperties = consumerProperties;
            return this;
        }

        public KafkaDataSource build() {
            Objects.requireNonNull(this.dataSource.monitor, "monitor");
            Objects.requireNonNull(this.dataSource.pollDuration, "pollDuration");
            Objects.requireNonNull(this.topic, "topic");
            Objects.requireNonNull(this.consumerProperties, "consumerProperties");
            Objects.requireNonNull(this.dataSource.clock, "clock");
            this.dataSource.consumer = new KafkaConsumer(this.consumerProperties);
            this.dataSource.consumer.subscribe(List.of(this.topic));
            return this.dataSource;
        }

        private Builder() {
        }
    }
}

