/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecution;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.WaitingCancelableInvokable;
import org.apache.flink.testutils.junit.FailsWithAdaptiveScheduler;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

public class DefaultSchedulerLocalRecoveryITCase
extends TestLogger {
    private static final long TIMEOUT = 10000L;

    @Test
    @Category(value={FailsWithAdaptiveScheduler.class})
    public void testLocalRecoveryFull() throws Exception {
        this.testLocalRecoveryInternal("full");
    }

    @Test
    @Category(value={FailsWithAdaptiveScheduler.class})
    public void testLocalRecoveryRegion() throws Exception {
        this.testLocalRecoveryInternal("region");
    }

    private void testLocalRecoveryInternal(String failoverStrategyValue) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CheckpointingOptions.LOCAL_RECOVERY, (Object)true);
        configuration.setString(JobManagerOptions.EXECUTION_FAILOVER_STRATEGY.key(), failoverStrategyValue);
        int parallelism = 10;
        ArchivedExecutionGraph graph = this.executeSchedulingTest(configuration, 10);
        this.assertNonLocalRecoveredTasksEquals(graph, 1);
    }

    private void assertNonLocalRecoveredTasksEquals(ArchivedExecutionGraph graph, int expected) {
        int nonLocalRecoveredTasks = 0;
        for (ArchivedExecutionVertex vertex : graph.getAllExecutionVertices()) {
            int currentAttemptNumber = vertex.getCurrentExecutionAttempt().getAttemptNumber();
            if (currentAttemptNumber == 0) continue;
            AllocationID priorAllocation = ((ArchivedExecution)vertex.getExecutionHistory().getHistoricalExecution(currentAttemptNumber - 1).get()).getAssignedAllocationID();
            AllocationID currentAllocation = vertex.getCurrentExecutionAttempt().getAssignedAllocationID();
            Assert.assertNotNull((Object)priorAllocation);
            Assert.assertNotNull((Object)currentAllocation);
            if (currentAllocation.equals((Object)priorAllocation)) continue;
            ++nonLocalRecoveredTasks;
        }
        MatcherAssert.assertThat((Object)nonLocalRecoveredTasks, (Matcher)Matchers.is((Object)expected));
    }

    private ArchivedExecutionGraph executeSchedulingTest(Configuration configuration, int parallelism) throws Exception {
        long slotIdleTimeout = 10000L;
        configuration.set(JobManagerOptions.SLOT_IDLE_TIMEOUT, (Object)10000L);
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, (Object)MemorySize.parse((String)"64mb"));
        configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, (Object)MemorySize.parse((String)"16mb"));
        configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, (Object)MemorySize.parse((String)"16mb"));
        MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder().withRandomPorts().setConfiguration(configuration).setNumTaskManagers(parallelism).setNumSlotsPerTaskManager(1).build();
        try (MiniCluster miniCluster = new MiniCluster(miniClusterConfiguration);){
            miniCluster.start();
            MiniClusterClient miniClusterClient = new MiniClusterClient(configuration, miniCluster);
            JobGraph jobGraph = this.createJobGraph(parallelism);
            JobID jobId = (JobID)miniClusterClient.submitJob(jobGraph).get(10000L, TimeUnit.SECONDS);
            this.waitUntilAllVerticesRunning(jobId, miniCluster);
            CompletableFuture terminationFuture = miniCluster.terminateTaskManager(0);
            terminationFuture.get();
            miniCluster.startTaskManager();
            this.waitUntilAllVerticesRunning(jobId, miniCluster);
            ArchivedExecutionGraph graph = (ArchivedExecutionGraph)miniCluster.getArchivedExecutionGraph(jobGraph.getJobID()).get();
            miniCluster.cancelJob(jobId).get();
            ArchivedExecutionGraph archivedExecutionGraph = graph;
            return archivedExecutionGraph;
        }
    }

    private void waitUntilAllVerticesRunning(JobID jobId, MiniCluster miniCluster) throws Exception {
        CommonTestUtils.waitForAllTaskRunning(() -> (AccessExecutionGraph)miniCluster.getExecutionGraph(jobId).get(10000L, TimeUnit.SECONDS), (boolean)false);
    }

    private JobGraph createJobGraph(int parallelism) throws IOException {
        JobVertex source = new JobVertex("v1");
        source.setInvokableClass(WaitingCancelableInvokable.class);
        source.setParallelism(parallelism);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)10L));
        return JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertices(Arrays.asList(source)).setExecutionConfig(executionConfig).build();
    }
}

