package com.hazelcast.jet.impl.deployment;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.impl.JetServiceBackend;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.List;
import java.util.concurrent.CancellationException;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Exercises Jet job operations (submit, look up, suspend, resume, cancel, list summaries and
 * light jobs) through clients that have smart routing disabled and are configured with the
 * address of exactly one cluster member.
 *
 * <p>The class-level setup calls {@code initialize(2, null)} and then uses the first two entries
 * of {@code instances()}. By this test's naming, {@code instances()[0]} is treated as the master
 * member and {@code instances()[1]} as a non-master member; one client is created per member.
 */
@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class NonSmartClientTest extends SimpleTestInClusterSupport {
    /** Name of the IList used as the batch source in the simple pipeline test. */
    private static final String SOURCE_LIST = "source";
    /** Name of the IList used as the sink in the simple pipeline test. */
    private static final String SINK_LIST = "sink";
    /** Number of integers put into the source list and expected in the sink list. */
    private static final int ITEM_COUNT = 10;

    private static HazelcastInstance masterInstance;
    private static HazelcastInstance nonMasterInstance;
    private static HazelcastInstance masterClient;
    private static HazelcastInstance nonMasterClient;

    /** Initializes the shared cluster and creates one non-smart client per member. */
    @BeforeClass
    public static void setUp() {
        initialize(2, null);
        masterInstance = instances()[0];
        nonMasterInstance = instances()[1];
        masterClient = createClientConnectingTo(masterInstance);
        nonMasterClient = createClientConnectingTo(nonMasterInstance);
    }

    /**
     * Creates a client with smart routing disabled whose address list contains only the
     * {@code host:port} of the given member.
     *
     * @param hazelcastInstance the member whose local address the client is configured with
     * @return the client created by the test factory
     */
    private static HazelcastInstance createClientConnectingTo(HazelcastInstance hazelcastInstance) {
        Address memberAddress = hazelcastInstance.getCluster().getLocalMember().getAddress();
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.getNetworkConfig().setSmartRouting(false);
        // Make sure the given member is the only configured address.
        clientConfig.getNetworkConfig().getAddresses().clear();
        clientConfig.getNetworkConfig().getAddresses()
                .add(memberAddress.getHost() + ":" + memberAddress.getPort());
        return factory().newHazelcastClient(clientConfig);
    }

    /** A batch list-to-list pipeline submitted via the non-master client copies all items. */
    @Test
    public void when_jobSubmitted_then_executedSuccessfully() {
        fillListWithInts(masterInstance.getList(SOURCE_LIST), ITEM_COUNT);
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.list(SOURCE_LIST)).writeTo(Sinks.list(SINK_LIST));

        nonMasterClient.getJet().newJob(pipeline).join();

        Assert.assertEquals(ITEM_COUNT, masterInstance.getList(SINK_LIST).size());
    }

    /** A named streaming job can be looked up by id and by name and reports RUNNING. */
    @Test
    public void when_jobSubmitted_then_jobCanBeFetchedByIdOrName() {
        String jobName = randomName();
        DAG dag = TestProcessors.streamingDag();
        JetService jet = nonMasterClient.getJet();
        long jobId = jet.newJob(dag, new JobConfig().setName(jobName)).getId();

        assertTrueEventually(() -> {
            Assert.assertNotNull(jet.getJob(jobId));
            Assert.assertNotNull(jet.getJob(jobName));
            Assert.assertTrue(jet.getJobs().stream().anyMatch(job -> job.getId() == jobId));
            Assert.assertFalse(jet.getJobs(jobName).isEmpty());
            Assert.assertNotNull(jet.getJob(jobId).getStatus());
            // JUnit's contract is (expected, actual); keeping that order makes failure messages correct.
            Assert.assertEquals(JobStatus.RUNNING, jet.getJob(jobId).getStatus());
            Job jobByName = jet.getJob(jobName);
            Assert.assertNotNull(jobByName.getConfig());
            assertGreaterOrEquals("submissionTime", jobByName.getSubmissionTime(), 0L);
        }, 10L);
    }

    /** Suspending a running job is observed as SUSPENDED when the job is re-fetched by name. */
    @Test
    public void when_jobSuspended_then_jobStatusIsSuspended() {
        Job job = startJobAndVerifyItIsRunning();

        job.suspend();

        assertJobStatusEventually(nonMasterClient.getJet().getJob(job.getName()), JobStatus.SUSPENDED);
    }

    /** Resuming a suspended job is observed as RUNNING when the job is re-fetched by name. */
    @Test
    public void when_jobResumed_then_jobStatusIsRunning() {
        Job job = startJobAndVerifyItIsRunning();
        job.suspend();
        String jobName = job.getName();
        assertJobStatusEventually(nonMasterClient.getJet().getJob(jobName), JobStatus.SUSPENDED);

        job.resume();

        assertJobStatusEventually(nonMasterClient.getJet().getJob(jobName), JobStatus.RUNNING);
    }

    /**
     * Cancelling a running job is observed as {@link JobStatus#FAILED} when the job is
     * re-fetched by name. Note that, despite the method name, the asserted status is
     * FAILED, not COMPLETED.
     */
    @Test
    public void when_jobCancelled_then_jobStatusIsCompleted() {
        Job job = startJobAndVerifyItIsRunning();

        job.cancel();

        assertJobStatusEventually(nonMasterClient.getJet().getJob(job.getName()), JobStatus.FAILED);
    }

    /** The job summary list obtained via the non-master client contains the started job. */
    @Test
    public void when_jobSummaryListIsAsked_then_jobSummaryListReturned() {
        Job job = startJobAndVerifyItIsRunning();
        long jobId = job.getId();

        // Streaming the returned list directly keeps its element type, so no raw List is needed.
        // A null list would fail the test here with a NullPointerException.
        Assert.assertTrue(nonMasterClient.getJet().getJobSummaryList().stream()
                .anyMatch(summary -> summary.getJobId() == jobId));
    }

    /**
     * A light job submitted via the non-master client eventually has an entry in the light
     * master contexts of the non-master member's job coordination service.
     */
    @Test
    public void when_lightJobSubmittedToNonMaster_then_coordinatedByNonMaster() {
        Job lightJob = nonMasterClient.getJet().newLightJob(TestProcessors.streamingDag());
        JetServiceBackend jetServiceBackend =
                (JetServiceBackend) getNodeEngineImpl(nonMasterInstance).getService("hz:impl:jetService");

        assertTrueEventually(() ->
                Assert.assertNotNull(jetServiceBackend.getJobCoordinationService()
                        .getLightMasterContexts().get(lightJob.getId())));
    }

    /**
     * A light job submitted via the non-master client can be fetched by id from the master
     * member and from both clients, with matching submission times, and can be cancelled
     * through the handle obtained from the master client.
     */
    @Test
    public void when_lightJobSubmittedToNonMaster_then_accessibleFromAllMembers() {
        Job lightJob = nonMasterClient.getJet().newLightJob(TestProcessors.streamingDag());
        assertTrueEventually(() -> {
            Job jobOnMaster = masterInstance.getJet().getJob(lightJob.getId());
            Assert.assertNotNull(jobOnMaster);
            Assert.assertEquals(JobStatus.RUNNING, jobOnMaster.getStatus());
        });

        Job jobViaMasterClient = masterClient.getJet().getJob(lightJob.getId());
        Job jobViaNonMasterClient = nonMasterClient.getJet().getJob(lightJob.getId());
        Assert.assertNotNull(jobViaMasterClient);
        Assert.assertNotNull(jobViaNonMasterClient);
        Assert.assertNotEquals(0L, jobViaMasterClient.getSubmissionTime());
        Assert.assertEquals(jobViaMasterClient.getSubmissionTime(), jobViaNonMasterClient.getSubmissionTime());
        Assert.assertFalse(jobViaMasterClient.getFuture().isDone());
        Assert.assertFalse(jobViaNonMasterClient.getFuture().isDone());

        jobViaMasterClient.cancel();
        try {
            jobViaMasterClient.join();
            Assert.fail("join didn't fail");
        } catch (CancellationException ignored) {
            // Expected outcome: the test requires join() on the cancelled job to throw.
        }
    }

    /**
     * Submits a streaming job with a random name via the non-master client and waits until
     * the job, re-fetched by that name, reports RUNNING.
     *
     * @return the job handle returned by the submission
     */
    private Job startJobAndVerifyItIsRunning() {
        String jobName = randomName();
        Job submittedJob = nonMasterClient.getJet()
                .newJob(TestProcessors.streamingDag(), new JobConfig().setName(jobName));
        assertJobStatusEventually(nonMasterClient.getJet().getJob(jobName), JobStatus.RUNNING);
        return submittedJob;
    }
}
