/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.connector.source.lookup.cache.trigger;

import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.testutils.ScheduledTask;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.trigger.CacheReloadTrigger;
import org.apache.flink.table.connector.source.lookup.cache.trigger.PeriodicCacheReloadTrigger;
import org.apache.flink.table.connector.source.lookup.cache.trigger.ScheduleStrategyExecutorService;
import org.apache.flink.table.connector.source.lookup.cache.trigger.TestTriggerContext;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

class PeriodicCacheReloadTriggerTest {
    private final ScheduleStrategyExecutorService scheduledExecutor = new ScheduleStrategyExecutorService();
    private final TestTriggerContext context = new TestTriggerContext();

    PeriodicCacheReloadTriggerTest() {
    }

    @ParameterizedTest
    @EnumSource(value=PeriodicCacheReloadTrigger.ScheduleMode.class)
    void testNormalReloadInterval(PeriodicCacheReloadTrigger.ScheduleMode scheduleMode) throws Exception {
        Duration reloadInterval = Duration.ofSeconds(10L);
        try (PeriodicCacheReloadTrigger trigger = new PeriodicCacheReloadTrigger(reloadInterval, scheduleMode, (ScheduledExecutorService)((Object)this.scheduledExecutor));){
            trigger.open((CacheReloadTrigger.Context)this.context);
            this.checkExecutorCallByScheduleMode(reloadInterval, scheduleMode);
            this.scheduledExecutor.triggerPeriodicScheduledTasks();
            Assertions.assertThat((int)this.context.getReloadTask().getNumLoads()).isEqualTo(1);
        }
        Assertions.assertThat((boolean)this.scheduledExecutor.isTerminated()).isTrue();
    }

    @Test
    void testNegativeReloadInterval() {
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> new PeriodicCacheReloadTrigger(Duration.ZERO, PeriodicCacheReloadTrigger.ScheduleMode.FIXED_DELAY, (ScheduledExecutorService)((Object)this.scheduledExecutor))).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("must be greater than zero");
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> new PeriodicCacheReloadTrigger(Duration.ofSeconds(-1L), PeriodicCacheReloadTrigger.ScheduleMode.FIXED_DELAY, (ScheduledExecutorService)((Object)this.scheduledExecutor))).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("must be greater than zero");
    }

    @Test
    void testCreateFromConfig() {
        Assertions.assertThat((Object)PeriodicCacheReloadTrigger.fromConfig((ReadableConfig)PeriodicCacheReloadTriggerTest.createValidConf())).isNotNull();
        Configuration conf1 = PeriodicCacheReloadTriggerTest.createValidConf().set(LookupOptions.CACHE_TYPE, (Object)LookupOptions.LookupCacheType.PARTIAL);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> PeriodicCacheReloadTrigger.fromConfig((ReadableConfig)conf1)).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("should be 'FULL'");
        Configuration conf2 = PeriodicCacheReloadTriggerTest.createValidConf().set(LookupOptions.FULL_CACHE_RELOAD_STRATEGY, (Object)LookupOptions.ReloadStrategy.TIMED);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> PeriodicCacheReloadTrigger.fromConfig((ReadableConfig)conf2)).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("should be 'PERIODIC'");
        Configuration conf3 = PeriodicCacheReloadTriggerTest.createValidConf();
        conf3.removeConfig(LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> PeriodicCacheReloadTrigger.fromConfig((ReadableConfig)conf3)).isInstanceOf(IllegalArgumentException.class)).hasMessageContaining("Missing '" + LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL.key() + "'");
    }

    private void checkExecutorCallByScheduleMode(Duration reloadInterval, PeriodicCacheReloadTrigger.ScheduleMode scheduleMode) {
        switch (scheduleMode) {
            case FIXED_RATE: {
                Assertions.assertThat((int)this.scheduledExecutor.getNumPeriodicTasksWithFixedRate()).isEqualTo(1);
                Assertions.assertThat((int)this.scheduledExecutor.getNumPeriodicTasksWithFixedDelay()).isEqualTo(0);
                break;
            }
            case FIXED_DELAY: {
                Assertions.assertThat((int)this.scheduledExecutor.getNumPeriodicTasksWithFixedRate()).isEqualTo(0);
                Assertions.assertThat((int)this.scheduledExecutor.getNumPeriodicTasksWithFixedDelay()).isEqualTo(1);
                break;
            }
            default: {
                throw new IllegalArgumentException("Unknown schedule mode: " + String.valueOf(scheduleMode));
            }
        }
        Collection tasks = this.scheduledExecutor.getActivePeriodicScheduledTask();
        Assertions.assertThat((int)tasks.size()).isEqualTo(1);
        ScheduledTask task = (ScheduledTask)tasks.iterator().next();
        Assertions.assertThat((long)task.getDelay(TimeUnit.MILLISECONDS)).isEqualTo(0L);
        Assertions.assertThat((long)task.getPeriod()).isEqualTo(reloadInterval.toMillis());
    }

    private static Configuration createValidConf() {
        Configuration configuration = new Configuration();
        configuration.set(LookupOptions.CACHE_TYPE, (Object)LookupOptions.LookupCacheType.FULL);
        configuration.set(LookupOptions.FULL_CACHE_RELOAD_STRATEGY, (Object)LookupOptions.ReloadStrategy.PERIODIC);
        configuration.set(LookupOptions.FULL_CACHE_PERIODIC_RELOAD_INTERVAL, (Object)Duration.ofSeconds(1L));
        return configuration;
    }
}

