/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;

import java.io.File;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.BiConsumer;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile;
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteStorageScanner;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class RemoteStorageScannerTest {
    private static final TieredStoragePartitionId DEFAULT_PARTITION_ID = TieredStorageIdMappingUtils.convertId((ResultPartitionID)new ResultPartitionID());
    private static final TieredStorageSubpartitionId DEFAULT_SUBPARTITION_ID = new TieredStorageSubpartitionId(0);
    @TempDir
    private File tempFolder;
    private String remoteStoragePath;

    RemoteStorageScannerTest() {
    }

    @BeforeEach
    void before() {
        this.remoteStoragePath = Path.fromLocalFile((File)this.tempFolder).getPath();
    }

    @Test
    void testWatchSegment() throws IOException {
        CompletableFuture future = new CompletableFuture();
        BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> notifier = (partitionId, subpartitionId) -> future.complete(null);
        this.createSegmentFile(2);
        this.createSegmentFinishFile(2);
        RemoteStorageScanner remoteStorageScanner = new RemoteStorageScanner(this.remoteStoragePath);
        remoteStorageScanner.registerAvailabilityAndPriorityNotifier(notifier);
        remoteStorageScanner.watchSegment(DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 2);
        remoteStorageScanner.run();
        Assertions.assertThat(future).isNotDone();
        remoteStorageScanner.run();
        Assertions.assertThat(future).isDone();
    }

    @Test
    void testWatchSegmentIgnored() throws IOException {
        CompletableFuture future = new CompletableFuture();
        BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> notifier = (partitionId, subpartitionId) -> future.complete(null);
        this.createSegmentFile(2);
        this.createSegmentFile(3);
        this.createSegmentFinishFile(3);
        RemoteStorageScanner remoteStorageScanner = new RemoteStorageScanner(this.remoteStoragePath);
        remoteStorageScanner.registerAvailabilityAndPriorityNotifier(notifier);
        remoteStorageScanner.run();
        remoteStorageScanner.watchSegment(DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 0);
        remoteStorageScanner.run();
        Assertions.assertThat(future).isNotDone();
        remoteStorageScanner.watchSegment(DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 1);
        remoteStorageScanner.run();
        Assertions.assertThat(future).isNotDone();
        remoteStorageScanner.watchSegment(DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 2);
        remoteStorageScanner.run();
        Assertions.assertThat(future).isNotDone();
    }

    @Test
    void testStartAndClose() throws IOException, ExecutionException, InterruptedException {
        CompletableFuture future = new CompletableFuture();
        BiConsumer<TieredStoragePartitionId, TieredStorageSubpartitionId> notifier = (partitionId, subpartitionId) -> future.complete(null);
        this.createSegmentFile(0);
        this.createSegmentFinishFile(0);
        RemoteStorageScanner remoteStorageScanner = new RemoteStorageScanner(this.remoteStoragePath);
        remoteStorageScanner.registerAvailabilityAndPriorityNotifier(notifier);
        remoteStorageScanner.watchSegment(DEFAULT_PARTITION_ID, DEFAULT_SUBPARTITION_ID, 0);
        remoteStorageScanner.start();
        future.get();
    }

    @Test
    void testScanStrategy() {
        int maxScanIntervalMs = 10000;
        RemoteStorageScanner.ScanStrategy scanStrategy = new RemoteStorageScanner.ScanStrategy(maxScanIntervalMs);
        int currentScanIntervalMs = 100;
        Assertions.assertThat((int)scanStrategy.getInterval(currentScanIntervalMs)).isEqualTo(currentScanIntervalMs * 2);
        currentScanIntervalMs = 6000;
        Assertions.assertThat((int)scanStrategy.getInterval(currentScanIntervalMs)).isEqualTo(maxScanIntervalMs);
        currentScanIntervalMs = 12000;
        Assertions.assertThat((int)scanStrategy.getInterval(currentScanIntervalMs)).isEqualTo(maxScanIntervalMs);
    }

    private void createSegmentFile(int segmentId) throws IOException {
        Path segmentPath = SegmentPartitionFile.getSegmentPath((String)this.remoteStoragePath, (TieredStoragePartitionId)DEFAULT_PARTITION_ID, (int)DEFAULT_SUBPARTITION_ID.getSubpartitionId(), (long)segmentId);
        FSDataOutputStream outputStream = segmentPath.getFileSystem().create(segmentPath, FileSystem.WriteMode.OVERWRITE);
        outputStream.close();
    }

    private void createSegmentFinishFile(int segmentId) throws IOException {
        SegmentPartitionFile.writeSegmentFinishFile((String)this.remoteStoragePath, (TieredStoragePartitionId)DEFAULT_PARTITION_ID, (int)DEFAULT_SUBPARTITION_ID.getSubpartitionId(), (int)segmentId);
    }
}

