/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.connector.source.lookup.cache;

import java.io.Serializable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.groups.CacheMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.table.connector.source.lookup.LookupOptions;
import org.apache.flink.table.connector.source.lookup.cache.DefaultLookupCache;
import org.apache.flink.table.connector.source.lookup.cache.InterceptingCacheMetricGroup;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingRunnable;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class DefaultLookupCacheTest {
    private static final RowData KEY = GenericRowData.of((Object[])new Object[]{"foo", "lookup", "key"});
    private static final RowData NON_EXIST_KEY = GenericRowData.of((Object[])new Object[]{"non-exist"});
    private static final Collection<RowData> VALUE = Arrays.asList(GenericRowData.of((Object[])new Object[]{"bar", "lookup", "value", 0}), GenericRowData.of((Object[])new Object[]{"bar", "lookup", "value", 1}), GenericRowData.of((Object[])new Object[]{"bar", "lookup", "value", 2}), GenericRowData.of((Object[])new Object[]{"bar", "lookup", "value", 3}));

    DefaultLookupCacheTest() {
    }

    @Test
    void testBasicReadWriteInCache() throws Exception {
        try (DefaultLookupCache cache = this.createCache(DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE));){
            cache.put(KEY, VALUE);
            Assertions.assertThat((Collection)cache.getIfPresent(NON_EXIST_KEY)).isNull();
            Assertions.assertThat((Collection)cache.getIfPresent(KEY)).containsExactlyElementsOf(VALUE);
        }
    }

    @Test
    void testExpireAfterAccess() throws Exception {
        Duration expireDuration = Duration.ofSeconds(15213L);
        Duration margin = Duration.ofSeconds(10L);
        ManualClock clock = new ManualClock();
        try (DefaultLookupCache cache = this.createCache(DefaultLookupCache.newBuilder().expireAfterAccess(expireDuration), (Clock)clock);){
            cache.put(KEY, VALUE);
            clock.advanceTime(expireDuration.minus(margin));
            Assertions.assertThat((Collection)cache.getIfPresent(KEY)).containsExactlyElementsOf(VALUE);
            clock.advanceTime(expireDuration.minus(margin));
            Assertions.assertThat((Collection)cache.getIfPresent(KEY)).containsExactlyElementsOf(VALUE);
            clock.advanceTime(expireDuration.plus(margin));
            Assertions.assertThat((Collection)cache.getIfPresent(KEY)).isNull();
        }
    }

    @Test
    void testExpireAfterWrite() throws Exception {
        Duration expireDuration = Duration.ofSeconds(15213L);
        Duration margin = Duration.ofSeconds(10L);
        ManualClock clock = new ManualClock();
        try (DefaultLookupCache cache = this.createCache(DefaultLookupCache.newBuilder().expireAfterWrite(expireDuration), (Clock)clock);){
            cache.put(KEY, VALUE);
            Assertions.assertThat((Collection)cache.getIfPresent(KEY)).containsAll(VALUE);
            clock.advanceTime(expireDuration.plus(margin));
            Assertions.assertThat((Collection)cache.getIfPresent(KEY)).isNull();
        }
    }

    @Test
    void testSizeBasedEviction() throws Exception {
        int cacheSize = 10;
        try (DefaultLookupCache cache = this.createCache(DefaultLookupCache.newBuilder().maximumSize((long)cacheSize));){
            int i;
            for (i = 0; i < cacheSize; ++i) {
                cache.put((RowData)GenericRowData.of((Object[])new Object[]{"lookup", "key", i}), VALUE);
            }
            cache.put((RowData)GenericRowData.of((Object[])new Object[]{"lookup", "key", cacheSize}), VALUE);
            Assertions.assertThat((Collection)cache.getIfPresent((RowData)GenericRowData.of((Object[])new Object[]{"lookup", "key", 0}))).isNull();
            for (i = 1; i < cacheSize + 1; ++i) {
                Assertions.assertThat((Collection)cache.getIfPresent((RowData)GenericRowData.of((Object[])new Object[]{"lookup", "key", i}))).isNotNull();
            }
        }
    }

    @Test
    void testCacheMissingKey() throws Exception {
        try (DefaultLookupCache cache = this.createCache(DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE));){
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> cache.put(null, VALUE)).isInstanceOf(NullPointerException.class)).hasMessage("Cannot put an entry with null key into the cache");
            ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> cache.put(KEY, null)).isInstanceOf(NullPointerException.class)).hasMessage("Cannot put an entry with null value into the cache");
            cache.put(KEY, Collections.emptyList());
            ((AbstractCollectionAssert)Assertions.assertThat((Collection)cache.getIfPresent(KEY)).isNotNull()).isEmpty();
        }
        cache = this.createCache(DefaultLookupCache.newBuilder().cacheMissingKey(false).maximumSize(Long.MAX_VALUE));
        var2_2 = null;
        try {
            cache.put(KEY, Collections.emptyList());
            Assertions.assertThat((Collection)cache.getIfPresent(KEY)).isNull();
        }
        catch (Throwable throwable) {
            var2_2 = throwable;
            throw throwable;
        }
        finally {
            if (cache != null) {
                if (var2_2 != null) {
                    try {
                        cache.close();
                    }
                    catch (Throwable throwable) {
                        var2_2.addSuppressed(throwable);
                    }
                } else {
                    cache.close();
                }
            }
        }
    }

    @Test
    void testCacheMetrics() throws Exception {
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        try (DefaultLookupCache cache = this.createCache(DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE).maximumSize(Long.MAX_VALUE), null, metricGroup);){
            Assertions.assertThat((Object)metricGroup.hitCounter).isNotNull();
            Assertions.assertThat((long)metricGroup.hitCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat((Object)metricGroup.missCounter).isNotNull();
            Assertions.assertThat((long)metricGroup.missCounter.getCount()).isEqualTo(0L);
            Assertions.assertThat(metricGroup.numCachedRecordsGauge).isNotNull();
            Assertions.assertThat((Long)((Long)metricGroup.numCachedRecordsGauge.getValue())).isEqualTo(0L);
            Assertions.assertThat((Object)metricGroup.loadCounter).isNull();
            Assertions.assertThat((Object)metricGroup.numLoadFailuresCounter).isNull();
            Assertions.assertThat(metricGroup.latestLoadTimeGauge).isNull();
            Assertions.assertThat(metricGroup.numCachedBytesGauge).isNull();
            cache.put(KEY, VALUE);
            Assertions.assertThat((Long)((Long)metricGroup.numCachedRecordsGauge.getValue())).isEqualTo(1L);
            cache.getIfPresent(KEY);
            Assertions.assertThat((long)metricGroup.hitCounter.getCount()).isEqualTo(1L);
            cache.getIfPresent(NON_EXIST_KEY);
            Assertions.assertThat((long)metricGroup.missCounter.getCount()).isEqualTo(1L);
        }
    }

    @Test
    void testCacheSerialization() throws Exception {
        DefaultLookupCache cache = DefaultLookupCache.newBuilder().cacheMissingKey(true).maximumSize(15213L).expireAfterWrite(Duration.ofMillis(18213L)).expireAfterAccess(Duration.ofMillis(15513L)).build();
        DefaultLookupCache cacheCopy = (DefaultLookupCache)CommonTestUtils.createCopySerializable((Serializable)cache);
        Assertions.assertThat((boolean)cacheCopy.isCacheMissingKey()).isEqualTo(true);
        Assertions.assertThat((Long)cacheCopy.getMaximumSize()).isEqualTo(15213L);
        Assertions.assertThat((Duration)cacheCopy.getExpireAfterWriteDuration()).isEqualTo((Object)Duration.ofMillis(18213L));
        Assertions.assertThat((Duration)cacheCopy.getExpireAfterAccessDuration()).isEqualTo((Object)Duration.ofMillis(15513L));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testConcurrentAccess() throws Exception {
        int concurrency = 4;
        ExecutorService executor = Executors.newFixedThreadPool(concurrency * 2);
        InterceptingCacheMetricGroup metricGroup = new InterceptingCacheMetricGroup();
        try (DefaultLookupCache cache = this.createCache(DefaultLookupCache.newBuilder().maximumSize(Long.MAX_VALUE));){
            String value;
            String key;
            int i;
            ArrayList<CompletableFuture<Void>> futures = new ArrayList<CompletableFuture<Void>>();
            for (i = 0; i < concurrency; ++i) {
                key = "key-" + i;
                value = "value-" + i;
                CompletableFuture<Void> future = this.runAsync(() -> {
                    cache.open((CacheMetricGroup)metricGroup);
                    cache.put((RowData)GenericRowData.of((Object[])new Object[]{key}), Collections.singleton(GenericRowData.of((Object[])new Object[]{value})));
                }, executor);
                futures.add(future);
            }
            FutureUtils.waitForAll(futures).get();
            futures.clear();
            for (i = 0; i < concurrency; ++i) {
                key = "key-" + i;
                value = "value-" + i;
                CompletableFuture<Void> hitFuture = this.runAsync(() -> {
                    cache.open((CacheMetricGroup)metricGroup);
                    Assertions.assertThat((Collection)cache.getIfPresent((RowData)GenericRowData.of((Object[])new Object[]{key}))).isEqualTo(Collections.singleton(GenericRowData.of((Object[])new Object[]{value})));
                }, executor);
                futures.add(hitFuture);
                CompletableFuture<Void> missFuture = this.runAsync(() -> {
                    cache.open((CacheMetricGroup)metricGroup);
                    Assertions.assertThat((Collection)cache.getIfPresent(NON_EXIST_KEY)).isNull();
                }, executor);
                futures.add(missFuture);
            }
            FutureUtils.waitForAll(futures).get();
            Assertions.assertThat((long)metricGroup.hitCounter.getCount()).isEqualTo((long)concurrency);
            Assertions.assertThat((long)metricGroup.missCounter.getCount()).isEqualTo((long)concurrency);
            Assertions.assertThat((Long)((Long)metricGroup.numCachedRecordsGauge.getValue())).isEqualTo((long)concurrency);
        }
        finally {
            executor.shutdownNow();
        }
    }

    @Test
    void testBuildFromConfig() {
        Configuration config = new Configuration();
        config.set(LookupOptions.CACHE_TYPE, (Object)LookupOptions.LookupCacheType.PARTIAL);
        config.set(LookupOptions.PARTIAL_CACHE_MAX_ROWS, (Object)15213L);
        config.set(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_WRITE, (Object)Duration.ofMillis(18213L));
        config.set(LookupOptions.PARTIAL_CACHE_EXPIRE_AFTER_ACCESS, (Object)Duration.ofMillis(15513L));
        config.set(LookupOptions.PARTIAL_CACHE_CACHE_MISSING_KEY, (Object)false);
        DefaultLookupCache cache = DefaultLookupCache.fromConfig((ReadableConfig)config);
        Assertions.assertThat((Long)cache.getMaximumSize()).isEqualTo(15213L);
        Assertions.assertThat((Duration)cache.getExpireAfterWriteDuration()).isEqualTo((Object)Duration.ofMillis(18213L));
        Assertions.assertThat((Duration)cache.getExpireAfterAccessDuration()).isEqualTo((Object)Duration.ofMillis(15513L));
        Assertions.assertThat((boolean)cache.isCacheMissingKey()).isFalse();
        Configuration configWithIllegalCacheType = new Configuration();
        configWithIllegalCacheType.set(LookupOptions.CACHE_TYPE, (Object)LookupOptions.LookupCacheType.NONE);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> DefaultLookupCache.fromConfig((ReadableConfig)configWithIllegalCacheType)).isInstanceOf(IllegalArgumentException.class)).hasMessage("'lookup.cache' should be 'PARTIAL' in order to build a default lookup cache");
        Configuration configWithoutEviction = new Configuration();
        configWithoutEviction.set(LookupOptions.CACHE_TYPE, (Object)LookupOptions.LookupCacheType.PARTIAL);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> DefaultLookupCache.fromConfig((ReadableConfig)configWithoutEviction)).isInstanceOf(IllegalArgumentException.class)).hasMessage("Missing 'lookup.partial-cache.expire-after-access', 'lookup.partial-cache.expire-after-write' or 'lookup.partial-cache.max-rows' in the configuration. The cache will not have evictions under this configuration and could lead to potential memory issues as the cache size may grow indefinitely.");
    }

    @Test
    void testBuilder() {
        DefaultLookupCache cache = DefaultLookupCache.newBuilder().cacheMissingKey(true).maximumSize(15213L).expireAfterWrite(Duration.ofMillis(18213L)).expireAfterAccess(Duration.ofMillis(15513L)).build();
        Assertions.assertThat((boolean)cache.isCacheMissingKey()).isEqualTo(true);
        Assertions.assertThat((Long)cache.getMaximumSize()).isEqualTo(15213L);
        Assertions.assertThat((Duration)cache.getExpireAfterWriteDuration()).isEqualTo((Object)Duration.ofMillis(18213L));
        Assertions.assertThat((Duration)cache.getExpireAfterAccessDuration()).isEqualTo((Object)Duration.ofMillis(15513L));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> DefaultLookupCache.newBuilder().build()).isInstanceOf(IllegalArgumentException.class)).hasMessage("Expiration duration and maximum size are not set for the cache. The cache will not have any eviction and could lead to potential memory issues as the cache size may grow infinitely.");
    }

    private DefaultLookupCache createCache(DefaultLookupCache.Builder builder) throws Exception {
        return this.createCache(builder, null, null);
    }

    private DefaultLookupCache createCache(DefaultLookupCache.Builder builder, Clock clock) throws Exception {
        return this.createCache(builder, clock, null);
    }

    private DefaultLookupCache createCache(DefaultLookupCache.Builder builder, Clock clock, CacheMetricGroup metricGroup) throws Exception {
        DefaultLookupCache copiedCache = (DefaultLookupCache)CommonTestUtils.createCopySerializable((Serializable)builder.build());
        if (clock != null) {
            copiedCache.withClock(clock);
        }
        if (metricGroup == null) {
            copiedCache.open(UnregisteredMetricsGroup.createCacheMetricGroup());
        } else {
            copiedCache.open(metricGroup);
        }
        return copiedCache;
    }

    private CompletableFuture<Void> runAsync(Runnable runnable, ExecutorService executor) {
        return CompletableFuture.runAsync(ThrowingRunnable.unchecked(() -> {
            Thread.sleep(ThreadLocalRandom.current().nextLong(0L, 10L));
            runnable.run();
        }), executor);
    }
}

