/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmaster;

import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingAbstractInvokables;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.InternalMiniClusterExtension;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

class JobRecoveryITCase {
    private static final int NUM_TMS = 1;
    private static final int SLOTS_PER_TM = 10;
    private static final int PARALLELISM = 10;
    @RegisterExtension
    private static final InternalMiniClusterExtension MINI_CLUSTER_EXTENSION = new InternalMiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(10).build());

    JobRecoveryITCase() {
    }

    @Test
    void testTaskFailureRecovery() throws Exception {
        this.runTaskFailureRecoveryTest(this.createjobGraph(false));
    }

    @Test
    void testTaskFailureWithSlotSharingRecovery() throws Exception {
        this.runTaskFailureRecoveryTest(this.createjobGraph(true));
    }

    private void runTaskFailureRecoveryTest(JobGraph jobGraph) throws Exception {
        MiniCluster miniCluster = MINI_CLUSTER_EXTENSION.getMiniCluster();
        miniCluster.submitJob(jobGraph).get();
        CompletableFuture jobResultFuture = miniCluster.requestJobResult(jobGraph.getJobID());
        Assertions.assertThat((boolean)((JobResult)jobResultFuture.get()).isSuccess()).isTrue();
    }

    private JobGraph createjobGraph(boolean slotSharingEnabled) throws IOException {
        JobVertex sender = new JobVertex("Sender");
        sender.setParallelism(10);
        sender.setInvokableClass(TestingAbstractInvokables.Sender.class);
        JobVertex receiver = new JobVertex("Receiver");
        receiver.setParallelism(10);
        receiver.setInvokableClass(FailingOnceReceiver.class);
        FailingOnceReceiver.reset();
        if (slotSharingEnabled) {
            SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
            receiver.setSlotSharingGroup(slotSharingGroup);
            sender.setSlotSharingGroup(slotSharingGroup);
        }
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertices(Arrays.asList(sender, receiver)).setJobName(this.getClass().getSimpleName()).build();
        RestartStrategyUtils.configureFixedDelayRestartStrategy(jobGraph, 1, 0L);
        return jobGraph;
    }

    public static final class FailingOnceReceiver
    extends TestingAbstractInvokables.Receiver {
        private static volatile boolean failed = false;

        public FailingOnceReceiver(Environment environment) {
            super(environment);
        }

        @Override
        public void invoke() throws Exception {
            if (!failed && this.getEnvironment().getTaskInfo().getIndexOfThisSubtask() == 0) {
                failed = true;
                throw new FlinkRuntimeException(((Object)((Object)this)).getClass().getSimpleName());
            }
            super.invoke();
        }

        private static void reset() {
            failed = false;
        }
    }
}

