/*
 * Decompiled with CFR 0.152.
 */
package edu.iu.dsc.tws.task.window.event;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import edu.iu.dsc.tws.task.window.api.GlobalStreamId;
import edu.iu.dsc.tws.task.window.event.WatermarkEvent;
import edu.iu.dsc.tws.task.window.exceptions.FailedException;
import edu.iu.dsc.tws.task.window.manage.WindowManager;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

public class WatermarkEventGenerator<T>
implements Runnable {
    private static final Logger LOG = Logger.getLogger(WatermarkEventGenerator.class.getName());
    private final WindowManager<T> windowManager;
    private final long eventLagTime;
    private final ScheduledExecutorService executorService;
    private final long interval;
    private ScheduledFuture<?> executorFuture;
    private volatile long lastWatermarkTime;
    private volatile long currentProcessedMessageTime = 0L;
    private Set<GlobalStreamId> inputStreams;
    private final Map<GlobalStreamId, Long> streamTimeStampMap;

    public WatermarkEventGenerator(WindowManager<T> winManager, long eventLagTime, long interval, Set<GlobalStreamId> inputStreams) {
        this.windowManager = winManager;
        this.eventLagTime = eventLagTime;
        this.interval = interval;
        this.inputStreams = inputStreams;
        this.streamTimeStampMap = new ConcurrentHashMap<GlobalStreamId, Long>();
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("tws-watermark-event-generator-%d").setDaemon(true).build();
        this.executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
    }

    @Deprecated
    private long computeWaterMark() {
        long t = 0L;
        return t - this.eventLagTime;
    }

    private long computeWaterMarkTimeStamp() {
        long timestamp = 0L;
        if (this.streamTimeStampMap.size() >= this.inputStreams.size()) {
            timestamp = Long.MAX_VALUE;
            for (Map.Entry<GlobalStreamId, Long> entry : this.streamTimeStampMap.entrySet()) {
                timestamp = Math.min(timestamp, entry.getValue());
            }
        }
        return timestamp - this.eventLagTime;
    }

    @Override
    public void run() {
        try {
            long watermarkTime = this.computeWaterMarkTimeStamp();
            if (watermarkTime > this.lastWatermarkTime) {
                this.windowManager.add(new WatermarkEvent(watermarkTime));
                this.lastWatermarkTime = watermarkTime;
                LOG.fine(String.format("WaterMark event added", new Object[0]));
            }
        }
        catch (Throwable throwable) {
            LOG.severe(String.format("Failure occurred in the watermarked event %s ", throwable.getMessage()));
        }
    }

    public boolean track(long time) {
        Long currentValue = this.currentProcessedMessageTime;
        if (currentValue == 0L || time > currentValue) {
            this.currentProcessedMessageTime = time;
        }
        this.checkFailures();
        return time >= this.lastWatermarkTime;
    }

    public boolean track(GlobalStreamId globalStreamId, long timestamp) {
        Long currentValue = this.streamTimeStampMap.get(globalStreamId);
        if (currentValue == null || timestamp > currentValue) {
            this.streamTimeStampMap.put(globalStreamId, timestamp);
        }
        this.checkFailures();
        return timestamp >= this.lastWatermarkTime;
    }

    private void checkFailures() {
        if (this.executorFuture != null && this.executorFuture.isDone()) {
            try {
                this.executorFuture.get();
            }
            catch (InterruptedException ex) {
                LOG.severe(String.format("Exception Occurred : %s", ex.getMessage()));
                throw new FailedException(ex);
            }
            catch (ExecutionException ex) {
                LOG.severe(String.format("Exception Occurred : %s", ex.getMessage()));
                throw new FailedException(ex);
            }
        }
    }

    public void start() {
        this.executorFuture = this.executorService.scheduleAtFixedRate(this, this.interval, this.interval, TimeUnit.MILLISECONDS);
    }

    public void shutdown() {
        LOG.info("Shutting Down WatermarkGenerator");
        this.executorService.shutdown();
        try {
            if (!this.executorService.awaitTermination(2L, TimeUnit.SECONDS)) {
                this.executorService.shutdownNow();
            }
        }
        catch (InterruptedException ie) {
            this.executorService.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}

