/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeServiceImpl;
import org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerService;
import org.apache.flink.util.concurrent.NeverCompleteFuture;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.FutureAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link ProcessingTimeServiceImpl}, run against a real
 * {@link SystemProcessingTimeService} that is created before and shut down after every test.
 */
class ProcessingTimeServiceImplTest {
    /** Upper bound for every blocking wait in these tests, so a broken timer fails instead of hanging. */
    private static final Duration TESTING_TIMEOUT = Duration.ofSeconds(10L);

    /** Backing timer service; re-created in {@link #setup()} for each test. */
    private SystemProcessingTimeService timerService;

    /** Creates a fresh timer service whose exception handler completes a local future. */
    @BeforeEach
    void setup() {
        // NOTE(review): errors handed to the timer service's handler land in this future, but the
        // future is never inspected by any test in this class.
        CompletableFuture<Throwable> errorFuture = new CompletableFuture<>();
        timerService = new SystemProcessingTimeService(errorFuture::complete);
    }

    /** Shuts down the timer service created in {@link #setup()}. */
    @AfterEach
    void teardown() {
        timerService.shutdownService();
    }

    /**
     * Covers {@code registerTimer()} and {@code scheduleAtFixedRate()}: a timer that never fires
     * can be cancelled, a timer that is due fires and is not reported as cancelled, and a
     * periodic timer can be cancelled after its first run.
     *
     * @throws TimeoutException if a timer does not fire within {@link #TESTING_TIMEOUT}
     * @throws InterruptedException if the test thread is interrupted while waiting
     * @throws ExecutionException if a awaited future completed exceptionally
     */
    @Test
    void testTimerRegistrationAndCancellation() throws TimeoutException, InterruptedException, ExecutionException {
        ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl(timerService, v -> v);

        // A timer registered for Long.MAX_VALUE is scheduled but can be cancelled before firing.
        ScheduledFuture<?> neverFiredTimer = processingTimeService.registerTimer(Long.MAX_VALUE, timestamp -> {});
        Assertions.assertThat(timerService.getNumTasksScheduled()).isOne();
        Assertions.assertThat(neverFiredTimer.cancel(false)).isTrue();
        Assertions.assertThat(neverFiredTimer).isDone().isCancelled();

        // A timer registered for timestamp 0 is expected to fire within the timeout.
        CompletableFuture<?> firedTimerFuture = new CompletableFuture<>();
        ScheduledFuture<?> firedTimer =
                processingTimeService.registerTimer(0L, timestamp -> firedTimerFuture.complete(null));
        firedTimer.get(TESTING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        Assertions.assertThat(firedTimerFuture).isDone();
        Assertions.assertThat(firedTimer).isNotCancelled();

        // A periodic timer with no initial delay runs once, then is cancelled; the period of
        // Long.MAX_VALUE keeps a second run from happening during the test.
        CompletableFuture<?> periodicTimerFuture = new CompletableFuture<>();
        ScheduledFuture<?> periodicTimer =
                processingTimeService.scheduleAtFixedRate(
                        timestamp -> periodicTimerFuture.complete(null), 0L, Long.MAX_VALUE);
        periodicTimerFuture.get(TESTING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        Assertions.assertThat(periodicTimer.cancel(false)).isTrue();
        Assertions.assertThat(periodicTimer).isDone().isCancelled();
    }

    /**
     * Covers {@code quiesce()} while a timer callback is still running: registrations made after
     * quiescing yield a {@link NeverCompleteFuture}, and the future returned by {@code quiesce()}
     * is only done once the running callback has finished.
     *
     * @throws Exception if waiting on any of the futures fails or times out
     */
    @Test
    void testQuiesce() throws Exception {
        ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl(timerService, v -> v);

        CompletableFuture<?> timerRunFuture = new CompletableFuture<>();
        OneShotLatch timerWaitLatch = new OneShotLatch();

        // The callback signals that it has started and then blocks until the test releases it.
        ScheduledFuture<?> timer =
                processingTimeService.registerTimer(
                        0L,
                        timestamp -> {
                            timerRunFuture.complete(null);
                            timerWaitLatch.await();
                        });

        // Quiesce only after the callback is known to be running.
        timerRunFuture.get(TESTING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        CompletableFuture<?> quiesceCompletedFuture = processingTimeService.quiesce();

        // Registrations after quiesce() must hand back the never-completing placeholder future.
        Assertions.assertThat(processingTimeService.registerTimer(0L, timestamp -> {}))
                .isInstanceOf(NeverCompleteFuture.class);
        Assertions.assertThat(processingTimeService.scheduleAtFixedRate(timestamp -> {}, 0L, Long.MAX_VALUE))
                .isInstanceOf(NeverCompleteFuture.class);

        // While the callback is still blocked on the latch, quiescing has not completed ...
        Assertions.assertThat(quiesceCompletedFuture).isNotDone();
        timerWaitLatch.trigger();
        timer.get(TESTING_TIMEOUT.toMillis(), TimeUnit.MILLISECONDS);
        // ... and it has completed once the callback has returned.
        Assertions.assertThat(quiesceCompletedFuture).isDone();
    }

    /** Covers {@code quiesce()} with no timer registered: the returned future is already done. */
    @Test
    void testQuiesceWhenNoRunningTimers() {
        ProcessingTimeServiceImpl processingTimeService = new ProcessingTimeServiceImpl(timerService, v -> v);
        Assertions.assertThat(processingTimeService.quiesce()).isDone();
    }
}

