/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.util.ratelimit.RateLimiterStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExternalizedCheckpointRetention;
import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.connector.datagen.source.DataGeneratorSource;
import org.apache.flink.connector.datagen.source.GeneratorFunction;
import org.apache.flink.core.execution.CheckpointType;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Either;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class MapStateNullValueCheckpointingITCase
extends TestLogger {
    @Parameterized.Parameter
    public String stateBackend;
    @Parameterized.Parameter(value=1)
    public Either<CheckpointType, SavepointFormatType> snapshotType;
    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();
    private static MiniClusterWithClientResource cluster;

    @Parameterized.Parameters(name="stateBackend : {0}, snapshotType : {1}")
    public static Collection<Object[]> data() {
        return Arrays.asList({"rocksdb", Either.Left((Object)CheckpointType.FULL)}, {"rocksdb", Either.Left((Object)CheckpointType.INCREMENTAL)}, {"rocksdb", Either.Right((Object)SavepointFormatType.CANONICAL)}, {"rocksdb", Either.Right((Object)SavepointFormatType.NATIVE)}, {"hashmap", Either.Left((Object)CheckpointType.FULL)}, {"hashmap", Either.Left((Object)CheckpointType.INCREMENTAL)}, {"hashmap", Either.Right((Object)SavepointFormatType.CANONICAL)}, {"hashmap", Either.Right((Object)SavepointFormatType.NATIVE)});
    }

    @Before
    public void before() throws Exception {
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)this.tmpFolder.newFolder().toURI().toString());
        cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(conf).setNumberTaskManagers(10).setNumberSlotsPerTaskManager(1).build());
        cluster.before();
        StatefulMapper.firstRunFuture = new CompletableFuture();
        StatefulMapper.secondRunFuture = new CompletableFuture();
    }

    @After
    public void after() {
        if (cluster != null) {
            cluster.after();
            cluster = null;
        }
    }

    @Test
    public void testMapStateWithNullValueCheckpointingAndRestore() throws Exception {
        String savepointPath = this.runJobAndTakeSnapshot();
        Assertions.assertThat((String)savepointPath).isNotEmpty();
        this.restoreAndVerify(savepointPath);
    }

    private String runJobAndTakeSnapshot() throws Exception {
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)this.tmpFolder.newFolder().toURI().toString());
        conf.set(CheckpointingOptions.EXTERNALIZED_CHECKPOINT_RETENTION, (Object)ExternalizedCheckpointRetention.RETAIN_ON_CANCELLATION);
        conf.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)this.tmpFolder.newFolder().toURI().toString());
        conf.set(StateBackendOptions.STATE_BACKEND, (Object)this.stateBackend);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)conf);
        env.setParallelism(1);
        env.fromSource(MapStateNullValueCheckpointingITCase.createSource(), WatermarkStrategy.noWatermarks(), "Data Generator Source").keyBy((KeySelector & Serializable)v -> 0).map((MapFunction)new StatefulMapper(true)).sinkTo((Sink)new DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        MiniCluster miniCluster = cluster.getMiniCluster();
        miniCluster.submitJob(jobGraph).get();
        JobID jobID = jobGraph.getJobID();
        StatefulMapper.firstRunFuture.get(2L, TimeUnit.MINUTES);
        if (this.snapshotType.isLeft()) {
            cluster.getClusterClient().triggerCheckpoint(jobID, (CheckpointType)this.snapshotType.left()).get(2L, TimeUnit.MINUTES);
            String checkpointPath = (String)CommonTestUtils.getLatestCompletedCheckpointPath((JobID)jobID, (MiniCluster)miniCluster).orElseThrow(() -> {
                throw new NoSuchElementException("No checkpoint was created yet");
            });
            cluster.getClusterClient().cancel(jobID);
            return checkpointPath;
        }
        return (String)cluster.getClusterClient().stopWithSavepoint(jobID, false, null, (SavepointFormatType)this.snapshotType.right()).get(2L, TimeUnit.MINUTES);
    }

    private static DataGeneratorSource<Long> createSource() {
        return new DataGeneratorSource((GeneratorFunction & Serializable)value -> value, Long.MAX_VALUE, RateLimiterStrategy.perSecond((double)5.0), (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO);
    }

    private void restoreAndVerify(String savepointPath) throws Exception {
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)this.tmpFolder.newFolder().toURI().toString());
        conf.set(CheckpointingOptions.SAVEPOINT_DIRECTORY, (Object)this.tmpFolder.newFolder().toURI().toString());
        conf.set(StateBackendOptions.STATE_BACKEND, (Object)this.stateBackend);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)conf);
        env.setParallelism(1);
        env.fromSource(MapStateNullValueCheckpointingITCase.createSource(), WatermarkStrategy.noWatermarks(), "Data Generator Source").keyBy((KeySelector & Serializable)v -> 0).map((MapFunction)new StatefulMapper(false)).sinkTo((Sink)new DiscardingSink());
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath((String)savepointPath));
        MiniCluster miniCluster = cluster.getMiniCluster();
        miniCluster.submitJob(jobGraph).get();
        Map<String, String> restoredState = StatefulMapper.secondRunFuture.get(2L, TimeUnit.MINUTES);
        Assertions.assertThat((String)restoredState.get("key")).isEqualTo("value");
        Assertions.assertThat((String)restoredState.get("null-key")).isNull();
        Assertions.assertThat((boolean)restoredState.containsKey("null-key")).isTrue();
    }

    private static class StatefulMapper
    extends RichMapFunction<Long, Long> {
        static CompletableFuture<Void> firstRunFuture;
        static CompletableFuture<Map<String, String>> secondRunFuture;
        private final boolean isFirstRun;
        private boolean hasPopulated;
        private transient MapState<String, String> mapState;

        StatefulMapper(boolean isFirstRun) {
            this.isFirstRun = isFirstRun;
        }

        public void open(OpenContext context) {
            MapStateDescriptor mapStateDescriptor = new MapStateDescriptor("map-state", (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO, (TypeInformation)BasicTypeInfo.STRING_TYPE_INFO);
            this.mapState = this.getRuntimeContext().getMapState(mapStateDescriptor);
            ValueStateDescriptor hasPopulatedStateDescriptor = new ValueStateDescriptor("has-populated", (TypeInformation)BasicTypeInfo.BOOLEAN_TYPE_INFO);
            this.hasPopulated = false;
        }

        public Long map(Long value) throws Exception {
            if (this.hasPopulated) {
                return value;
            }
            if (this.isFirstRun) {
                this.mapState.put((Object)"key", (Object)"value");
                this.mapState.put((Object)"null-key", null);
                firstRunFuture.complete(null);
            } else {
                HashMap<String, Object> restoredState = new HashMap<String, Object>();
                restoredState.put("key", this.mapState.get((Object)"key"));
                restoredState.put("null-key", this.mapState.get((Object)"null-key"));
                secondRunFuture.complete(restoredState);
            }
            this.hasPopulated = true;
            return value;
        }
    }
}

