/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AccessExecution;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TaskExecutorITCase
extends TestLogger {
    private static final int NUM_TMS = 2;
    private static final int SLOTS_PER_TM = 2;
    private static final int PARALLELISM = 4;
    private MiniCluster miniCluster;

    @Before
    public void setup() throws Exception {
        this.miniCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build());
        this.miniCluster.start();
    }

    @After
    public void teardown() throws Exception {
        if (this.miniCluster != null) {
            this.miniCluster.close();
        }
    }

    @Test
    public void testJobReExecutionAfterTaskExecutorTermination() throws Exception {
        JobGraph jobGraph = this.createJobGraph(4);
        CompletableFuture<JobResult> jobResultFuture = this.submitJobAndWaitUntilRunning(jobGraph);
        this.miniCluster.terminateTaskManager(0);
        JobResult jobResult = jobResultFuture.get();
        Assert.assertThat((Object)jobResult.isSuccess(), (Matcher)Matchers.is((Object)false));
        this.miniCluster.startTaskManager();
        JobGraph newJobGraph = this.createJobGraph(4);
        BlockingOperator.unblock();
        this.miniCluster.submitJob(newJobGraph).get();
        this.miniCluster.requestJobResult(newJobGraph.getJobID()).get();
    }

    @Test
    public void testJobRecoveryWithFailingTaskExecutor() throws Exception {
        JobGraph jobGraph = this.createJobGraphWithRestartStrategy(4);
        CompletableFuture<JobResult> jobResultFuture = this.submitJobAndWaitUntilRunning(jobGraph);
        this.miniCluster.startTaskManager();
        this.miniCluster.terminateTaskManager(0).get();
        BlockingOperator.unblock();
        Assert.assertThat((Object)jobResultFuture.get().isSuccess(), (Matcher)Matchers.is((Object)true));
    }

    private CompletableFuture<JobResult> submitJobAndWaitUntilRunning(JobGraph jobGraph) throws Exception {
        this.miniCluster.submitJob(jobGraph).get();
        CompletableFuture jobResultFuture = this.miniCluster.requestJobResult(jobGraph.getJobID());
        Assert.assertThat((Object)jobResultFuture.isDone(), (Matcher)Matchers.is((Object)false));
        CommonTestUtils.waitUntilCondition(this.jobIsRunning(() -> this.miniCluster.getExecutionGraph(jobGraph.getJobID())), 50L);
        return jobResultFuture;
    }

    private SupplierWithException<Boolean, Exception> jobIsRunning(Supplier<CompletableFuture<? extends AccessExecutionGraph>> executionGraphFutureSupplier) {
        Predicate<AccessExecution> runningOrFinished = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.RUNNING).or(ExecutionGraphTestUtils.isInExecutionState(ExecutionState.FINISHED));
        Predicate<AccessExecutionGraph> allExecutionsRunning = ExecutionGraphTestUtils.allExecutionsPredicate(runningOrFinished);
        return () -> {
            AccessExecutionGraph executionGraph = (AccessExecutionGraph)((CompletableFuture)executionGraphFutureSupplier.get()).join();
            return allExecutionsRunning.test(executionGraph) && executionGraph.getState() == JobStatus.RUNNING;
        };
    }

    private JobGraph createJobGraphWithRestartStrategy(int parallelism) throws IOException {
        JobGraph jobGraph = this.createJobGraph(parallelism);
        ExecutionConfig executionConfig = new ExecutionConfig();
        executionConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)2, (long)0L));
        jobGraph.setExecutionConfig(executionConfig);
        return jobGraph;
    }

    private JobGraph createJobGraph(int parallelism) {
        JobVertex sender = new JobVertex("Sender");
        sender.setParallelism(parallelism);
        sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
        JobVertex receiver = new JobVertex("Blocking receiver");
        receiver.setParallelism(parallelism);
        receiver.setInvokableClass(BlockingOperator.class);
        BlockingOperator.reset();
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        sender.setSlotSharingGroup(slotSharingGroup);
        receiver.setSlotSharingGroup(slotSharingGroup);
        return JobGraphTestUtils.streamingJobGraph(sender, receiver);
    }

    public static class BlockingOperator
    extends TestingAbstractInvokables.Receiver {
        private static CountDownLatch countDownLatch = new CountDownLatch(1);

        public BlockingOperator(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            countDownLatch.await();
            super.invoke();
        }

        public static void unblock() {
            countDownLatch.countDown();
        }

        public static void reset() {
            countDownLatch = new CountDownLatch(1);
        }
    }
}

