/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.ExceptionUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class JobMasterTriggerSavepointITCase
extends AbstractTestBase {
    // Static so that the statically nested NoOpBlockingInvokable can reach it.
    // Re-created with a count of one in setUpWithCheckpointInterval and counted
    // down at the start of NoOpBlockingInvokable.invoke().
    private static CountDownLatch invokeLatch;
    // Counted down by NoOpBlockingInvokable.triggerCheckpointAsync(). Re-created in
    // setUpWithCheckpointInterval and again in testDoNotCancelJobIfSavepointFails.
    private static volatile CountDownLatch triggerCheckpointLatch;
    // Supplies the per-test directory that savepoints are written to.
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    // Savepoint target directory, created from temporaryFolder during set-up.
    private Path savepointDirectory;
    // Client obtained from miniClusterResource during set-up.
    private MiniClusterClient clusterClient;
    // Single-vertex job submitted during set-up.
    private JobGraph jobGraph;

    /**
     * Resets the shared latches, creates a fresh savepoint directory, submits a
     * single-vertex job running {@link NoOpBlockingInvokable} and waits until the
     * job is reported as running.
     *
     * <p>The test is skipped (JUnit assumption) when the cluster client is not a
     * {@link MiniClusterClient}.
     *
     * @param checkpointInterval value passed as the first argument of the
     *     {@link CheckpointCoordinatorConfiguration}
     * @throws Exception if any set-up step fails
     */
    private void setUpWithCheckpointInterval(long checkpointInterval) throws Exception {
        invokeLatch = new CountDownLatch(1);
        triggerCheckpointLatch = new CountDownLatch(1);
        this.savepointDirectory = this.temporaryFolder.newFolder().toPath();

        boolean isMiniClusterClient = miniClusterResource.getClusterClient() instanceof MiniClusterClient;
        Assume.assumeTrue("ClusterClient is not an instance of MiniClusterClient", isMiniClusterClient);
        this.clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();

        this.jobGraph = new JobGraph(new JobVertex[0]);
        JobVertex testVertex = new JobVertex("testVertex");
        testVertex.setInvokableClass(NoOpBlockingInvokable.class);
        this.jobGraph.addVertex(testVertex);

        // The single vertex is used for all three vertex lists of the settings.
        // NOTE(review): positional arguments below follow the constructor of
        // CheckpointCoordinatorConfiguration - confirm their meaning against the
        // Flink version in use.
        CheckpointCoordinatorConfiguration coordinatorConfiguration = new CheckpointCoordinatorConfiguration(
                checkpointInterval,
                60000L,
                10L,
                1,
                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                true,
                false,
                false,
                0);
        JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
                Collections.singletonList(testVertex.getID()),
                Collections.singletonList(testVertex.getID()),
                Collections.singletonList(testVertex.getID()),
                coordinatorConfiguration,
                null);
        this.jobGraph.setSnapshotSettings(checkpointingSettings);

        ClientUtils.submitJob(this.clusterClient, this.jobGraph);

        // The invokable counts this latch down as soon as invoke() is entered.
        boolean invokableStarted = invokeLatch.await(60L, TimeUnit.SECONDS);
        Assert.assertTrue(invokableStarted);
        this.waitForJob();
    }

    /**
     * Verifies that cancelling with a savepoint leaves the job in
     * {@code CANCELED} or {@code CANCELLING} and that the reported savepoint is
     * present in the savepoint directory. The job is set up with a checkpoint
     * interval of {@code 10}.
     *
     * @throws Exception if set-up, cancellation or the status lookup fails
     */
    @Test
    public void testStopJobAfterSavepoint() throws Exception {
        this.setUpWithCheckpointInterval(10L);
        String savepointLocation = this.cancelWithSavepoint();

        // Bounded wait, like the other status lookups in this class, so that a
        // stuck cluster fails the test instead of hanging it forever.
        JobStatus jobStatus = this.clusterClient.getJobStatus(this.jobGraph.getJobID()).get(60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(jobStatus, Matchers.isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING));

        List<Path> savepoints;
        // The directory stream must be closed, hence try-with-resources.
        try (Stream<Path> savepointFiles = Files.list(this.savepointDirectory)) {
            savepoints = savepointFiles.map(Path::getFileName).collect(Collectors.toList());
        }
        MatcherAssert.assertThat(savepoints, Matchers.hasItem(Paths.get(savepointLocation).getFileName()));
    }

    /**
     * Same scenario as the plain cancel-with-savepoint test, but the job is set up
     * with a checkpoint interval of {@code Long.MAX_VALUE}.
     *
     * @throws Exception if set-up, cancellation or the status lookup fails
     */
    @Test
    public void testStopJobAfterSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
        this.setUpWithCheckpointInterval(Long.MAX_VALUE);
        String reportedSavepoint = this.cancelWithSavepoint();

        JobStatus statusAfterCancel = this.clusterClient
                .getJobStatus(this.jobGraph.getJobID())
                .get(60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(statusAfterCancel, Matchers.isOneOf(JobStatus.CANCELED, JobStatus.CANCELLING));

        // Collect the names of everything that ended up in the savepoint directory.
        List<Path> directoryEntries;
        try (Stream<Path> listing = Files.list(this.savepointDirectory)) {
            directoryEntries = listing.map(Path::getFileName).collect(Collectors.toList());
        }

        Path expectedEntry = Paths.get(reportedSavepoint).getFileName();
        MatcherAssert.assertThat(directoryEntries, Matchers.hasItem(expectedEntry));
    }

    /**
     * Verifies that a failed savepoint does not cancel the job: after a
     * cancel-with-savepoint attempt against a directory stripped of all
     * permissions, the job must still be {@code RUNNING} and a further checkpoint
     * must be triggered on the invokable.
     *
     * <p>The test is skipped (JUnit assumption) when the permissions of the
     * savepoint directory cannot be changed.
     *
     * @throws Exception if set-up or the status lookup fails
     */
    @Test
    public void testDoNotCancelJobIfSavepointFails() throws Exception {
        this.setUpWithCheckpointInterval(10L);

        try {
            Files.setPosixFilePermissions(this.savepointDirectory, Collections.emptySet());
        } catch (IOException | UnsupportedOperationException e) {
            // File systems without POSIX attribute support throw
            // UnsupportedOperationException rather than IOException; both mean
            // the precondition cannot be established, so skip instead of erroring.
            Assume.assumeNoException(e);
        }

        try {
            this.cancelWithSavepoint();
            // AssertionError is not an Exception, so it is not swallowed by the
            // catch below and fails the test right here with a clear message.
            throw new AssertionError("Expected cancel-with-savepoint to fail for a savepoint directory without permissions.");
        } catch (Exception e) {
            MatcherAssert.assertThat(ExceptionUtils.findThrowable(e, CheckpointException.class).isPresent(), Matchers.equalTo(true));
        }

        JobStatus jobStatus = this.clusterClient.getJobStatus(this.jobGraph.getJobID()).get(60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(jobStatus, Matchers.equalTo(JobStatus.RUNNING));

        // Install a fresh latch and wait for the invokable to see one more
        // checkpoint trigger after the failed savepoint.
        triggerCheckpointLatch = new CountDownLatch(1);
        MatcherAssert.assertThat(triggerCheckpointLatch.await(60L, TimeUnit.SECONDS), Matchers.equalTo(true));
    }

    /**
     * Cancels with a {@code null} savepoint directory. An exception is tolerated
     * only if some throwable in its chain matches the message
     * {@code "savepoint directory"}; any other exception is rethrown.
     *
     * @throws Exception if set-up fails or an unrelated exception is raised
     */
    @Test
    public void testCancelWithSavepointWithoutConfiguredSavepointDirectory() throws Exception {
        this.setUpWithCheckpointInterval(10L);
        try {
            this.clusterClient.cancelWithSavepoint(this.jobGraph.getJobID(), null).get();
        } catch (Exception e) {
            boolean mentionsSavepointDirectory =
                    ExceptionUtils.findThrowableWithMessage(e, "savepoint directory").isPresent();
            if (!mentionsSavepointDirectory) {
                throw e;
            }
        }
    }

    /**
     * Polls the job status up to 60 times, pausing one second after each
     * unsuccessful attempt, and returns as soon as the job is {@code RUNNING}.
     *
     * @throws AssertionError if the job reaches a globally terminal state or is
     *     still not running after the last attempt
     * @throws Exception if the status lookup times out or the thread is interrupted
     */
    private void waitForJob() throws Exception {
        int attemptsLeft = 60;
        while (attemptsLeft-- > 0) {
            try {
                JobStatus currentStatus = this.clusterClient
                        .getJobStatus(this.jobGraph.getJobID())
                        .get(60L, TimeUnit.SECONDS);
                MatcherAssert.assertThat(currentStatus.isGloballyTerminalState(), Matchers.equalTo(false));
                if (JobStatus.RUNNING == currentStatus) {
                    return;
                }
            } catch (ExecutionException ignored) {
                // Deliberately ignored: a failed lookup is treated like "not
                // running yet" and retried after the pause below.
            }
            Thread.sleep(1000L);
        }
        throw new AssertionError("Job did not become running within timeout.");
    }

    /**
     * Cancels the test job with a savepoint targeting the absolute path of
     * {@code savepointDirectory} and blocks until the operation completes.
     *
     * @return the value the cluster client's cancel-with-savepoint future completes with
     * @throws Exception if the cancel-with-savepoint future completes exceptionally
     */
    private String cancelWithSavepoint() throws Exception {
        String targetDirectory = this.savepointDirectory.toAbsolutePath().toString();
        return this.clusterClient.cancelWithSavepoint(this.jobGraph.getJobID(), targetDirectory).get();
    }

    /**
     * Invokable that signals its start through {@code invokeLatch}, then sleeps
     * until interrupted. Every checkpoint trigger is acknowledged immediately with
     * an empty subtask state and recorded on {@code triggerCheckpointLatch}.
     */
    public static class NoOpBlockingInvokable
    extends AbstractInvokable {
        public NoOpBlockingInvokable(Environment environment) {
            super(environment);
        }

        /** Counts down {@code invokeLatch} and then blocks until the thread is interrupted. */
        public void invoke() {
            invokeLatch.countDown();
            try {
                Thread.sleep(Long.MAX_VALUE);
            } catch (InterruptedException interrupted) {
                // Keep the interrupt flag set for whoever runs this task.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Acknowledges the checkpoint right away with an empty
         * {@link OperatorSubtaskState} registered under the operator id derived
         * from this task's job vertex id, then counts down
         * {@code triggerCheckpointLatch}.
         *
         * @return an already completed future holding {@code true}
         */
        public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean advanceToEndOfEventTime) {
            Environment taskEnvironment = this.getEnvironment();
            OperatorID operatorId = OperatorID.fromJobVertexID(taskEnvironment.getJobVertexId());

            TaskStateSnapshot emptySnapshot = new TaskStateSnapshot();
            emptySnapshot.putSubtaskStateByOperatorID(operatorId, new OperatorSubtaskState());

            long checkpointId = checkpointMetaData.getCheckpointId();
            taskEnvironment.acknowledgeCheckpoint(checkpointId, new CheckpointMetrics(), emptySnapshot);

            triggerCheckpointLatch.countDown();
            return CompletableFuture.completedFuture(true);
        }

        /** No-op; returns an already completed future. */
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }

        /** No-op; returns an already completed future. */
        public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
            return CompletableFuture.completedFuture(null);
        }
    }
}

