/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.MiniClusterClient;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.util.ExceptionUtils;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Integration tests for stopping a job with a savepoint on a mini cluster.
 *
 * <p>Each test submits a single-vertex streaming job whose invokable is one of the nested test
 * tasks, triggers a stop-with-savepoint (either "suspend" or "terminate") and then checks the
 * resulting job status and, where applicable, the written savepoint.
 *
 * <p>The tasks communicate with the test thread through the static latches and counters declared
 * below; {@link #setUpJobGraph} re-creates them for every test.
 */
public class JobMasterStopWithSavepointITCase extends AbstractTestBase {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    /** Interval, in milliseconds, passed to the checkpoint coordinator configuration. */
    private static final long CHECKPOINT_INTERVAL = 10L;

    /** Parallelism of the single test vertex; also the number of expected task invocations. */
    private static final int PARALLELISM = 2;

    /** Released by the test to let {@link NoOpBlockingStreamTask#finishTask()} proceed. */
    private static OneShotLatch finishingLatch;

    /** Counted down once per {@code processInput} call of the test tasks. */
    private static CountDownLatch invokeLatch;

    /** Counted down by subtask 0 of {@link ExceptionOnCallbackStreamTask} in {@code processInput}. */
    private static CountDownLatch numberOfRestarts;

    /** Id of the first synchronous savepoint observed by a task, or {@code -1} if none yet. */
    private static final AtomicLong syncSavepointId = new AtomicLong();

    /**
     * Counted down by subtask 0 whenever a checkpoint is triggered. Volatile because a test
     * replaces the latch while the tasks are running.
     */
    private static volatile CountDownLatch checkpointsToWaitFor;

    private Path savepointDirectory;
    private MiniClusterClient clusterClient;
    private JobGraph jobGraph;

    @Test
    public void suspendWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished()
            throws Exception {
        stopWithSavepointNormalExecutionHelper(false);
    }

    @Test
    public void terminateWithSavepointWithoutComplicationsShouldSucceedAndLeadJobToFinished()
            throws Exception {
        stopWithSavepointNormalExecutionHelper(true);
    }

    /**
     * Stops a job of {@link NoOpBlockingStreamTask}s with a savepoint and verifies that the job
     * stays RUNNING until the tasks are allowed to finish, ends up FINISHED afterwards, and that
     * the reported savepoint exists in the savepoint directory.
     *
     * @param terminate flag forwarded to the mini cluster's stop-with-savepoint call
     */
    private void stopWithSavepointNormalExecutionHelper(boolean terminate) throws Exception {
        setUpJobGraph(NoOpBlockingStreamTask.class, RestartStrategies.noRestart());

        final CompletableFuture<String> savepointLocationFuture = stopWithSavepoint(terminate);

        // The tasks block in finishTask() until the latch is triggered, so the job is still running.
        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        finishingLatch.trigger();

        final String savepointLocation = savepointLocationFuture.get();
        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.FINISHED));

        final List<Path> savepoints;
        try (Stream<Path> savepointFiles = Files.list(savepointDirectory)) {
            savepoints = savepointFiles.map(Path::getFileName).collect(Collectors.toList());
        }
        MatcherAssert.assertThat(
                savepoints, Matchers.hasItem(Paths.get(savepointLocation).getFileName()));
    }

    @Test
    public void throwingExceptionOnCallbackWithNoRestartsShouldFailTheSuspend() throws Exception {
        throwingExceptionOnCallbackWithoutRestartsHelper(false);
    }

    @Test
    public void throwingExceptionOnCallbackWithNoRestartsShouldFailTheTerminate() throws Exception {
        throwingExceptionOnCallbackWithoutRestartsHelper(true);
    }

    /**
     * Runs {@link ExceptionOnCallbackStreamTask} without restarts and verifies that the
     * stop-with-savepoint future fails, that a synchronous savepoint was triggered, and that the
     * job ends up FAILING or FAILED.
     *
     * @param terminate flag forwarded to the mini cluster's stop-with-savepoint call
     */
    private void throwingExceptionOnCallbackWithoutRestartsHelper(boolean terminate)
            throws Exception {
        setUpJobGraph(ExceptionOnCallbackStreamTask.class, RestartStrategies.noRestart());

        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        try {
            stopWithSavepoint(terminate).get();
            // Assert.fail() throws an AssertionError, which the catch below does not swallow.
            Assert.fail();
        } catch (Exception expected) {
            // Expected: the task throws from the checkpoint-complete callback.
        }

        // The task must have seen a synchronous savepoint being triggered.
        Assert.assertTrue(syncSavepointId.get() > 0L);
        MatcherAssert.assertThat(
                getJobStatus(),
                Matchers.either(Matchers.equalTo(JobStatus.FAILED))
                        .or(Matchers.equalTo(JobStatus.FAILING)));
    }

    @Test
    public void throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInSuspend()
            throws Exception {
        throwingExceptionOnCallbackWithRestartsHelper(false);
    }

    @Test
    public void throwingExceptionOnCallbackWithRestartsShouldSimplyRestartInTerminate()
            throws Exception {
        throwingExceptionOnCallbackWithRestartsHelper(true);
    }

    /**
     * Runs {@link ExceptionOnCallbackStreamTask} with a fixed-delay restart strategy and verifies
     * that, after the stop-with-savepoint attempt, the job is RUNNING again and keeps triggering
     * checkpoints; finally cancels the job.
     *
     * @param terminate flag forwarded to the mini cluster's stop-with-savepoint call
     */
    private void throwingExceptionOnCallbackWithRestartsHelper(boolean terminate) throws Exception {
        final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(15L));
        final int numberOfCheckpointsToExpect = 10;

        // Note: setUpJobGraph() re-creates both latches with the same counts.
        numberOfRestarts = new CountDownLatch(2);
        checkpointsToWaitFor = new CountDownLatch(numberOfCheckpointsToExpect);

        setUpJobGraph(
                ExceptionOnCallbackStreamTask.class,
                RestartStrategies.fixedDelayRestart(15, Time.milliseconds(10L)));

        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        try {
            stopWithSavepoint(terminate).get(50L, TimeUnit.MILLISECONDS);
            // Assert.fail() throws an AssertionError, which the catch below does not swallow.
            Assert.fail();
        } catch (Exception expected) {
            // Expected: either the future completed exceptionally or the short wait timed out.
        }

        // Subtask 0 must have entered processInput twice and the expected number of
        // checkpoints must have been triggered before the deadline expires.
        Assert.assertTrue(
                numberOfRestarts.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));
        Assert.assertTrue(
                checkpointsToWaitFor.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS));

        Assert.assertTrue(syncSavepointId.get() > 0L);
        MatcherAssert.assertThat(getJobStatus(), Matchers.equalTo(JobStatus.RUNNING));

        // The synchronous savepoint must have been one of the first checkpoints.
        final long syncSavepoint = syncSavepointId.get();
        Assert.assertTrue(syncSavepoint > 0L && syncSavepoint < numberOfCheckpointsToExpect);

        clusterClient.cancel(jobGraph.getJobID()).get();
        MatcherAssert.assertThat(
                getJobStatus(),
                Matchers.either(Matchers.equalTo(JobStatus.CANCELLING))
                        .or(Matchers.equalTo(JobStatus.CANCELED)));
    }

    /**
     * Makes the savepoint directory inaccessible so that stop-with-savepoint fails while
     * triggering, then verifies that the job keeps RUNNING and that checkpoints continue to be
     * triggered afterwards.
     */
    @Test
    public void testRestartCheckpointCoordinatorIfStopWithSavepointFails() throws Exception {
        setUpJobGraph(CheckpointCountingTask.class, RestartStrategies.noRestart());

        try {
            Files.setPosixFilePermissions(savepointDirectory, Collections.emptySet());
        } catch (IOException | UnsupportedOperationException e) {
            // UnsupportedOperationException is thrown on file systems without POSIX attribute
            // support; skip the test there instead of reporting an error.
            Assume.assumeNoException(e);
        }

        try {
            stopWithSavepoint(true).get();
            Assert.fail();
        } catch (Exception e) {
            // Any failure that is not caused by a CheckpointException is rethrown unchanged.
            final CheckpointException checkpointException =
                    ExceptionUtils.findThrowable(e, CheckpointException.class)
                            .orElseThrow(() -> e);
            final String exceptionMessage = checkpointException.getMessage();
            Assert.assertTrue(
                    "Stop with savepoint failed because of another cause " + exceptionMessage,
                    exceptionMessage.contains(
                            CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE.message()));
        }

        final JobStatus jobStatus =
                (JobStatus) clusterClient.getJobStatus(jobGraph.getJobID()).get(60L, TimeUnit.SECONDS);
        MatcherAssert.assertThat(jobStatus, Matchers.equalTo(JobStatus.RUNNING));

        // Install a fresh latch and wait for one more checkpoint trigger to show that
        // checkpointing continues after the failed stop-with-savepoint.
        checkpointsToWaitFor = new CountDownLatch(1);
        Assert.assertTrue(checkpointsToWaitFor.await(60L, TimeUnit.SECONDS));
    }

    /**
     * Triggers stop-with-savepoint for the current job, targeting {@link #savepointDirectory}.
     *
     * @param terminate flag forwarded to the mini cluster's stop-with-savepoint call
     * @return future completing with the savepoint location
     */
    private CompletableFuture<String> stopWithSavepoint(boolean terminate) {
        return miniClusterResource
                .getMiniCluster()
                .stopWithSavepoint(
                        jobGraph.getJobID(),
                        savepointDirectory.toAbsolutePath().toString(),
                        terminate);
    }

    /** Blocks until the cluster client reports the status of the current job. */
    private JobStatus getJobStatus() throws InterruptedException, ExecutionException {
        return (JobStatus) clusterClient.getJobStatus(jobGraph.getJobID()).get();
    }

    /**
     * Resets all shared test state, builds a single-vertex streaming job running {@code
     * invokable} with checkpointing enabled, submits it and waits until all tasks are running.
     *
     * <p>The test is skipped (via {@link Assume}) if the cluster client is not a {@link
     * MiniClusterClient}.
     *
     * @param invokable task class to run in the job's only vertex
     * @param restartStrategy restart strategy set on the job's execution config
     */
    private void setUpJobGraph(
            Class<? extends AbstractInvokable> invokable,
            RestartStrategies.RestartStrategyConfiguration restartStrategy)
            throws Exception {
        finishingLatch = new OneShotLatch();
        invokeLatch = new CountDownLatch(PARALLELISM);
        numberOfRestarts = new CountDownLatch(2);
        checkpointsToWaitFor = new CountDownLatch(10);
        syncSavepointId.set(-1L);

        savepointDirectory = temporaryFolder.newFolder().toPath();

        Assume.assumeTrue(
                "ClusterClient is not an instance of MiniClusterClient",
                miniClusterResource.getClusterClient() instanceof MiniClusterClient);
        clusterClient = (MiniClusterClient) miniClusterResource.getClusterClient();

        final ExecutionConfig config = new ExecutionConfig();
        config.setRestartStrategy(restartStrategy);

        final JobVertex vertex = new JobVertex("testVertex");
        vertex.setInvokableClass(invokable);
        vertex.setParallelism(PARALLELISM);

        final JobCheckpointingSettings jobCheckpointingSettings =
                new JobCheckpointingSettings(
                        new CheckpointCoordinatorConfiguration(
                                CHECKPOINT_INTERVAL,
                                60_000L,
                                10L,
                                1,
                                CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                                true,
                                false,
                                false,
                                0),
                        null);

        jobGraph =
                JobGraphBuilder.newStreamingJobGraphBuilder()
                        .setExecutionConfig(config)
                        .addJobVertex(vertex)
                        .setJobCheckpointingSettings(jobCheckpointingSettings)
                        .build();

        clusterClient.submitJob(jobGraph).get();

        // Every subtask counts the latch down on its first processInput() call.
        Assert.assertTrue(invokeLatch.await(60L, TimeUnit.SECONDS));
        waitForJob();
    }

    /** Waits, for at most five minutes, until all tasks of the current job are running. */
    private void waitForJob() throws Exception {
        final Deadline deadline = Deadline.fromNow(Duration.ofMinutes(5L));
        final JobID jobID = jobGraph.getJobID();
        CommonTestUtils.waitForAllTaskRunning(
                () ->
                        (AccessExecutionGraph)
                                miniClusterResource
                                        .getMiniCluster()
                                        .getExecutionGraph(jobID)
                                        .get(60L, TimeUnit.SECONDS),
                deadline);
    }

    /**
     * Task that suspends its default action on the first {@code processInput} call and counts the
     * checkpoints triggered on subtask 0 via {@link #checkpointsToWaitFor}.
     */
    public static class CheckpointCountingTask extends StreamTaskTest.NoOpStreamTask {
        private transient MailboxDefaultAction.Suspension suspension;

        public CheckpointCountingTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            invokeLatch.countDown();
            if (suspension == null) {
                // First call: park the default action until it is explicitly resumed.
                suspension = controller.suspendDefaultAction();
            } else {
                // Called again after a resume: signal that the task is done.
                controller.allActionsCompleted();
            }
        }

        @Override
        protected void cancelTask() throws Exception {
            super.cancelTask();
            if (suspension != null) {
                suspension.resume();
            }
        }

        @Override
        public Future<Boolean> triggerCheckpointAsync(
                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
            // Count each checkpoint only once, on the first subtask.
            if (taskIndex == 0L) {
                checkpointsToWaitFor.countDown();
            }
            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
        }
    }

    /**
     * Task that suspends its default action on the first {@code processInput} call and blocks in
     * {@code finishTask} until the test triggers {@link #finishingLatch}.
     */
    public static class NoOpBlockingStreamTask extends StreamTaskTest.NoOpStreamTask {
        private transient MailboxDefaultAction.Suspension suspension;

        public NoOpBlockingStreamTask(Environment environment) throws Exception {
            super(environment);
        }

        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            invokeLatch.countDown();
            if (suspension == null) {
                // First call: park the default action until it is explicitly resumed.
                suspension = controller.suspendDefaultAction();
            } else {
                // Called again after a resume: signal that the task is done.
                controller.allActionsCompleted();
            }
        }

        public void finishTask() throws Exception {
            // Hold the task until the test allows it to finish.
            finishingLatch.await();
            if (suspension != null) {
                suspension.resume();
            }
        }
    }

    /**
     * Task whose subtask 0 throws from the checkpoint-complete callback of a synchronous
     * savepoint. It also records the savepoint id in {@link #syncSavepointId} and counts
     * {@code processInput} calls of subtask 0 via {@link #numberOfRestarts}.
     */
    public static class ExceptionOnCallbackStreamTask extends CheckpointCountingTask {
        /** Id of the last synchronous savepoint triggered on this task instance. */
        private long synchronousSavepointId = Long.MIN_VALUE;

        public ExceptionOnCallbackStreamTask(Environment environment) throws Exception {
            super(environment);
        }

        @Override
        protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
            final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
            if (taskIndex == 0L) {
                numberOfRestarts.countDown();
            }
            super.processInput(controller);
        }

        @Override
        public Future<Boolean> triggerCheckpointAsync(
                CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
            final long checkpointId = checkpointMetaData.getCheckpointId();
            final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
            if (checkpointType.isSynchronous()) {
                synchronousSavepointId = checkpointId;
                // Only the first synchronous savepoint since the last reset is published.
                syncSavepointId.compareAndSet(-1L, synchronousSavepointId);
            }
            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions);
        }

        @Override
        public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
            final long taskIndex = getEnvironment().getTaskInfo().getIndexOfThisSubtask();
            if (checkpointId == synchronousSavepointId && taskIndex == 0L) {
                throw new RuntimeException("Expected Exception");
            }
            return super.notifyCheckpointCompleteAsync(checkpointId);
        }

        public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
            // Abort notifications are deliberately ignored by this task.
            return CompletableFuture.completedFuture(null);
        }

        protected void finishTask() {
            this.mailboxProcessor.allActionsCompleted();
        }
    }
}

