/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filemerging.SegmentFileStateHandle;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.checkpointing.ResumeCheckpointManuallyITCase;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.TernaryBoolean;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

public class SnapshotFileMergingCompatibilityITCase
extends TestLogger {
    private static final long DELETE_TIMEOUT_MILLS = 60000L;

    public static Collection<Object[]> parameters() {
        return Arrays.asList({RestoreMode.CLAIM, true}, {RestoreMode.CLAIM, false}, {RestoreMode.NO_CLAIM, true}, {RestoreMode.NO_CLAIM, false});
    }

    @ParameterizedTest(name="RestoreMode = {0}, fileMergingAcrossBoundary = {1}")
    @MethodSource(value={"parameters"})
    public void testSwitchFromDisablingToEnablingFileMerging(RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir java.nio.file.Path checkpointDir) throws Exception {
        this.testSwitchingFileMerging(checkpointDir, false, true, restoreMode, fileMergingAcrossBoundary);
    }

    @ParameterizedTest(name="RestoreMode = {0}, fileMergingAcrossBoundary = {1}")
    @MethodSource(value={"parameters"})
    public void testSwitchFromEnablingToDisablingFileMerging(RestoreMode restoreMode, boolean fileMergingAcrossBoundary, @TempDir java.nio.file.Path checkpointDir) throws Exception {
        this.testSwitchingFileMerging(checkpointDir, true, false, restoreMode, fileMergingAcrossBoundary);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testSwitchingFileMerging(java.nio.file.Path checkpointDir, boolean firstFileMergingSwitch, boolean secondFileMergingSwitch, RestoreMode restoreMode, boolean fileMergingAcrossBoundary) throws Exception {
        String thirdCheckpoint;
        String secondCheckpoint;
        String firstCheckpoint;
        Configuration config = new Configuration();
        int consecutiveCheckpoint = 4;
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointDir.toUri().toString());
        config.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, (Object)true);
        config.set(CheckpointingOptions.FILE_MERGING_ACROSS_BOUNDARY, (Object)fileMergingAcrossBoundary);
        config.set(CheckpointingOptions.FILE_MERGING_ENABLED, (Object)firstFileMergingSwitch);
        MiniClusterWithClientResource firstCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
        EmbeddedRocksDBStateBackend stateBackend1 = new EmbeddedRocksDBStateBackend();
        stateBackend1.configure((ReadableConfig)config, Thread.currentThread().getContextClassLoader());
        firstCluster.before();
        try {
            firstCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint((StateBackend)stateBackend1, null, firstCluster, restoreMode, config, 4, true);
            Assertions.assertThat((String)firstCheckpoint).isNotNull();
            this.verifyStateHandleType(firstCheckpoint, firstFileMergingSwitch);
        }
        finally {
            firstCluster.after();
        }
        config.set(CheckpointingOptions.FILE_MERGING_ENABLED, (Object)secondFileMergingSwitch);
        EmbeddedRocksDBStateBackend stateBackend2 = new EmbeddedRocksDBStateBackend();
        stateBackend2.configure((ReadableConfig)config, Thread.currentThread().getContextClassLoader());
        MiniClusterWithClientResource secondCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());
        secondCluster.before();
        try {
            secondCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint((StateBackend)stateBackend2, firstCheckpoint, secondCluster, restoreMode, config, 4, true);
            Assertions.assertThat((String)secondCheckpoint).isNotNull();
            this.verifyStateHandleType(secondCheckpoint, secondFileMergingSwitch);
            SnapshotFileMergingCompatibilityITCase.verifyCheckpointExistOrWaitDeleted(firstCheckpoint, SnapshotFileMergingCompatibilityITCase.determineFileExist(restoreMode, firstFileMergingSwitch, secondFileMergingSwitch), firstFileMergingSwitch);
        }
        finally {
            secondCluster.after();
        }
        EmbeddedRocksDBStateBackend stateBackend3 = new EmbeddedRocksDBStateBackend();
        stateBackend3.configure((ReadableConfig)config, Thread.currentThread().getContextClassLoader());
        MiniClusterWithClientResource thirdCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(3).setNumberSlotsPerTaskManager(2).build());
        thirdCluster.before();
        try {
            thirdCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint((StateBackend)stateBackend3, secondCheckpoint, thirdCluster, restoreMode, config, 4, true);
            Assertions.assertThat((String)thirdCheckpoint).isNotNull();
            this.verifyStateHandleType(thirdCheckpoint, secondFileMergingSwitch);
            SnapshotFileMergingCompatibilityITCase.verifyCheckpointExistOrWaitDeleted(secondCheckpoint, SnapshotFileMergingCompatibilityITCase.determineFileExist(restoreMode, secondFileMergingSwitch, secondFileMergingSwitch), secondFileMergingSwitch);
        }
        finally {
            thirdCluster.after();
        }
        EmbeddedRocksDBStateBackend stateBackend4 = new EmbeddedRocksDBStateBackend();
        stateBackend4.configure((ReadableConfig)config, Thread.currentThread().getContextClassLoader());
        MiniClusterWithClientResource fourthCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(config).setNumberTaskManagers(3).setNumberSlotsPerTaskManager(2).build());
        fourthCluster.before();
        try {
            String fourthCheckpoint = ResumeCheckpointManuallyITCase.runJobAndGetExternalizedCheckpoint((StateBackend)stateBackend4, thirdCheckpoint, fourthCluster, restoreMode, config, 4, false);
            Assertions.assertThat((String)fourthCheckpoint).isNotNull();
            SnapshotFileMergingCompatibilityITCase.verifyCheckpointExistOrWaitDeleted(thirdCheckpoint, SnapshotFileMergingCompatibilityITCase.determineFileExist(restoreMode, secondFileMergingSwitch, secondFileMergingSwitch), secondFileMergingSwitch);
            SnapshotFileMergingCompatibilityITCase.verifyCheckpointExistOrWaitDeleted(fourthCheckpoint, TernaryBoolean.FALSE, secondFileMergingSwitch);
        }
        finally {
            fourthCluster.after();
        }
    }

    private void verifyStateHandleType(String checkpointPath, boolean fileMergingEnabled) throws IOException {
        CheckpointMetadata metadata = TestUtils.loadCheckpointMetadata((String)checkpointPath);
        boolean hasKeyedState = false;
        for (OperatorState operatorState : metadata.getOperatorStates()) {
            for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
                ArrayList keyedStateHandles = new ArrayList(subtaskState.getManagedKeyedState());
                for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
                    Assertions.assertThat((Object)keyedStateHandle).isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
                    ((IncrementalRemoteKeyedStateHandle)keyedStateHandle).streamSubHandles().forEach(handle -> {
                        if (fileMergingEnabled) {
                            Assertions.assertThat((Object)handle).isInstanceOf(SegmentFileStateHandle.class);
                        } else {
                            Assertions.assertThat((Object)handle).isNotInstanceOf(SegmentFileStateHandle.class);
                        }
                    });
                    hasKeyedState = true;
                }
            }
        }
        Assertions.assertThat((boolean)hasKeyedState).isTrue();
    }

    private static TernaryBoolean determineFileExist(RestoreMode mode, boolean lastFileMergingEnabled, boolean thisFileMergingEnabled) {
        if (mode == RestoreMode.CLAIM) {
            if (lastFileMergingEnabled || thisFileMergingEnabled) {
                return TernaryBoolean.FALSE;
            }
            return TernaryBoolean.UNDEFINED;
        }
        return TernaryBoolean.TRUE;
    }

    private static void verifyCheckpointExistOrWaitDeleted(String checkpointPath, TernaryBoolean exist, boolean fileMergingEnabled) throws Exception {
        Path checkpointDir = new Path(checkpointPath);
        FileSystem fs = checkpointDir.getFileSystem();
        Path baseDir = checkpointDir.getParent();
        Path sharedFile = new Path(baseDir, "shared");
        Path taskOwnedFile = new Path(baseDir, "taskowned");
        Assertions.assertThat((boolean)fs.exists(baseDir)).isTrue();
        Assertions.assertThat((boolean)fs.exists(sharedFile)).isTrue();
        Assertions.assertThat((boolean)fs.exists(taskOwnedFile)).isTrue();
        if (exist.equals((Object)TernaryBoolean.TRUE)) {
            Assertions.assertThat((boolean)fs.exists(checkpointDir)).isTrue();
            Assertions.assertThat((fs.listStatus(sharedFile) != null && fs.listStatus(sharedFile).length > 0 ? 1 : 0) != 0).isTrue();
            Assertions.assertThat((fs.listStatus(taskOwnedFile) != null && fs.listStatus(taskOwnedFile).length > 0 ? 1 : 0) != 0).isEqualTo(fileMergingEnabled);
        } else if (exist.equals((Object)TernaryBoolean.FALSE)) {
            long waited = 0L;
            boolean fileExist = true;
            while (fileExist) {
                try {
                    fileExist = fs.exists(checkpointDir) || fs.listStatus(sharedFile) != null && fs.listStatus(sharedFile).length > 0 || fs.listStatus(taskOwnedFile) != null && fs.listStatus(taskOwnedFile).length > 0;
                }
                catch (IOException iOException) {
                    // empty catch block
                }
                if (!fileExist) continue;
                Thread.sleep(500L);
                Assertions.assertThat((long)(waited += 500L)).isLessThan(60000L);
            }
        }
    }
}

