/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceImpl;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.NeverCompleteFuture;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ProcessingTimeServiceImplTest
extends TestLogger {
    private static final Time testingTimeout = Time.seconds((long)10L);
    private SystemProcessingTimeService timerService;

    @Before
    public void setup() {
        CompletableFuture errorFuture = new CompletableFuture();
        this.timerService = new SystemProcessingTimeService(errorFuture::complete);
    }

    @After
    public void teardown() {
        this.timerService.shutdownService();
    }

    @Test
    public void testTimerRegistrationAndCancellation() throws TimeoutException, InterruptedException, ExecutionException {
        ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl((TimerService)this.timerService, v -> v);
        ScheduledFuture neverFiredTimer = processingTimeService.registerTimer(Long.MAX_VALUE, timestamp -> {});
        Assert.assertEquals((long)1L, (long)this.timerService.getNumTasksScheduled());
        Assert.assertTrue((boolean)neverFiredTimer.cancel(false));
        Assert.assertTrue((boolean)neverFiredTimer.isDone());
        Assert.assertTrue((boolean)neverFiredTimer.isCancelled());
        CompletableFuture firedTimerFuture = new CompletableFuture();
        ScheduledFuture firedTimer = processingTimeService.registerTimer(0L, timestamp -> firedTimerFuture.complete(null));
        firedTimer.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertTrue((boolean)firedTimerFuture.isDone());
        Assert.assertFalse((boolean)firedTimer.isCancelled());
        CompletableFuture periodicTimerFuture = new CompletableFuture();
        ScheduledFuture periodicTimer = processingTimeService.scheduleAtFixedRate(timestamp -> periodicTimerFuture.complete(null), 0L, Long.MAX_VALUE);
        periodicTimerFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertTrue((boolean)periodicTimer.cancel(false));
        Assert.assertTrue((boolean)periodicTimer.isDone());
        Assert.assertTrue((boolean)periodicTimer.isCancelled());
    }

    @Test
    public void testQuiesce() throws Exception {
        ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl((TimerService)this.timerService, v -> v);
        CompletableFuture timerRunFuture = new CompletableFuture();
        OneShotLatch timerWaitLatch = new OneShotLatch();
        ScheduledFuture timer = processingTimeService.registerTimer(0L, timestamp -> {
            timerRunFuture.complete(null);
            timerWaitLatch.await();
        });
        timerRunFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        CompletableFuture quiesceCompletedFuture = processingTimeService.quiesce();
        Assert.assertThat((Object)processingTimeService.registerTimer(0L, timestamp -> {}), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(NeverCompleteFuture.class)));
        Assert.assertThat((Object)processingTimeService.scheduleAtFixedRate(timestamp -> {}, 0L, Long.MAX_VALUE), (Matcher)Matchers.is((Matcher)Matchers.instanceOf(NeverCompleteFuture.class)));
        Assert.assertFalse((boolean)quiesceCompletedFuture.isDone());
        timerWaitLatch.trigger();
        timer.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertTrue((boolean)quiesceCompletedFuture.isDone());
    }

    @Test
    public void testQuiesceWhenNoRunningTimers() {
        ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl((TimerService)this.timerService, v -> v);
        Assert.assertTrue((boolean)processingTimeService.quiesce().isDone());
    }
}

