/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing;

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.mocks.MockSource;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.Collector;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class ManualCheckpointITCase
extends AbstractTestBase {
    @Parameterized.Parameter
    public StorageSupplier storageSupplier;
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Parameterized.Parameters
    public static StorageSupplier[] parameters() throws IOException {
        return new StorageSupplier[]{path -> new JobManagerCheckpointStorage(), FileSystemCheckpointStorage::new};
    }

    @Test
    public void testTriggeringWhenPeriodicDisabled() throws Exception {
        int parallelism = MINI_CLUSTER_RESOURCE.getNumberSlots();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.getCheckpointConfig().setCheckpointStorage((CheckpointStorage)this.storageSupplier.apply(this.temporaryFolder.newFolder().toURI().toString()));
        env.fromSource((Source)MockSource.continuous((int)parallelism).build(), WatermarkStrategy.noWatermarks(), "generator").keyBy((KeySelector & Serializable)key -> key % parallelism).flatMap((FlatMapFunction)new StatefulMapper()).sinkTo((Sink)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        JobID jobID = jobClient.getJobID();
        MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        CommonTestUtils.waitForJobStatus((JobClient)jobClient, Collections.singletonList(JobStatus.RUNNING));
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)miniCluster, (JobID)jobID, (boolean)false);
        miniCluster.triggerCheckpoint(jobID).get();
        miniCluster.cancelJob(jobID).get();
        this.queryCompletedCheckpointsUntil(miniCluster, jobID, count -> count == 1L);
    }

    @Test
    public void testTriggeringWhenPeriodicEnabled() throws Exception {
        int parallelism = MINI_CLUSTER_RESOURCE.getNumberSlots();
        int checkpointingInterval = 500;
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(parallelism);
        env.enableCheckpointing(500L);
        env.getCheckpointConfig().setCheckpointStorage((CheckpointStorage)this.storageSupplier.apply(this.temporaryFolder.newFolder().toURI().toString()));
        env.fromSource((Source)MockSource.continuous((int)parallelism).build(), WatermarkStrategy.noWatermarks(), "generator").keyBy((KeySelector & Serializable)key -> key % parallelism).flatMap((FlatMapFunction)new StatefulMapper()).sinkTo((Sink)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        JobID jobID = jobClient.getJobID();
        MiniCluster miniCluster = MINI_CLUSTER_RESOURCE.getMiniCluster();
        CommonTestUtils.waitForJobStatus((JobClient)jobClient, Collections.singletonList(JobStatus.RUNNING));
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)miniCluster, (JobID)jobID, (boolean)false);
        CommonTestUtils.waitUntilCondition(() -> this.queryCompletedCheckpoints(miniCluster, jobID) > 0L, (int)250);
        long numberOfPeriodicCheckpoints = this.queryCompletedCheckpoints(miniCluster, jobID);
        miniCluster.triggerCheckpoint(jobID).get();
        miniCluster.cancelJob(jobID).get();
        this.queryCompletedCheckpointsUntil(miniCluster, jobID, count -> count >= numberOfPeriodicCheckpoints + 1L);
    }

    private void queryCompletedCheckpointsUntil(MiniCluster miniCluster, JobID jobID, Predicate<Long> predicate) throws Exception {
        long counts;
        while (!predicate.test(counts = this.queryCompletedCheckpoints(miniCluster, jobID))) {
        }
    }

    private long queryCompletedCheckpoints(MiniCluster miniCluster, JobID jobID) throws InterruptedException, ExecutionException {
        return ((ArchivedExecutionGraph)miniCluster.getArchivedExecutionGraph(jobID).get()).getCheckpointStatsSnapshot().getCounts().getNumberOfCompletedCheckpoints();
    }

    private static final class StatefulMapper
    extends RichFlatMapFunction<Integer, Long> {
        private ValueState<Long> count;

        private StatefulMapper() {
        }

        public void open(OpenContext openContext) throws Exception {
            this.count = this.getRuntimeContext().getState(new ValueStateDescriptor("count", (TypeInformation)BasicTypeInfo.LONG_TYPE_INFO));
        }

        public void flatMap(Integer value, Collector<Long> out) throws Exception {
            long sum = (Long)Optional.ofNullable(this.count.value()).orElse(0L) + (long)value.intValue();
            this.count.update((Object)sum);
            out.collect((Object)sum);
        }
    }

    private static interface StorageSupplier
    extends Function<String, CheckpointStorage> {
    }
}

