/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing.utils;

import java.io.File;
import java.net.URI;
import java.net.URL;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for tests that run a streaming job on a mini cluster, take a savepoint of it and
 * later start a job from such a savepoint.
 *
 * <p>Every test instance gets its own mini cluster (one task manager with
 * {@link #DEFAULT_PARALLELISM} slots) whose checkpoint and savepoint directories live below the
 * class-wide {@link #TEMP_FOLDER}.
 */
public abstract class SavepointMigrationTestBase
extends TestBaseUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointMigrationTestBase.class);

    /** Number of slots of the single task manager started for each test. */
    protected static final int DEFAULT_PARALLELISM = 4;

    /** Root folder for the checkpoint and savepoint directories created by this class. */
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /** Mini cluster (plus client) that the jobs of a single test are submitted to. */
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(getConfiguration())
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .build());

    /**
     * Resolves a classpath resource to the file component of its URL.
     *
     * @param filename name of the resource, looked up through this class's class loader
     * @return the value of {@link URL#getFile()} for the resolved resource
     * @throws NullPointerException if no such resource exists on the classpath
     */
    protected static String getResourceFilename(String filename) {
        ClassLoader cl = SavepointMigrationTestBase.class.getClassLoader();
        URL resource = cl.getResource(filename);
        if (resource == null) {
            throw new NullPointerException("Missing snapshot resource.");
        }
        return resource.getFile();
    }

    /**
     * Declared only because the {@link #miniClusterResource} initializer calls
     * {@link #getConfiguration()}, which may throw a checked exception.
     *
     * @throws Exception if the cluster configuration cannot be set up
     */
    protected SavepointMigrationTestBase() throws Exception {
    }

    /**
     * Builds the cluster configuration and creates fresh checkpoint and savepoint directories
     * (with a random suffix) below {@link #TEMP_FOLDER}.
     *
     * @return the configuration handed to the mini cluster
     * @throws Exception if the temporary directories could not be created
     */
    private Configuration getConfiguration() throws Exception {
        Configuration config = new Configuration();
        config.setInteger("local.number-taskmanager", 1);
        config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, DEFAULT_PARALLELISM);

        UUID id = UUID.randomUUID();
        File checkpointDir = TEMP_FOLDER.newFolder("checkpoints_" + id).getAbsoluteFile();
        File savepointDir = TEMP_FOLDER.newFolder("savepoints_" + id).getAbsoluteFile();
        if (!checkpointDir.exists() || !savepointDir.exists()) {
            throw new Exception("Test setup failed: failed to create (temporary) directories.");
        }
        LOG.info("Created temporary checkpoint directory: {}.", checkpointDir);
        LOG.info("Created savepoint directory: {}.", savepointDir);

        config.setString(CheckpointingOptions.STATE_BACKEND, "memory");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
        // The value must be passed with its real type; an Object-typed argument does not
        // satisfy set(ConfigOption<T>, T).
        config.set(CheckpointingOptions.FS_SMALL_FILE_THRESHOLD, MemorySize.ZERO);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
        config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, 300L);
        return config;
    }

    /**
     * Submits the job defined by {@code env}, waits until all expected accumulators have the
     * expected values, triggers a savepoint and moves it to {@code savepointPath}.
     *
     * <p>Polling for the accumulators and waiting for the savepoint share one five-minute
     * deadline. The test fails as soon as the job is reported as {@link JobStatus#FAILED}.
     *
     * @param env environment describing the job to run
     * @param savepointPath destination the finished savepoint (file or directory) is moved to
     * @param expectedAccumulators pairs of accumulator name and the value it has to reach
     * @throws Exception if submission, polling, the savepoint or the move fails
     */
    @SafeVarargs
    protected final void executeAndSavepoint(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer> ... expectedAccumulators) throws Exception {
        Deadline deadLine = Deadline.fromNow(Duration.ofMinutes(5L));
        ClusterClient<?> client = this.miniClusterResource.getClusterClient();
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        JobExecutionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);
        JobID jobId = jobSubmissionResult.getJobID();
        LOG.info("Submitted job {} and waiting...", jobId);

        awaitExpectedAccumulators(client, jobId, deadLine, expectedAccumulators);

        LOG.info("Triggering savepoint.");
        // No explicit target directory is passed (null); presumably the cluster then uses the
        // savepoint directory from its configuration - confirm against ClusterClient docs.
        CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobId, null);
        String jobmanagerSavepointPath = savepointPathFuture.get(deadLine.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

        File jobManagerSavepoint = new File(new URI(jobmanagerSavepointPath).getPath());
        File target = new File(savepointPath);
        if (jobManagerSavepoint.isDirectory()) {
            FileUtils.moveDirectory(jobManagerSavepoint, target);
        } else {
            FileUtils.moveFile(jobManagerSavepoint, target);
        }
    }

    /**
     * Submits the job defined by {@code env} with restore settings pointing at
     * {@code savepointPath} and waits until all expected accumulators have the expected values.
     *
     * <p>Waiting is bounded by a five-minute deadline. The test fails as soon as the job is
     * reported as {@link JobStatus#FAILED} or its status cannot be queried.
     *
     * @param env environment describing the job to run
     * @param savepointPath path of the savepoint to restore from
     * @param expectedAccumulators pairs of accumulator name and the value it has to reach
     * @throws Exception if submission or polling fails
     */
    @SafeVarargs
    protected final void restoreAndExecute(StreamExecutionEnvironment env, String savepointPath, Tuple2<String, Integer> ... expectedAccumulators) throws Exception {
        Deadline deadLine = Deadline.fromNow(Duration.ofMinutes(5L));
        ClusterClient<?> client = this.miniClusterResource.getClusterClient();
        JobGraph jobGraph = env.getStreamGraph().getJobGraph();
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
        JobExecutionResult jobSubmissionResult = ClientUtils.submitJob(client, jobGraph);

        awaitExpectedAccumulators(client, jobSubmissionResult.getJobID(), deadLine, expectedAccumulators);
    }

    /**
     * Polls the job's accumulators roughly every 100 ms until they match the expectation, and
     * fails the test if the job fails or the deadline passes first.
     */
    private static void awaitExpectedAccumulators(ClusterClient<?> client, JobID jobId, Deadline deadline, Tuple2<String, Integer>[] expectedAccumulators) throws Exception {
        while (deadline.hasTimeLeft()) {
            // A failed job will never produce the expected values; bail out instead of
            // waiting for the deadline.
            assertJobNotFailed(client, jobId);
            Thread.sleep(100L);
            Map<String, Object> accumulators = client.getAccumulators(jobId).get();
            if (hasExpectedAccumulators(accumulators, expectedAccumulators)) {
                return;
            }
        }
        Assert.fail("Did not see the expected accumulator results within time limit.");
    }

    /** Fails the test if the job status cannot be fetched within five seconds or is FAILED. */
    private static void assertJobNotFailed(ClusterClient<?> client, JobID jobId) {
        JobStatus jobStatus;
        try {
            jobStatus = client.getJobStatus(jobId).get(5L, TimeUnit.SECONDS);
        }
        catch (Exception e) {
            // Same error type and message as Assert.fail(...), but keeps the cause attached.
            throw new AssertionError("Could not connect to job: " + e, e);
        }
        Assert.assertNotEquals(JobStatus.FAILED, jobStatus);
    }

    /** Returns true only if every expected accumulator is present and equal to its expected value. */
    private static boolean hasExpectedAccumulators(Map<String, Object> accumulators, Tuple2<String, Integer>[] expectedAccumulators) {
        for (Tuple2<String, Integer> expected : expectedAccumulators) {
            Object actual = accumulators.get(expected.f0);
            if (actual == null || !actual.equals(expected.f1)) {
                return false;
            }
        }
        return true;
    }
}

