/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.window.manage;

import edu.iu.dsc.tws.api.compute.IMessage;
import edu.iu.dsc.tws.task.window.api.Event;
import edu.iu.dsc.tws.task.window.api.EventImpl;
import edu.iu.dsc.tws.task.window.api.IEvictionPolicy;
import edu.iu.dsc.tws.task.window.api.IWindowMessage;
import edu.iu.dsc.tws.task.window.api.WindowLifeCycleListener;
import edu.iu.dsc.tws.task.window.api.WindowMessageImpl;
import edu.iu.dsc.tws.task.window.constant.Action;
import edu.iu.dsc.tws.task.window.manage.IManager;
import edu.iu.dsc.tws.task.window.policy.trigger.IWindowingPolicy;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Buffers incoming window events in a queue and, with the help of an
 * {@link IEvictionPolicy} and an {@link IWindowingPolicy}, decides which events
 * are expired, which belong to the current window, and when the
 * {@link WindowLifeCycleListener} is notified.
 *
 * <p>Queue scans and updates of the pending expired-event list are performed
 * while holding an internal {@link ReentrantLock}. The set of events seen in
 * the previous window is accessed outside that lock.
 *
 * <p>The eviction and windowing policies must be supplied through
 * {@link #setEvictionPolicy} and {@link #setWindowingPolicy} before events are
 * added; they are dereferenced without a null check.
 *
 * @param <T> payload type carried by the messages
 */
public class WindowManager<T>
implements IManager<T> {
    private static final Logger LOG = Logger.getLogger(WindowManager.class.getName());

    /**
     * Number of {@link #add(Event)} calls since the last scan after which
     * {@link #compactWindow()} triggers a partial scan of the queue.
     */
    public static final int EXPIRE_EVENTS_THRESHOLD = 20;
    private static final long serialVersionUID = -15452808832480739L;

    private IWindowingPolicy<T> windowingPolicy;
    private IEvictionPolicy<T> evictionPolicy;
    private final WindowLifeCycleListener<T> windowLifeCycleListener;
    /** Messages expired by scans but not yet handed over by {@link #onEvent()}. */
    private final List<IMessage<T>> expiredEvents;
    private final ReentrantLock lock;
    private final ConcurrentLinkedQueue<Event<T>> queue;
    /** Events that were part of the window on the previous {@link #onEvent()} call. */
    private final Set<Event<T>> previousWindowEvents;
    private final AtomicInteger eventsSinceLastExpiration;
    private boolean debug = false;

    /**
     * Creates a manager that reports activations and expirations to the given listener.
     *
     * @param windowLifeCycleListener listener invoked from {@link #onEvent()} and
     *                                {@link #scanEvents(boolean)}
     */
    public WindowManager(WindowLifeCycleListener<T> windowLifeCycleListener) {
        this.windowLifeCycleListener = windowLifeCycleListener;
        this.queue = new ConcurrentLinkedQueue<Event<T>>();
        this.expiredEvents = new ArrayList<IMessage<T>>();
        this.lock = new ReentrantLock();
        this.previousWindowEvents = new HashSet<Event<T>>();
        this.eventsSinceLastExpiration = new AtomicInteger();
    }

    /**
     * Creates a manager without a life-cycle listener.
     *
     * <p>NOTE(review): this class offers no way to set the listener afterwards, and
     * {@link #onEvent()} / {@link #scanEvents(boolean)} dereference it whenever they
     * have something to report, so an instance built this way fails with a
     * {@link NullPointerException} at that point. Confirm whether this constructor
     * is only meant for framework/serialization use.
     */
    public WindowManager() {
        this(null);
    }

    /** @return the windowing policy currently in use, or {@code null} if none was set */
    public IWindowingPolicy<T> getWindowingPolicy() {
        return this.windowingPolicy;
    }

    /** @param windowingPolicy policy that is told about every added event and reset after each {@link #onEvent()} */
    public void setWindowingPolicy(IWindowingPolicy<T> windowingPolicy) {
        this.windowingPolicy = windowingPolicy;
    }

    /** @return the eviction policy currently in use, or {@code null} if none was set */
    public IEvictionPolicy<T> getEvictionPolicy() {
        return this.evictionPolicy;
    }

    /** @param evictionPolicy policy that classifies queued events during a scan */
    public void setEvictionPolicy(IEvictionPolicy<T> evictionPolicy) {
        this.evictionPolicy = evictionPolicy;
    }

    /**
     * Adds a message stamped with the current wall-clock time.
     *
     * @param message message to add
     */
    @Override
    public void add(IMessage<T> message) {
        this.add(message, System.currentTimeMillis());
    }

    /**
     * Adds a message with an explicit timestamp.
     *
     * @param message message to add
     * @param ts      timestamp to associate with the message
     */
    public void add(IMessage<T> message, long ts) {
        this.add(new EventImpl<T>(message, ts));
    }

    /**
     * Adds an event. Watermark events are not queued, but every event (watermark or
     * not) is passed to both policies and counted towards the compaction threshold.
     *
     * @param windowEvent event to add
     */
    @Override
    public void add(Event<T> windowEvent) {
        if (!windowEvent.isWatermark()) {
            this.queue.add(windowEvent);
        } else if (LOG.isLoggable(Level.FINE)) {
            // The timestamp is integral, so it must be formatted with %d; the former
            // %f conversion threw IllegalFormatConversionException for every watermark.
            LOG.fine(String.format("Event With WaterMark ts %d ", windowEvent.getTimeStamp()));
        }
        this.track(windowEvent);
        this.compactWindow();
    }

    /**
     * Evaluates the current window: performs a full scan, collects the pending
     * expired messages, and, if the window is non-empty, notifies the listener with
     * all window messages, the messages that were not in the previous window, and
     * the expired messages. The windowing policy is reset in every case.
     *
     * @return {@code true} if the window contained at least one event
     */
    @Override
    public boolean onEvent() {
        List<Event<T>> windowEvents;
        List<IMessage<T>> expired;
        // Acquire before entering try so that a failed acquisition is not followed
        // by an unlock() of a lock this thread does not hold.
        this.lock.lock();
        try {
            windowEvents = this.scanEvents(true);
            expired = new ArrayList<IMessage<T>>(this.expiredEvents);
            this.expiredEvents.clear();
        } finally {
            this.lock.unlock();
        }

        List<IMessage<T>> events = new ArrayList<IMessage<T>>();
        List<IMessage<T>> newEvents = new ArrayList<IMessage<T>>();
        for (Event<T> event : windowEvents) {
            IMessage<T> message = event.get();
            events.add(message);
            if (!this.previousWindowEvents.contains(event)) {
                newEvents.add(message);
            }
        }

        this.previousWindowEvents.clear();
        boolean hasEvents = !events.isEmpty();
        if (hasEvents) {
            this.previousWindowEvents.addAll(windowEvents);
            LOG.log(Level.FINE, String.format("WindowLifeCycleListener onActivation, events in the window : %d", events.size()));
            IWindowMessage<T> ievents = this.bundleNonExpiredWindowIMessage(events);
            IWindowMessage<T> inewEvents = this.bundleNonExpiredWindowIMessage(newEvents);
            IWindowMessage<T> iexpired = this.bundleExpiredWindowIMessage(expired);
            this.windowLifeCycleListener.onActivation(ievents, inewEvents, iexpired);
        } else {
            LOG.log(Level.FINE, "No events processed for the window, onActivation method is not called");
        }
        this.windowingPolicy.reset();
        return hasEvents;
    }

    /**
     * Walks the queue in order and asks the eviction policy what to do with each event.
     *
     * <ul>
     *   <li>{@code EXPIRE}: the event is removed from the queue and its message is
     *       recorded as expired;</li>
     *   <li>otherwise, if this is a partial scan or the action is {@code STOP}, the
     *       walk ends;</li>
     *   <li>{@code PROCESS}: the event is added to the result;</li>
     *   <li>any other action: the event is skipped and stays queued.</li>
     * </ul>
     *
     * <p>After the walk the compaction counter is reset and, if anything expired,
     * the listener's {@code onExpiry} is invoked with the expired messages.
     *
     * @param fullScan {@code true} to collect the events to process; {@code false}
     *                 to only drop the leading expired events
     * @return the events the policy marked for processing (empty for a partial scan)
     */
    public List<Event<T>> scanEvents(boolean fullScan) {
        List<IMessage<T>> eventsToExpire = new ArrayList<IMessage<T>>();
        List<Event<T>> eventsToProcess = new ArrayList<Event<T>>();
        this.lock.lock();
        try {
            Iterator<Event<T>> it = this.queue.iterator();
            while (it.hasNext()) {
                Event<T> windowEvent = it.next();
                Action action = this.evictionPolicy.evict(windowEvent);
                if (action == Action.EXPIRE) {
                    eventsToExpire.add(windowEvent.get());
                    it.remove();
                } else if (!fullScan || action == Action.STOP) {
                    break;
                } else if (action == Action.PROCESS) {
                    eventsToProcess.add(windowEvent);
                }
            }
            this.expiredEvents.addAll(eventsToExpire);
        } finally {
            this.lock.unlock();
        }

        this.eventsSinceLastExpiration.set(0);
        if (!eventsToExpire.isEmpty()) {
            if (this.debug) {
                LOG.severe("OnExpiry called on WindowLifeCycleListener");
            }
            IWindowMessage<T> eventsToExpireIWindow = this.bundleExpiredWindowIMessage(eventsToExpire);
            this.windowLifeCycleListener.onExpiry(eventsToExpireIWindow);
        }
        return eventsToProcess;
    }

    /**
     * Wraps the messages of the given events in a window message.
     *
     * @param events events whose messages are bundled
     * @return a window message built from a fresh list of the events' messages
     */
    public IWindowMessage<T> bundleWindowMessage(List<Event<T>> events) {
        ArrayList<IMessage<T>> messages = new ArrayList<IMessage<T>>();
        for (Event<T> event : events) {
            messages.add(event.get());
        }
        return new WindowMessageImpl(messages);
    }

    /**
     * Wraps a copy of the given messages in a window message using the
     * single-list constructor of {@link WindowMessageImpl}.
     *
     * @param events messages to bundle
     * @return a window message built from a copy of {@code events}
     */
    public IWindowMessage<T> bundleNonExpiredWindowIMessage(List<IMessage<T>> events) {
        ArrayList<IMessage<T>> messages = new ArrayList<IMessage<T>>(events);
        return new WindowMessageImpl(messages);
    }

    /**
     * Wraps a copy of the given messages in a window message, passing them as the
     * second constructor argument of {@link WindowMessageImpl} with {@code null}
     * as the first.
     *
     * @param events expired messages to bundle
     * @return a window message built from a copy of {@code events}
     */
    public IWindowMessage<T> bundleExpiredWindowIMessage(List<IMessage<T>> events) {
        ArrayList<IMessage<T>> messages = new ArrayList<IMessage<T>>(events);
        return new WindowMessageImpl(null, messages);
    }

    /**
     * Reports an event to the eviction policy and then to the windowing policy.
     *
     * @param windowEvent event to report
     */
    public void track(Event<T> windowEvent) {
        this.evictionPolicy.track(windowEvent);
        this.windowingPolicy.track(windowEvent);
    }

    /**
     * Counts one more added event and, once {@link #EXPIRE_EVENTS_THRESHOLD} is
     * reached, runs a partial scan (which also resets the counter).
     */
    public void compactWindow() {
        if (this.eventsSinceLastExpiration.incrementAndGet() >= EXPIRE_EVENTS_THRESHOLD) {
            this.scanEvents(false);
        }
    }

    /** Shuts down the windowing policy, if one has been set. */
    public void shutdown() {
        if (this.windowingPolicy != null) {
            this.windowingPolicy.shutdown();
        }
    }

    /**
     * Collects slide boundaries for queued events whose timestamp lies in
     * {@code (start, end]}: after every {@code slide}-th such event, the largest
     * timestamp seen so far among them is recorded.
     *
     * @param start exclusive lower timestamp bound
     * @param end   inclusive upper timestamp bound
     * @param slide number of matching events per recorded timestamp
     * @return the recorded timestamps in queue order; empty if {@code end <= start}
     * @throws ArithmeticException if {@code slide} is zero and at least one event matches
     */
    public List<Long> getSlidingCountTimestamps(long start, long end, long slide) {
        List<Long> timestamps = new ArrayList<Long>();
        if (end <= start) {
            return timestamps;
        }
        int count = 0;
        long ts = Long.MIN_VALUE;
        for (Event<T> event : this.queue) {
            long eventTs = event.getTimeStamp();
            if (eventTs <= start || eventTs > end) {
                continue;
            }
            ts = Math.max(ts, eventTs);
            if ((long) (++count) % slide == 0L) {
                timestamps.add(ts);
            }
        }
        return timestamps;
    }

    /**
     * Counts queued events whose timestamp is not after the reference time.
     *
     * @param referenceTime inclusive upper timestamp bound
     * @return number of queued events with {@code timestamp <= referenceTime}
     */
    public long getEventCount(long referenceTime) {
        long eventCount = 0L;
        for (Event<T> event : this.queue) {
            if (event.getTimeStamp() <= referenceTime) {
                ++eventCount;
            }
        }
        return eventCount;
    }

    /**
     * Finds the smallest timestamp among queued events in {@code (start, end]}.
     *
     * @param start exclusive lower timestamp bound
     * @param end   inclusive upper timestamp bound
     * @return the earliest matching timestamp, or {@link Long#MAX_VALUE} if no
     *         queued event falls in the range
     */
    public long getEarliestEventTimestamp(long start, long end) {
        long minTimestamp = Long.MAX_VALUE;
        for (Event<T> event : this.queue) {
            long eventTs = event.getTimeStamp();
            if (eventTs > start && eventTs <= end) {
                minTimestamp = Math.min(minTimestamp, eventTs);
            }
        }
        return minTimestamp;
    }
}

