/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskexecutor;

import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.DefaultJobTable;
import org.apache.flink.runtime.taskexecutor.GlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.JobTable;
import org.apache.flink.runtime.taskexecutor.NoOpPartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
import org.apache.flink.runtime.taskexecutor.TestGlobalAggregateManager;
import org.apache.flink.runtime.taskexecutor.TestingJobServices;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpCheckpointResponder;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.SupplierWithException;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link DefaultJobTable}.
 *
 * <p>Each test runs against a fresh table created in {@link #setup()} and closed again in
 * {@link #teardown()}. Jobs are registered under {@link #jobId} unless a test needs a second job.
 */
public class DefaultJobTableTest
extends TestLogger {
    /** Supplier handing out default-configured {@link TestingJobServices} for tests that do not inspect them. */
    private static final SupplierWithException<JobTable.JobServices, RuntimeException> DEFAULT_JOB_SERVICES_SUPPLIER = () -> TestingJobServices.newBuilder().build();

    /** Job id used by the tests to register a job in the table. */
    private final JobID jobId = new JobID();

    /** Table under test; re-created before every test. */
    private DefaultJobTable jobTable;

    /** Creates the job table under test. */
    @Before
    public void setup() {
        jobTable = DefaultJobTable.create();
    }

    /** Closes the job table under test, if one was created. */
    @After
    public void teardown() {
        if (jobTable != null) {
            jobTable.close();
        }
    }

    /** Requesting an unknown job creates it and makes it retrievable via {@code getJob}. */
    @Test
    public void getOrCreateJob_NoRegisteredJob_WillCreateNewJob() {
        JobTable.Job newJob = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        Assert.assertThat(newJob.getJobId(), Matchers.is(jobId));
        Assert.assertTrue(jobTable.getJob(jobId).isPresent());
    }

    /** Requesting the same job id twice yields the very same job instance. */
    @Test
    public void getOrCreateJob_RegisteredJob_WillReturnRegisteredJob() {
        JobTable.Job newJob = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        JobTable.Job otherJob = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        Assert.assertThat(otherJob, Matchers.sameInstance(newJob));
    }

    /**
     * Closing a job runs the close runnable of its job services.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the latch
     */
    @Test
    public void closeJob_WillCloseJobServices() throws InterruptedException {
        OneShotLatch shutdownLibraryCacheManagerLatch = new OneShotLatch();
        TestingJobServices jobServices = TestingJobServices.newBuilder()
                .setCloseRunnable(shutdownLibraryCacheManagerLatch::trigger)
                .build();
        JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> jobServices);

        job.close();

        // Only returns once the close runnable has triggered the latch.
        shutdownLibraryCacheManagerLatch.await();
    }

    /** A closed job can no longer be looked up in the table. */
    @Test
    public void closeJob_WillRemoveItFromJobTable() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        job.close();

        Assert.assertFalse(jobTable.getJob(jobId).isPresent());
    }

    /** Connecting an unconnected job yields a connection that is indexed by job id and resource id. */
    @Test
    public void connectJob_NotConnected_Succeeds() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        ResourceID resourceId = ResourceID.generate();

        JobTable.Connection connection = connectJob(job, resourceId);

        Assert.assertThat(connection.getJobId(), Matchers.is(jobId));
        Assert.assertThat(connection.getResourceId(), Matchers.is(resourceId));
        Assert.assertTrue(jobTable.getConnection(jobId).isPresent());
        Assert.assertTrue(jobTable.getConnection(resourceId).isPresent());
    }

    /**
     * Connects the given job using testing / no-op collaborators.
     *
     * @param job job to connect
     * @param resourceId resource id passed to {@code connect}
     * @return the connection returned by {@code job.connect(...)}
     */
    private JobTable.Connection connectJob(JobTable.Job job, ResourceID resourceId) {
        return job.connect(
                resourceId,
                new TestingJobMasterGatewayBuilder().build(),
                new NoOpTaskManagerActions(),
                NoOpCheckpointResponder.INSTANCE,
                new TestGlobalAggregateManager(),
                new NoOpPartitionProducerStateChecker());
    }

    /** Connecting a job a second time is expected to throw {@link IllegalStateException}. */
    @Test(expected=IllegalStateException.class)
    public void connectJob_Connected_Fails() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        connectJob(job, ResourceID.generate());
        connectJob(job, ResourceID.generate());
    }

    /** Disconnecting removes the connection from both the job-id and the resource-id lookup. */
    @Test
    public void disconnectConnection_RemovesConnection() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        ResourceID resourceId = ResourceID.generate();
        JobTable.Connection connection = connectJob(job, resourceId);

        connection.disconnect();

        Assert.assertFalse(jobTable.getConnection(jobId).isPresent());
        Assert.assertFalse(jobTable.getConnection(resourceId).isPresent());
    }

    /** Calling {@code asConnection} on a closed job is expected to throw {@link IllegalStateException}. */
    @Test(expected=IllegalStateException.class)
    public void access_AfterBeingClosed_WillFail() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        job.close();
        job.asConnection();
    }

    /** Connecting a closed job is expected to throw {@link IllegalStateException}. */
    @Test(expected=IllegalStateException.class)
    public void connectJob_AfterBeingClosed_WillFail() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);

        job.close();
        connectJob(job, ResourceID.generate());
    }

    /** Reading the job manager gateway from a disconnected connection is expected to throw {@link IllegalStateException}. */
    @Test(expected=IllegalStateException.class)
    public void accessJobManagerGateway_AfterBeingDisconnected_WillFail() {
        JobTable.Job job = jobTable.getOrCreateJob(jobId, DEFAULT_JOB_SERVICES_SUPPLIER);
        JobTable.Connection connection = connectJob(job, ResourceID.generate());

        connection.disconnect();
        connection.getJobManagerGateway();
    }

    /**
     * Closing the table runs the close runnable of every registered job's services and leaves the
     * table empty.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the latch
     */
    @Test
    public void close_WillCloseAllRegisteredJobs() throws InterruptedException {
        // One count per registered job; both close runnables must run for await() to return.
        CountDownLatch shutdownLibraryCacheManagerLatch = new CountDownLatch(2);
        TestingJobServices jobServices1 = TestingJobServices.newBuilder()
                .setCloseRunnable(shutdownLibraryCacheManagerLatch::countDown)
                .build();
        TestingJobServices jobServices2 = TestingJobServices.newBuilder()
                .setCloseRunnable(shutdownLibraryCacheManagerLatch::countDown)
                .build();
        jobTable.getOrCreateJob(jobId, () -> jobServices1);
        jobTable.getOrCreateJob(new JobID(), () -> jobServices2);

        jobTable.close();

        shutdownLibraryCacheManagerLatch.await();
        Assert.assertTrue(jobTable.isEmpty());
    }
}

