/*
 * Decompiled with CFR 0.152.
 */
package org.jlab.jaws.eventsource;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.jlab.jaws.eventsource.EventSourceConfig;
import org.jlab.jaws.eventsource.EventSourceListener;
import org.jlab.jaws.eventsource.EventSourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventSourceTable<K, V>
extends Thread
implements AutoCloseable {
    private final Logger log = LoggerFactory.getLogger(EventSourceTable.class);
    private final KafkaConsumer<K, V> consumer;
    private final EventSourceConfig config;
    private final Set<EventSourceListener<K, V>> listeners = Collections.newSetFromMap(new ConcurrentHashMap());
    private long endOffset = 0L;
    private boolean endReached = false;
    private AtomicReference<CONSUMER_STATE> consumerState = new AtomicReference<CONSUMER_STATE>(CONSUMER_STATE.INITIALIZING);
    private final HashMap<K, EventSourceRecord<K, V>> state = new HashMap();
    private final List<EventSourceRecord<K, V>> changes = new ArrayList<EventSourceRecord<K, V>>();

    public EventSourceTable(Properties props) {
        this.config = new EventSourceConfig(props);
        Properties consumerProps = new Properties();
        consumerProps.putAll((Map<?, ?>)props);
        consumerProps.setProperty("bootstrap.servers", this.config.getString("bootstrap.servers"));
        consumerProps.setProperty("group.id", this.config.getString("group.id"));
        consumerProps.setProperty("key.deserializer", this.config.getString("key.deserializer"));
        consumerProps.setProperty("value.deserializer", this.config.getString("value.deserializer"));
        this.consumer = new KafkaConsumer(consumerProps);
    }

    public void addListener(EventSourceListener<K, V> listener) {
        this.listeners.add(listener);
    }

    public void removeListener(EventSourceListener<K, V> listener) {
        this.listeners.remove(listener);
    }

    @Override
    public void run() {
        boolean transitioned = this.consumerState.compareAndSet(CONSUMER_STATE.INITIALIZING, CONSUMER_STATE.RUNNING);
        if (!transitioned) {
            this.log.debug("We must already be closed!");
        }
        if (transitioned) {
            try {
                this.init();
                this.monitorChanges();
            }
            catch (WakeupException e) {
                if (this.consumerState.get() != CONSUMER_STATE.CLOSED) {
                    throw e;
                }
            }
            finally {
                this.consumer.close();
            }
        }
    }

    private void init() {
        this.log.debug("subscribing to topic: {}", (Object)this.config.getString("event.source.topic"));
        this.consumer.subscribe(Collections.singletonList(this.config.getString("event.source.topic")), new ConsumerRebalanceListener(){

            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                EventSourceTable.this.log.debug("Seeking to beginning of topic");
                if (partitions.size() != 1) {
                    throw new IllegalStateException("We only support single partition Event Sourced topics at this time");
                }
                EventSourceTable.this.consumer.seekToBeginning(partitions);
                Map endOffsets = EventSourceTable.this.consumer.endOffsets(partitions);
                EventSourceTable.this.endOffset = (Long)endOffsets.get(partitions.iterator().next());
                if (EventSourceTable.this.endOffset == 0L) {
                    EventSourceTable.this.log.debug("Empty topic to begin with");
                    EventSourceTable.this.endReached = true;
                }
            }
        });
        int emptyPollCount = 0;
        while (!this.endReached && this.consumerState.get() == CONSUMER_STATE.RUNNING) {
            ConsumerRecords records = this.consumer.poll(Duration.ofMillis(this.config.getLong("event.source.poll.millis")));
            this.log.debug("found " + records.count() + " records");
            for (ConsumerRecord record : records) {
                this.updateState(record);
                this.log.debug("Looking for last index: {}, found: {}", (Object)this.endOffset, (Object)(record.offset() + 1L));
                if (record.offset() + 1L != this.endOffset) continue;
                this.log.debug("end of partition {} reached", (Object)record.partition());
                this.endReached = true;
            }
            if (records.count() == 0) {
                ++emptyPollCount;
            }
            if (emptyPollCount <= 10) continue;
            throw new RuntimeException("Took too long to obtain initial list; verify topic compact policy!");
        }
        this.notifyListenersInitial();
        this.log.debug("Done with EventSourceConsumer init");
    }

    private void monitorChanges() {
        int pollsWithChangesSinceLastFlush = 0;
        boolean hasChanges = false;
        while (this.consumerState.get() == CONSUMER_STATE.RUNNING) {
            this.log.debug("polling for changes ({})", (Object)this.config.getString("event.source.topic"));
            ConsumerRecords records = this.consumer.poll(Duration.ofMillis(this.config.getLong("event.source.poll.millis")));
            if (records.count() > 0) {
                for (ConsumerRecord record : records) {
                    this.updateState(record);
                }
                this.log.debug("Change in topic: request update once settled");
                hasChanges = true;
            } else if (hasChanges) {
                this.log.debug("Flushing changes since we've settled (we had a poll with no changes)");
                this.notifyListenersChanges();
                hasChanges = false;
                pollsWithChangesSinceLastFlush = 0;
            }
            if ((long)pollsWithChangesSinceLastFlush >= this.config.getLong("event.source.max.poll.before.flush")) {
                this.log.debug("Flushing changes due to max poll with changes");
                this.notifyListenersChanges();
                hasChanges = false;
                pollsWithChangesSinceLastFlush = 0;
            }
            if (!hasChanges) continue;
            ++pollsWithChangesSinceLastFlush;
        }
    }

    private void notifyListenersInitial() {
        for (EventSourceListener<K, V> listener : this.listeners) {
            listener.initialState(new HashSet<EventSourceRecord<K, V>>(this.state.values()));
        }
        this.changes.clear();
    }

    private void notifyListenersChanges() {
        for (EventSourceListener<K, V> listener : this.listeners) {
            listener.changes(new ArrayList<EventSourceRecord<K, V>>(this.changes));
        }
        this.changes.clear();
    }

    private void updateState(ConsumerRecord<K, V> record) {
        this.log.debug("State update: {}={}", record.key(), record.value());
        EventSourceRecord<Object, Object> esr = new EventSourceRecord<Object, Object>(record.key(), record.value());
        this.changes.add(esr);
        if (record.value() == null) {
            this.log.debug("Removing record: {}", record.key());
            this.state.remove(record.key());
        } else {
            this.log.debug("Adding record: {}", record.key());
            this.state.put(record.key(), esr);
        }
    }

    @Override
    public void close() {
        CONSUMER_STATE previousState = this.consumerState.getAndSet(CONSUMER_STATE.CLOSED);
        if (previousState == CONSUMER_STATE.INITIALIZING) {
            this.consumer.close();
        } else {
            this.consumer.wakeup();
        }
    }

    private static enum CONSUMER_STATE {
        INITIALIZING,
        RUNNING,
        CLOSED;

    }
}

