package com.hazelcast.jet.core;

import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.impl.JobResult;
import com.hazelcast.jet.impl.TerminationMode;
import com.hazelcast.jet.impl.exception.JobTerminateRequestedException;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.SlowTest;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({SlowTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/SuspendResumeTest.class */
public class SuspendResumeTest extends JetTestSupport {
    private static final int NODE_COUNT = 3;
    private static final int PARALLELISM = 4;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();
    private HazelcastInstance[] instances;
    private DAG dag;
    private Config config;

    @Before
    public void before() {
        TestProcessors.reset(12);
        this.instances = new HazelcastInstance[3];
        this.config = smallInstanceConfig();
        this.config.getJetConfig().setCooperativeThreadCount(4);
        for (int i = 0; i < 3; i++) {
            this.instances[i] = createHazelcastInstance(this.config);
        }
        this.dag = new DAG().vertex(new Vertex("test", new TestProcessors.MockPS(TestProcessors.NoOutputSourceP::new, 3)));
    }

    @Test
    public void when_suspendAndResume_then_jobResumes() throws Exception {
        Job newJob = this.instances[0].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        newJob.resume();
        assertJobStatusEventually(newJob, JobStatus.RUNNING);
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        newJob.join();
        Assert.assertEquals(6L, TestProcessors.MockPS.initCount.get());
        assertTrueEventually(() -> {
            Assert.assertEquals(6L, TestProcessors.MockPS.closeCount.get());
            Assert.assertEquals(3L, TestProcessors.MockPS.receivedCloseErrors.size());
            Assert.assertTrue(TestProcessors.MockPS.receivedCloseErrors.stream().allMatch(this::isSuspend));
        }, 5L);
    }

    @Test
    public void when_memberAddedWhileSuspended_then_jobResumesOnAllMembers() throws Exception {
        Job newJob = this.instances[0].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        createHazelcastInstance(this.config);
        newJob.resume();
        assertJobStatusEventually(newJob, JobStatus.RUNNING);
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        newJob.join();
        Assert.assertEquals(7L, TestProcessors.MockPS.initCount.get());
        assertTrueEventually(() -> {
            Assert.assertEquals(7L, TestProcessors.MockPS.closeCount.get());
            Assert.assertEquals(3L, TestProcessors.MockPS.receivedCloseErrors.size());
            Assert.assertTrue(TestProcessors.MockPS.receivedCloseErrors.stream().allMatch(this::isSuspend));
        }, 5L);
    }

    @Test
    public void when_nonCoordinatorDiesWhileSuspended_then_jobResumes() throws Exception {
        Job newJob = this.instances[0].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        this.instances[2].getLifecycleService().terminate();
        newJob.resume();
        assertJobStatusEventually(newJob, JobStatus.RUNNING);
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        newJob.join();
        Assert.assertEquals(5L, TestProcessors.MockPS.initCount.get());
        assertTrueEventually(() -> {
            Assert.assertEquals(5L, TestProcessors.MockPS.closeCount.get());
            Assert.assertEquals(3L, TestProcessors.MockPS.receivedCloseErrors.size());
            Assert.assertTrue(TestProcessors.MockPS.receivedCloseErrors.stream().allMatch(this::isSuspend));
        }, 5L);
    }

    @Test
    public void when_coordinatorDiesWhileSuspended_then_jobResumes() throws Exception {
        Job newJob = this.instances[1].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        this.instances[0].getLifecycleService().terminate();
        int i = 0;
        while (true) {
            try {
                newJob.resume();
                assertJobStatusEventually(newJob, JobStatus.RUNNING);
                TestProcessors.NoOutputSourceP.proceedLatch.countDown();
                newJob.join();
                Assert.assertEquals(5L, TestProcessors.MockPS.initCount.get());
                assertTrueEventually(() -> {
                    Assert.assertEquals(5L, TestProcessors.MockPS.closeCount.get());
                    Assert.assertEquals(3L, TestProcessors.MockPS.receivedCloseErrors.size());
                    Assert.assertTrue(TestProcessors.MockPS.receivedCloseErrors.stream().allMatch(this::isSuspend));
                }, 5L);
                return;
            } catch (JobNotFoundException e) {
                if (i == 20) {
                    throw e;
                }
                sleepSeconds(1);
                i++;
            }
        }
    }

    @Test
    public void when_joinAndThenSuspend_then_joinBlocks() throws Exception {
        Job newJob = this.instances[1].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.getClass();
        Future spawn = spawn(newJob::join);
        sleepSeconds(1);
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        assertTrueAllTheTime(() -> {
            Assert.assertFalse(spawn.isDone());
        }, 2L);
    }

    @Test
    public void when_suspendAndThenJoin_then_joinBlocks() throws Exception {
        Job newJob = this.instances[1].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        newJob.getClass();
        Future spawn = spawn(newJob::join);
        assertTrueAllTheTime(() -> {
            Assert.assertFalse(spawn.isDone());
        }, 2L);
    }

    @Test
    public void when_joinSuspendedJob_then_waitsAndReturnsAfterResume() throws Exception {
        Job newJob = this.instances[0].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        newJob.getClass();
        Future spawn = spawn(newJob::join);
        assertTrueAllTheTime(() -> {
            Assert.assertFalse(spawn.isDone());
        }, 1L);
        newJob.resume();
        assertJobStatusEventually(newJob, JobStatus.RUNNING);
        assertTrueAllTheTime(() -> {
            Assert.assertFalse(spawn.isDone());
        }, 1L);
        TestProcessors.NoOutputSourceP.proceedLatch.countDown();
        assertTrueEventually(() -> {
            Assert.assertTrue(spawn.isDone());
        }, 5L);
    }

    @Test
    public void when_cancelSuspendedJob_then_jobCancels() throws Exception {
        Job newJob = this.instances[0].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        cancelAndJoin(newJob);
        assertJobStatusEventually(newJob, JobStatus.FAILED);
        JobRepository jobRepository = new JobRepository(this.instances[0]);
        assertTrueEventually(() -> {
            Assert.assertNull("JobRecord", jobRepository.getJobRecord(newJob.getId()));
            JobResult jobResult = jobRepository.getJobResult(newJob.getId());
            assertContains(jobResult.getFailureText(), CancellationException.class.getName());
            Assert.assertFalse("Job result successful", jobResult.isSuccessful());
        });
    }

    @Test
    public void when_restartSuspendedJob_then_fail() throws Exception {
        Job newJob = this.instances[0].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Cannot RESTART_GRACEFUL, job status is SUSPENDED");
        newJob.restart();
    }

    @Test
    public void when_suspendSuspendedJob_then_fail() throws Exception {
        Job newJob = this.instances[0].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        this.expectedException.expect(IllegalStateException.class);
        this.expectedException.expectMessage("Cannot SUSPEND_GRACEFUL, job status is SUSPENDED");
        newJob.suspend();
    }

    @Test
    public void when_jobSuspendedAndCoordinatorShutDown_then_jobStaysSuspended() throws Exception {
        when_jobSuspendedAndCoordinatorGone_then_jobStaysSuspended(true);
    }

    @Test
    public void when_jobSuspendedAndCoordinatorTerminated_then_jobStaysSuspended() throws Exception {
        when_jobSuspendedAndCoordinatorGone_then_jobStaysSuspended(false);
    }

    private void when_jobSuspendedAndCoordinatorGone_then_jobStaysSuspended(boolean z) throws Exception {
        Assert.assertTrue(this.instances[0].getCluster().isMaster());
        Job newJob = this.instances[1].getJet().newJob(this.dag);
        TestProcessors.NoOutputSourceP.executionStarted.await();
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        if (z) {
            this.instances[0].shutdown();
        } else {
            this.instances[0].getLifecycleService().terminate();
        }
        assertTrueAllTheTime(() -> {
            Assert.assertEquals(JobStatus.SUSPENDED, newJob.getStatus());
        }, 10L);
    }

    private boolean isSuspend(Throwable th) {
        return (th instanceof JobTerminateRequestedException) && ((JobTerminateRequestedException) th).mode().actionAfterTerminate() == TerminationMode.ActionAfterTerminate.SUSPEND;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1818100338:
                if (implMethodName.equals("<init>")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/TestProcessors$NoOutputSourceP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                    return TestProcessors.NoOutputSourceP::new;
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
