/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor.slot;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.Preconditions;

/**
 * {@link TimerService} implementation that schedules one timeout per key on a
 * {@link ScheduledExecutorService}.
 *
 * <p>Every registered timeout is tagged with a freshly generated {@link UUID} ticket. When a
 * timeout fires, the {@link TimeoutListener} receives the key together with that ticket, and
 * {@link #isValid(Object, UUID)} can be used to check whether the ticket still belongs to the
 * timeout currently registered for the key.
 *
 * <p>NOTE(review): this class performs no synchronization of its own around the timeout map or
 * the listener field; callers presumably confine all calls to a single thread - confirm against
 * the call sites.
 *
 * @param <K> type of the key a timeout is registered for
 */
public class DefaultTimerService<K>
implements TimerService<K> {
    /** Executor on which the timeouts are scheduled. */
    private final ScheduledExecutorService scheduledExecutorService;
    /** Timeout, in milliseconds, handed to the graceful executor shutdown in {@link #stop()}. */
    private final long shutdownTimeout;
    /** Currently registered timeouts, indexed by their key. */
    private final Map<K, Timeout<K>> timeouts;
    /** Listener notified about fired timeouts; {@code null} while the service is not started. */
    private TimeoutListener<K> timeoutListener;

    /**
     * Creates a timer service which is not started yet.
     *
     * @param scheduledExecutorService executor used to schedule the timeouts; must not be null
     * @param shutdownTimeout timeout in milliseconds used when shutting the executor down in
     *     {@link #stop()}; must be {@code >= 0}
     * @throws NullPointerException if {@code scheduledExecutorService} is null
     * @throws IllegalArgumentException if {@code shutdownTimeout} is negative
     */
    public DefaultTimerService(ScheduledExecutorService scheduledExecutorService, long shutdownTimeout) {
        this.scheduledExecutorService = Preconditions.checkNotNull(scheduledExecutorService);
        Preconditions.checkArgument(shutdownTimeout >= 0L, "The shut down timeout must be larger than or equal than 0.");
        this.shutdownTimeout = shutdownTimeout;
        this.timeouts = CollectionUtil.newHashMapWithExpectedSize(16);
        this.timeoutListener = null;
    }

    /**
     * Starts the service by installing the listener that is notified about fired timeouts.
     *
     * @param initialTimeoutListener listener to notify; must not be null
     * @throws IllegalStateException if the executor has already been shut down or a listener is
     *     already installed
     */
    @Override
    public void start(TimeoutListener<K> initialTimeoutListener) {
        Preconditions.checkState(!this.scheduledExecutorService.isShutdown());
        Preconditions.checkState(this.timeoutListener == null);
        this.timeoutListener = Preconditions.checkNotNull(initialTimeoutListener);
    }

    /**
     * Stops the service: cancels and forgets all registered timeouts, drops the listener and
     * shuts the executor down, passing the configured shutdown timeout (in milliseconds) to
     * {@code ExecutorUtils.gracefulShutdown}.
     */
    @Override
    public void stop() {
        this.unregisterAllTimeouts();
        this.timeoutListener = null;
        ExecutorUtils.gracefulShutdown(this.shutdownTimeout, TimeUnit.MILLISECONDS, this.scheduledExecutorService);
    }

    /**
     * Registers a timeout for the given key. A timeout that is already registered for the same
     * key is cancelled and replaced, so the new timeout gets a new ticket.
     *
     * @param key key to register the timeout for
     * @param delay delay after which the listener is notified
     * @param unit time unit of {@code delay}
     * @throws IllegalStateException if the service has not been started
     */
    @Override
    public void registerTimeout(K key, long delay, TimeUnit unit) {
        Preconditions.checkState(this.timeoutListener != null, "The " + this.getClass().getSimpleName() + " has not been started.");
        if (this.timeouts.containsKey(key)) {
            // Cancel the previous timeout first so that only one is pending per key.
            this.unregisterTimeout(key);
        }
        this.timeouts.put(key, new Timeout<K>(this.timeoutListener, key, delay, unit, this.scheduledExecutorService));
    }

    /**
     * Removes and cancels the timeout registered for the given key. Does nothing if no timeout
     * is registered for it.
     *
     * @param key key whose timeout should be removed
     */
    @Override
    public void unregisterTimeout(K key) {
        Timeout<K> timeout = this.timeouts.remove(key);
        if (timeout != null) {
            timeout.cancel();
        }
    }

    /** Cancels every registered timeout and clears the timeout map. */
    protected void unregisterAllTimeouts() {
        for (Timeout<K> timeout : this.timeouts.values()) {
            timeout.cancel();
        }
        this.timeouts.clear();
    }

    /**
     * Checks whether the given ticket belongs to the timeout currently registered for the key.
     *
     * @param key key to look up
     * @param ticket ticket to compare against the registered timeout's ticket
     * @return {@code true} if a timeout is registered for {@code key} and its ticket equals
     *     {@code ticket}; {@code false} otherwise
     */
    @Override
    public boolean isValid(K key, UUID ticket) {
        // Single lookup: the map never stores null values, so null means "no timeout registered".
        Timeout<K> timeout = this.timeouts.get(key);
        return timeout != null && timeout.getTicket().equals(ticket);
    }

    /**
     * Exposes the internal timeout map (the live instance, not a copy) for tests.
     *
     * @return the map of currently registered timeouts
     */
    @VisibleForTesting
    Map<K, Timeout<K>> getTimeouts() {
        return this.timeouts;
    }

    /**
     * A single scheduled timeout. It schedules itself on construction and, when run by the
     * executor, notifies the listener with its key and ticket.
     *
     * @param <K> type of the key the timeout belongs to
     */
    @VisibleForTesting
    static final class Timeout<K>
    implements Runnable {
        private final TimeoutListener<K> timeoutListener;
        private final K key;
        private final ScheduledFuture<?> scheduledTimeout;
        private final UUID ticket;

        /**
         * Creates the timeout and schedules it on the given executor.
         *
         * @param timeoutListener listener to notify when the timeout fires; must not be null
         * @param key key the timeout belongs to; must not be null
         * @param delay delay after which the timeout fires
         * @param unit time unit of {@code delay}
         * @param scheduledExecutorService executor to schedule the timeout on; must not be null
         * @throws NullPointerException if the listener, key or executor is null
         */
        Timeout(TimeoutListener<K> timeoutListener, K key, long delay, TimeUnit unit, ScheduledExecutorService scheduledExecutorService) {
            Preconditions.checkNotNull(scheduledExecutorService);
            this.timeoutListener = Preconditions.checkNotNull(timeoutListener);
            this.key = Preconditions.checkNotNull(key);
            // The ticket must exist before this object is handed to the executor: with a zero or
            // very short delay run() may execute before the constructor has finished and would
            // otherwise notify the listener with a null ticket.
            this.ticket = UUID.randomUUID();
            // Scheduling is the last step because it publishes `this` to another thread.
            this.scheduledTimeout = scheduledExecutorService.schedule(this, delay, unit);
        }

        /** Returns the ticket that identifies this timeout. */
        UUID getTicket() {
            return this.ticket;
        }

        /** Cancels the scheduled task, requesting interruption if it is already running. */
        void cancel() {
            this.scheduledTimeout.cancel(true);
        }

        /** Notifies the listener that the timeout for the key has fired. */
        @Override
        public void run() {
            this.timeoutListener.notifyTimeout(this.key, this.ticket);
        }
    }
}

