/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Comparator;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class CoordinatedSourceRescaleITCase
extends TestLogger {
    public static final String CREATED_CHECKPOINT = "successfully created checkpoint";
    public static final String RESTORED_CHECKPOINT = "successfully restored checkpoint";
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(7).build());
    @Rule
    public final TemporaryFolder temp = new TemporaryFolder();

    @Test
    public void testDownscaling() throws Exception {
        File checkpointDir = this.temp.newFolder();
        File lastCheckpoint = this.generateCheckpoint(checkpointDir, 7);
        this.resumeCheckpoint(checkpointDir, lastCheckpoint, 3);
    }

    @Test
    public void testUpscaling() throws Exception {
        File checkpointDir = this.temp.newFolder();
        File lastCheckpoint = this.generateCheckpoint(checkpointDir, 3);
        this.resumeCheckpoint(checkpointDir, lastCheckpoint, 7);
    }

    private File generateCheckpoint(File checkpointDir, int p) throws IOException {
        StreamExecutionEnvironment env = this.createEnv(checkpointDir, null, p);
        Assertions.assertThatThrownBy(() -> env.execute("create checkpoint")).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)CREATED_CHECKPOINT)});
        return Files.find(checkpointDir.toPath(), 2, this::isCompletedCheckpoint, new FileVisitOption[0]).max(Comparator.comparing(Path::toString)).map(Path::toFile).orElseThrow(() -> new IllegalStateException("Cannot generate checkpoint"));
    }

    private boolean isCompletedCheckpoint(Path path, BasicFileAttributes attr) {
        return attr.isDirectory() && path.getFileName().toString().startsWith("chk-") && Files.exists(path.resolve("_metadata"), new LinkOption[0]);
    }

    private void resumeCheckpoint(File checkpointDir, File restoreCheckpoint, int p) {
        StreamExecutionEnvironment env = this.createEnv(checkpointDir, restoreCheckpoint, p);
        Assertions.assertThatThrownBy(() -> env.execute("resume checkpoint")).satisfies(new ThrowingConsumer[]{FlinkAssertions.anyCauseMatches((String)RESTORED_CHECKPOINT)});
    }

    private StreamExecutionEnvironment createEnv(File checkpointDir, @Nullable File restoreCheckpoint, int p) {
        Configuration conf = new Configuration();
        conf.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, (Object)checkpointDir.toURI().toString());
        conf.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, (Object)MemorySize.parse((String)"4kb"));
        if (restoreCheckpoint != null) {
            conf.set(SavepointConfigOptions.SAVEPOINT_PATH, (Object)restoreCheckpoint.toURI().toString());
        }
        conf.set(TaskManagerOptions.NUM_TASK_SLOTS, (Object)p);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment((Configuration)conf);
        env.setParallelism(p);
        env.enableCheckpointing(100L);
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setRestartStrategy(RestartStrategies.noRestart());
        DataStreamSource stream = env.fromSequence(0L, Long.MAX_VALUE);
        stream.map((MapFunction)new FailingMapFunction(restoreCheckpoint == null)).addSink((SinkFunction)new SleepySink());
        return env;
    }

    private static class SleepySink
    implements SinkFunction<Long> {
        private static final long serialVersionUID = -3542950841846119765L;

        private SleepySink() {
        }

        public void invoke(Long value, SinkFunction.Context context) throws Exception {
            if (value % 1000L == 0L) {
                Thread.sleep(1L);
            }
        }
    }

    private static class FailingMapFunction
    extends RichMapFunction<Long, Long>
    implements CheckpointListener {
        private static final long serialVersionUID = 699621912578369378L;
        private final boolean generateCheckpoint;
        private boolean processedRecord;

        FailingMapFunction(boolean generateCheckpoint) {
            this.generateCheckpoint = generateCheckpoint;
        }

        public Long map(Long value) throws Exception {
            this.processedRecord = true;
            if (!this.generateCheckpoint && value % 100L == 42L) {
                throw new Exception(CoordinatedSourceRescaleITCase.RESTORED_CHECKPOINT);
            }
            return value;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (this.generateCheckpoint && this.processedRecord && checkpointId > 5L) {
                throw new Exception(CoordinatedSourceRescaleITCase.CREATED_CHECKPOINT);
            }
        }
    }
}

