/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.jobmanager;

import java.util.Collection;
import java.util.HashMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CustomExtension;
import org.apache.flink.core.testutils.EachCallbackWrapper;
import org.apache.flink.runtime.dispatcher.NoOpJobGraphListener;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.DefaultJobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStore;
import org.apache.flink.runtime.jobmanager.JobGraphStoreUtil;
import org.apache.flink.runtime.jobmanager.JobGraphStoreWatcher;
import org.apache.flink.runtime.jobmanager.TestingJobGraphListener;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreUtil;
import org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStoreWatcher;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStreamStateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperExtension;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.curator5.org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.Executors;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowableTypeAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class ZooKeeperJobGraphsStoreITCase
extends TestLogger {
    private final ZooKeeperExtension zooKeeperExtension = new ZooKeeperExtension();
    @RegisterExtension
    final EachCallbackWrapper<ZooKeeperExtension> zooKeeperResource = new EachCallbackWrapper((CustomExtension)this.zooKeeperExtension);
    @RegisterExtension
    final TestingFatalErrorHandlerExtension testingFatalErrorHandlerResource = new TestingFatalErrorHandlerExtension();
    private static final RetrievableStateStorageHelper<JobGraph> localStateStorage = jobGraph -> {
        ByteStreamStateHandle byteStreamStateHandle = new ByteStreamStateHandle(String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject((Object)jobGraph));
        return new RetrievableStreamStateHandle((StreamStateHandle)byteStreamStateHandle);
    };

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testPutAndRemoveJobGraph() throws Exception {
        JobGraphStore jobGraphs = this.createZooKeeperJobGraphStore("/testPutAndRemoveJobGraph");
        try {
            JobGraphStore.JobGraphListener listener = (JobGraphStore.JobGraphListener)Mockito.mock(JobGraphStore.JobGraphListener.class);
            jobGraphs.start(listener);
            JobGraph jobGraph = this.createJobGraph(new JobID(), "JobName");
            Assertions.assertThat((Collection)jobGraphs.getJobIds()).isEmpty();
            jobGraphs.putJobGraph(jobGraph);
            Collection jobIds = jobGraphs.getJobIds();
            Assertions.assertThat((Collection)jobIds).hasSize(1);
            JobID jobId = (JobID)jobIds.iterator().next();
            this.verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
            jobGraph = this.createJobGraph(jobGraph.getJobID(), "Updated JobName");
            jobGraphs.putJobGraph(jobGraph);
            jobIds = jobGraphs.getJobIds();
            Assertions.assertThat((Collection)jobIds).hasSize(1);
            jobId = (JobID)jobIds.iterator().next();
            this.verifyJobGraphs(jobGraph, jobGraphs.recoverJobGraph(jobId));
            jobGraphs.globalCleanupAsync(jobGraph.getJobID(), Executors.directExecutor()).join();
            Assertions.assertThat((Collection)jobGraphs.getJobIds()).isEmpty();
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.atMost((int)1))).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)Matchers.any(JobID.class));
            jobGraphs.globalCleanupAsync(jobGraph.getJobID(), Executors.directExecutor()).join();
        }
        finally {
            jobGraphs.stop();
        }
    }

    @Nonnull
    private JobGraphStore createZooKeeperJobGraphStore(String fullPath) throws Exception {
        CuratorFramework client = this.zooKeeperExtension.getZooKeeperClient(this.testingFatalErrorHandlerResource.getTestingFatalErrorHandler());
        client.newNamespaceAwareEnsurePath(fullPath).ensure(client.getZookeeperClient());
        CuratorFramework facade = client.usingNamespace(client.getNamespace() + fullPath);
        ZooKeeperStateHandleStore zooKeeperStateHandleStore = new ZooKeeperStateHandleStore(facade, localStateStorage);
        return new DefaultJobGraphStore((StateHandleStore)zooKeeperStateHandleStore, (JobGraphStoreWatcher)new ZooKeeperJobGraphStoreWatcher(new PathChildrenCache(facade, "/", false)), (JobGraphStoreUtil)ZooKeeperJobGraphStoreUtil.INSTANCE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRecoverJobGraphs() throws Exception {
        JobGraphStore jobGraphs = this.createZooKeeperJobGraphStore("/testRecoverJobGraphs");
        try {
            JobGraphStore.JobGraphListener listener = (JobGraphStore.JobGraphListener)Mockito.mock(JobGraphStore.JobGraphListener.class);
            jobGraphs.start(listener);
            HashMap<JobID, JobGraph> expected = new HashMap<JobID, JobGraph>();
            JobID[] jobIds = new JobID[]{new JobID(), new JobID(), new JobID()};
            expected.put(jobIds[0], this.createJobGraph(jobIds[0]));
            expected.put(jobIds[1], this.createJobGraph(jobIds[1]));
            expected.put(jobIds[2], this.createJobGraph(jobIds[2]));
            for (JobGraph jobGraph : expected.values()) {
                jobGraphs.putJobGraph(jobGraph);
            }
            Collection actual = jobGraphs.getJobIds();
            Assertions.assertThat((Collection)actual).hasSameSizeAs(expected.entrySet());
            for (JobID jobId : actual) {
                JobGraph jobGraph = jobGraphs.recoverJobGraph(jobId);
                Assertions.assertThat(expected).containsKey((Object)jobGraph.getJobID());
                this.verifyJobGraphs((JobGraph)expected.get(jobGraph.getJobID()), jobGraph);
                jobGraphs.globalCleanupAsync(jobGraph.getJobID(), Executors.directExecutor()).join();
            }
            Assertions.assertThat((Collection)jobGraphs.getJobIds()).isEmpty();
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.atMost((int)expected.size()))).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)Matchers.any(JobID.class));
        }
        finally {
            jobGraphs.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentAddJobGraph() throws Exception {
        JobGraphStore jobGraphs = null;
        JobGraphStore otherJobGraphs = null;
        try {
            jobGraphs = this.createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
            otherJobGraphs = this.createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
            JobGraph jobGraph = this.createJobGraph(new JobID());
            JobGraph otherJobGraph = this.createJobGraph(new JobID());
            JobGraphStore.JobGraphListener listener = (JobGraphStore.JobGraphListener)Mockito.mock(JobGraphStore.JobGraphListener.class);
            final JobID[] actualOtherJobId = new JobID[1];
            final CountDownLatch sync = new CountDownLatch(1);
            ((JobGraphStore.JobGraphListener)Mockito.doAnswer((Answer)new Answer<Void>(){

                public Void answer(InvocationOnMock invocation) throws Throwable {
                    actualOtherJobId[0] = (JobID)invocation.getArguments()[0];
                    sync.countDown();
                    return null;
                }
            }).when((Object)listener)).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            jobGraphs.start(listener);
            otherJobGraphs.start((JobGraphStore.JobGraphListener)NoOpJobGraphListener.INSTANCE);
            jobGraphs.putJobGraph(jobGraph);
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)Matchers.any(JobID.class));
            otherJobGraphs.putJobGraph(otherJobGraph);
            sync.await();
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.times((int)1))).onAddedJobGraph((JobID)Matchers.any(JobID.class));
            ((JobGraphStore.JobGraphListener)Mockito.verify((Object)listener, (VerificationMode)Mockito.never())).onRemovedJobGraph((JobID)Matchers.any(JobID.class));
            Assertions.assertThat((Comparable)actualOtherJobId[0]).isEqualTo((Object)otherJobGraph.getJobID());
        }
        finally {
            if (jobGraphs != null) {
                jobGraphs.stop();
            }
            if (otherJobGraphs != null) {
                otherJobGraphs.stop();
            }
        }
    }

    @Test
    public void testUpdateJobGraphYouDidNotGetOrAdd() throws Exception {
        JobGraphStore jobGraphs = this.createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
        JobGraphStore otherJobGraphs = this.createZooKeeperJobGraphStore("/testUpdateJobGraphYouDidNotGetOrAdd");
        jobGraphs.start((JobGraphStore.JobGraphListener)NoOpJobGraphListener.INSTANCE);
        otherJobGraphs.start((JobGraphStore.JobGraphListener)NoOpJobGraphListener.INSTANCE);
        JobGraph jobGraph = this.createJobGraph(new JobID());
        jobGraphs.putJobGraph(jobGraph);
        Assertions.assertThatExceptionOfType(IllegalStateException.class).isThrownBy(() -> otherJobGraphs.putJobGraph(jobGraph));
    }

    @Test
    public void testJobGraphRemovalFailureAndLockRelease() throws Exception {
        JobGraphStore submittedJobGraphStore = this.createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
        JobGraphStore otherSubmittedJobGraphStore = this.createZooKeeperJobGraphStore("/testConcurrentAddJobGraph");
        TestingJobGraphListener listener = new TestingJobGraphListener();
        submittedJobGraphStore.start((JobGraphStore.JobGraphListener)listener);
        otherSubmittedJobGraphStore.start((JobGraphStore.JobGraphListener)listener);
        JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();
        submittedJobGraphStore.putJobGraph(jobGraph);
        JobGraph recoveredJobGraph = otherSubmittedJobGraphStore.recoverJobGraph(jobGraph.getJobID());
        Assertions.assertThat((Object)recoveredJobGraph).isNotNull();
        ((ThrowableTypeAssert)Assertions.assertThatExceptionOfType(Exception.class).as("It should not be possible to remove the JobGraph since the first store still has a lock on it.", new Object[0])).isThrownBy(() -> otherSubmittedJobGraphStore.globalCleanupAsync(recoveredJobGraph.getJobID(), Executors.directExecutor()).join());
        submittedJobGraphStore.stop();
        otherSubmittedJobGraphStore.globalCleanupAsync(recoveredJobGraph.getJobID(), Executors.directExecutor()).join();
        Assertions.assertThat((Object)otherSubmittedJobGraphStore.recoverJobGraph(recoveredJobGraph.getJobID())).isNull();
        otherSubmittedJobGraphStore.stop();
    }

    private JobGraph createJobGraph(JobID jobId) {
        return this.createJobGraph(jobId, "Test JobGraph");
    }

    private JobGraph createJobGraph(JobID jobId, String jobName) {
        JobVertex jobVertex = new JobVertex("Test JobVertex");
        jobVertex.setParallelism(1);
        return JobGraphBuilder.newStreamingJobGraphBuilder().setJobName(jobName).setJobId(jobId).addJobVertex(jobVertex).build();
    }

    private void verifyJobGraphs(JobGraph expected, JobGraph actual) {
        Assertions.assertThat((String)actual.getName()).isEqualTo(expected.getName());
        Assertions.assertThat((Comparable)actual.getJobID()).isEqualTo((Object)expected.getJobID());
    }
}

