/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.Serializable;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.StateChangelogOptions;
import org.apache.flink.core.execution.RestoreMode;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SavepointKeyedStateHandle;
import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;

public class SavepointFormatITCase
extends TestLogger {
    /** Logger used to report a failure before the test cluster is shut down. */
    private static final Logger LOG = LoggerFactory.getLogger(SavepointFormatITCase.class);
    /** Name reported by the RocksDB-backed state backend configuration. */
    private static final String STATE_BACKEND_ROCKSDB = "ROCKSDB";
    /** Name reported by the heap-backed state backend configuration. */
    private static final String STATE_BACKEND_HEAP = "HEAP";
    /** Temporary directory configured as the checkpoints directory of the test cluster. */
    @TempDir
    java.nio.file.Path checkpointsDir;
    /** Temporary directory passed as the target directory when stopping the job with a savepoint. */
    @TempDir
    java.nio.file.Path originalSavepointDir;
    /** Temporary directory the savepoint is renamed to before the job is restored from it. */
    @TempDir
    java.nio.file.Path renamedSavepointDir;
    /** Logger auditing extension registered for this class at INFO level. */
    @RegisterExtension
    LoggerAuditingExtension loggerAuditingExtension = new LoggerAuditingExtension(SavepointFormatITCase.class, Level.INFO);

    /**
     * Builds the argument sets for the parameterized test: every combination of state backend
     * builder, incremental flag, changelog flag and {@link SavepointFormatType}.
     *
     * @return one {@link Arguments} per combination, holding the savepoint format type followed by
     *     the state backend configuration
     */
    private static List<Arguments> parameters() {
        List<Arguments> result = new LinkedList<>();
        for (BiFunction<Boolean, Boolean, StateBackendConfig> builder : StateBackendConfig.builders) {
            for (boolean incremental : new boolean[] {true, false}) {
                for (boolean changelog : new boolean[] {true, false}) {
                    for (SavepointFormatType formatType : SavepointFormatType.values()) {
                        // Builders take (incremental, changelogEnabled), in that order.
                        result.add(Arguments.of(formatType, builder.apply(incremental, changelog)));
                    }
                }
            }
        }
        return result;
    }

    /**
     * Asserts that a keyed state handle read from the savepoint metadata has the handle type
     * expected for the given savepoint format and state backend configuration.
     *
     * <ul>
     *   <li>{@link SavepointFormatType#CANONICAL}: a {@link SavepointKeyedStateHandle}.
     *   <li>Any other format with changelog enabled: a {@link ChangelogStateBackendHandle} whose
     *       materialized handles each pass the non-changelog check.
     *   <li>Any other format without changelog: the handle itself passes the non-changelog check.
     * </ul>
     *
     * @param state the keyed state handle to check
     * @param formatType the format the savepoint was taken with
     * @param backendConfig the state backend configuration the job ran with
     */
    private void validateState(KeyedStateHandle state, SavepointFormatType formatType, StateBackendConfig backendConfig) {
        if (formatType == SavepointFormatType.CANONICAL) {
            MatcherAssert.assertThat(state, CoreMatchers.instanceOf(SavepointKeyedStateHandle.class));
            return;
        }
        if (!backendConfig.isChangelogEnabled()) {
            validateNativeNonChangelogState(state, backendConfig);
            return;
        }
        MatcherAssert.assertThat(state, CoreMatchers.instanceOf(ChangelogStateBackendHandle.class));
        for (KeyedStateHandle nestedState : ((ChangelogStateBackendHandle) state).getMaterializedStateHandles()) {
            validateNativeNonChangelogState(nestedState, backendConfig);
        }
    }

    /**
     * Asserts the handle type of a non-canonical, non-changelog keyed state handle.
     *
     * <p>For the RocksDB configuration an {@link IncrementalRemoteKeyedStateHandle} is expected
     * (the configuration's incremental flag is not consulted here); for any other configuration a
     * {@link KeyGroupsStateHandle} is expected.
     *
     * @param state the keyed state handle to check
     * @param backendConfig the state backend configuration the job ran with
     */
    private void validateNativeNonChangelogState(KeyedStateHandle state, StateBackendConfig backendConfig) {
        Class<?> expectedHandleType =
                STATE_BACKEND_ROCKSDB.equals(backendConfig.getName())
                        ? IncrementalRemoteKeyedStateHandle.class
                        : KeyGroupsStateHandle.class;
        MatcherAssert.assertThat(state, CoreMatchers.instanceOf(expectedHandleType));
    }

    /**
     * Creates the heap state backend configuration (name {@code "HEAP"}, backend config name
     * {@code "filesystem"}).
     *
     * @param incremental value stored as the incremental-checkpoints setting; note that {@code
     *     isIncremental()} of the returned configuration always reports {@code false}
     * @param changelogEnabled whether the state changelog is enabled
     * @return the heap state backend configuration
     */
    private static StateBackendConfig heap(boolean incremental, boolean changelogEnabled) {
        return new StateBackendConfig(changelogEnabled, incremental) {

            @Override
            public String getName() {
                return STATE_BACKEND_HEAP;
            }

            @Override
            public Configuration getConfiguration() {
                Configuration stateBackendConfig = super.getConfiguration();
                // Zero threshold: presumably forces state into separate files instead of being
                // inlined into the metadata - TODO confirm against CheckpointingOptions docs.
                stateBackendConfig.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
                return stateBackendConfig;
            }

            @Override
            protected String getConfigName() {
                return "filesystem";
            }

            @Override
            public boolean isIncremental() {
                return false;
            }
        };
    }

    /**
     * Creates the RocksDB state backend configuration (name {@code "ROCKSDB"}, backend config name
     * {@code "rocksdb"}). One checkpoint is requested before the savepoint is taken.
     *
     * @param incremental whether incremental checkpoints are enabled
     * @param changelogEnabled whether the state changelog is enabled
     * @return the RocksDB state backend configuration
     */
    private static StateBackendConfig getRocksdb(boolean incremental, boolean changelogEnabled) {
        return new StateBackendConfig(changelogEnabled, incremental) {

            @Override
            public String getName() {
                return STATE_BACKEND_ROCKSDB;
            }

            @Override
            public int getCheckpointsBeforeSavepoint() {
                return 1;
            }

            @Override
            public boolean isIncremental() {
                return incremental;
            }

            @Override
            public Configuration getConfiguration() {
                Configuration stateBackendConfig = super.getConfiguration();
                // Zero threshold: presumably forces state into separate files instead of being
                // inlined into the metadata - TODO confirm against CheckpointingOptions docs.
                stateBackendConfig.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
                return stateBackendConfig;
            }

            @Override
            protected String getConfigName() {
                return "rocksdb";
            }
        };
    }

    /**
     * Runs a stateful job on a fresh mini cluster, stops it with a savepoint in the given format,
     * checks the type of every managed keyed state handle recorded in the savepoint metadata, then
     * renames the savepoint directory and restores the job from the new location.
     *
     * @param formatType the savepoint format to take the savepoint with
     * @param stateBackendConfig the state backend configuration to run the job with
     * @throws Exception if the cluster, the job or any verification step fails
     */
    @ParameterizedTest(name = "[{index}] {0}, {1}")
    @MethodSource("parameters")
    public void testTriggerSavepointAndResumeWithFileBasedCheckpointsAndRelocateBasePath(SavepointFormatType formatType, StateBackendConfig stateBackendConfig) throws Exception {
        final int numTaskManagers = 2;
        final int numSlotsPerTaskManager = 2;

        final Configuration config = stateBackendConfig.getConfiguration();
        config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointsDir.toUri().toString());

        final MiniClusterWithClientResource miniClusterResource =
                new MiniClusterWithClientResource(
                        new MiniClusterResourceConfiguration.Builder()
                                .setConfiguration(config)
                                .setNumberTaskManagers(numTaskManagers)
                                .setNumberSlotsPerTaskManager(numSlotsPerTaskManager)
                                .build());
        miniClusterResource.before();
        try {
            final String savepointPath =
                    submitJobAndTakeSavepoint(
                            miniClusterResource,
                            formatType,
                            stateBackendConfig.getCheckpointsBeforeSavepoint(),
                            config);

            final CheckpointMetadata metadata = TestUtils.loadCheckpointMetadata(savepointPath);
            // Fail with a descriptive message instead of a bare NoSuchElementException when the
            // savepoint unexpectedly contains no keyed state at all.
            final OperatorState operatorState =
                    metadata.getOperatorStates().stream()
                            .filter(hasKeyedState())
                            .findFirst()
                            .orElseThrow(
                                    () ->
                                            new AssertionError(
                                                    "No operator with keyed state found in savepoint "
                                                            + savepointPath));
            operatorState.getStates().stream()
                    .flatMap(subtaskState -> subtaskState.getManagedKeyedState().stream())
                    .forEach(handle -> validateState(handle, formatType, stateBackendConfig));

            relocateAndVerify(miniClusterResource, savepointPath, renamedSavepointDir, config);
        } catch (Throwable t) {
            // Logged before the finally block tears the cluster down.
            LOG.info("Throwable caught, cluster will be shut down", t);
            throw t;
        } finally {
            miniClusterResource.after();
        }
    }

    /**
     * Returns a predicate that accepts an operator state when it has subtask states and the first
     * subtask state returned by {@code getStates()} reports managed keyed state. Only that first
     * subtask state is inspected.
     *
     * @return the predicate described above
     */
    @NotNull
    private Predicate<OperatorState> hasKeyedState() {
        return operatorState -> {
            if (!operatorState.hasSubtaskStates()) {
                return false;
            }
            return operatorState.getStates().stream()
                    .findFirst()
                    .map(firstSubtask -> firstSubtask.getManagedKeyedState().hasState())
                    .orElse(false);
        };
    }

    /**
     * Renames the savepoint directory to {@code renamedSavepointDir}, submits a new job restoring
     * from the renamed location in {@link RestoreMode#CLAIM} mode, and waits until all of its
     * tasks are running.
     *
     * @param cluster the running test cluster
     * @param savepointPath path of the savepoint to relocate
     * @param renamedSavepointDir directory the savepoint is renamed to
     * @param config configuration used to build the job graph
     * @throws IllegalStateException if the file system reports that the rename did not succeed
     * @throws Exception if submitting the job or waiting for its tasks fails
     */
    private void relocateAndVerify(MiniClusterWithClientResource cluster, String savepointPath, java.nio.file.Path renamedSavepointDir, Configuration config) throws Exception {
        final String renamedSavepointUri = renamedSavepointDir.toUri().toString();
        final Path oldPath = new Path(savepointPath);
        final Path newPath = new Path(renamedSavepointUri);

        // rename() signals failure through its return value; surface it here rather than as a
        // harder-to-diagnose restore failure further down.
        if (!oldPath.getFileSystem().rename(oldPath, newPath)) {
            throw new IllegalStateException(
                    "Failed to rename savepoint " + oldPath + " to " + newPath);
        }

        final JobGraph jobGraph = createJobGraph(config);
        jobGraph.setSavepointRestoreSettings(
                SavepointRestoreSettings.forPath(renamedSavepointUri, false, RestoreMode.CLAIM));

        final JobID jobId = jobGraph.getJobID();
        final ClusterClient<?> client = cluster.getClusterClient();
        client.submitJob(jobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);
    }

    /**
     * Submits the test job, waits until all of its tasks are running, triggers the requested
     * number of checkpoints and then stops the job with a savepoint written to {@code
     * originalSavepointDir}.
     *
     * @param cluster the running test cluster
     * @param formatType the savepoint format to use
     * @param checkpointBeforeSavepoint number of checkpoints to trigger (each awaited) before the
     *     savepoint is taken
     * @param config configuration used to build the job graph
     * @return the savepoint path reported by the cluster client
     * @throws Exception if submitting, checkpointing or stopping the job fails
     */
    private String submitJobAndTakeSavepoint(MiniClusterWithClientResource cluster, SavepointFormatType formatType, int checkpointBeforeSavepoint, Configuration config) throws Exception {
        final JobGraph jobGraph = createJobGraph(config);
        final JobID jobId = jobGraph.getJobID();

        final ClusterClient<?> client = cluster.getClusterClient();
        client.submitJob(jobGraph).get();
        CommonTestUtils.waitForAllTaskRunning(cluster.getMiniCluster(), jobId, false);

        for (int i = 0; i < checkpointBeforeSavepoint; i++) {
            cluster.getMiniCluster().triggerCheckpoint(jobId).get();
        }

        return client.stopWithSavepoint(jobId, false, originalSavepointDir.toUri().toString(), formatType).get();
    }

    /**
     * Builds the streaming job used by the test: a sequence source over the full {@code long}
     * range, keyed by {@code value % 1000}, mapped through {@link StatefulCounter} and written to
     * a discarding sink. Parallelism is 4 and operator chaining is disabled.
     *
     * @param config configuration used to create the execution environment
     * @return the job graph of the described pipeline
     */
    private static JobGraph createJobGraph(Configuration config) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        env.setParallelism(4);
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        env.disableOperatorChaining();

        env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE)
                .keyBy(i -> i % 1000)
                .map(new StatefulCounter())
                .sinkTo(new DiscardingSink<>());

        return env.getStreamGraph().getJobGraph();
    }

    private static final class StatefulCounter
    extends RichMapFunction<Long, Long> {
        private ValueState<Long> counter;

        private StatefulCounter() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.counter = this.getRuntimeContext().getState(new ValueStateDescriptor("counter", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO));
        }

        public Long map(Long value) throws Exception {
            this.counter.update((Object)((Long)Optional.ofNullable(this.counter.value()).orElse(0L) + value));
            return (Long)this.counter.value();
        }
    }

    /**
     * Describes one state backend setup under test: which backend to configure, whether
     * incremental checkpoints and the state changelog are enabled, and how many checkpoints to
     * take before the savepoint.
     */
    private abstract static class StateBackendConfig {
        /** Value written to the state-changelog option of the generated configuration. */
        protected final boolean changelogEnabled;
        /** Value written to the incremental-checkpoints option of the generated configuration. */
        protected final boolean incremental;

        /**
         * Factories for all backend configurations under test; each takes {@code (incremental,
         * changelogEnabled)}.
         *
         * <p>NOTE(review): the list order was reconstructed from compiler-generated accessor
         * calls; it only influences the order in which test parameters are enumerated.
         */
        private static final List<BiFunction<Boolean, Boolean, StateBackendConfig>> builders =
                Arrays.asList(SavepointFormatITCase::getRocksdb, SavepointFormatITCase::heap);

        /**
         * @param changelogEnabled whether the state changelog is enabled
         * @param incremental whether incremental checkpoints are enabled
         */
        protected StateBackendConfig(boolean changelogEnabled, boolean incremental) {
            this.changelogEnabled = changelogEnabled;
            this.incremental = incremental;
        }

        /** Returns the display name of the backend, used in {@link #toString()}. */
        public abstract String getName();

        /**
         * Creates a fresh configuration carrying the state backend name, the incremental
         * checkpoints flag and the state changelog flag.
         *
         * @return a new configuration instance on every call
         */
        public Configuration getConfiguration() {
            Configuration stateBackendConfig = new Configuration();
            stateBackendConfig.set(StateBackendOptions.STATE_BACKEND, getConfigName());
            stateBackendConfig.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);
            stateBackendConfig.set(StateChangelogOptions.ENABLE_STATE_CHANGE_LOG, changelogEnabled);
            return stateBackendConfig;
        }

        /** Returns the number of checkpoints to trigger before the savepoint; 0 by default. */
        public int getCheckpointsBeforeSavepoint() {
            return 0;
        }

        /** Returns the value to set for the state backend option. */
        protected abstract String getConfigName();

        /** Renders the backend name and both flags; shown in the parameterized test name. */
        @Override
        public final String toString() {
            return String.format("%s, incremental: %b, changelog: %b", getName(), incremental, changelogEnabled);
        }

        /** Returns whether this backend setup is considered incremental. */
        public abstract boolean isIncremental();

        /** Returns whether the state changelog is enabled for this setup. */
        private boolean isChangelogEnabled() {
            return changelogEnabled;
        }
    }
}

