package com.hazelcast.jet.impl.deployment;

import com.hazelcast.cluster.Address;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.impl.processor.NoopP;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/deployment/JetClassLoaderTest.class */
public class JetClassLoaderTest extends JetTestSupport {

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/deployment/JetClassLoaderTest$LeakClassLoaderP.class */
    public static class LeakClassLoaderP extends AbstractProcessor {
        private static volatile JetClassLoader classLoader;

        private LeakClassLoaderP() {
        }

        public boolean complete() {
            classLoader = Thread.currentThread().getContextClassLoader();
            return true;
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/deployment/JetClassLoaderTest$LeakClassLoaderPMS.class */
    private static class LeakClassLoaderPMS implements ProcessorMetaSupplier {
        private static Map<String, List<ClassLoader>> classLoaders = new HashMap();

        private LeakClassLoaderPMS() {
        }

        public void init(@Nonnull ProcessorMetaSupplier.Context context) throws Exception {
            putClassLoader("init");
        }

        @Nonnull
        public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> list) {
            putClassLoader("get");
            return address -> {
                return ProcessorSupplier.of(NoopP::new);
            };
        }

        public void close(@Nullable Throwable th) throws Exception {
            putClassLoader("close");
        }

        private void putClassLoader(String str) {
            classLoaders.computeIfAbsent(str, str2 -> {
                return new ArrayList();
            }).add(Thread.currentThread().getContextClassLoader());
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 1818100338:
                    if (implMethodName.equals("<init>")) {
                        z = false;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 8 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/processor/NoopP") && serializedLambda.getImplMethodSignature().equals("()V")) {
                        return NoopP::new;
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/deployment/JetClassLoaderTest$LeakClassLoaderPS.class */
    private static class LeakClassLoaderPS implements ProcessorSupplier {
        private static Map<String, List<ClassLoader>> classLoaders = new HashMap();

        private LeakClassLoaderPS() {
        }

        public void init(@Nonnull ProcessorSupplier.Context context) throws Exception {
            putClassLoader("init");
        }

        @Nonnull
        public Collection<? extends Processor> get(int i) {
            putClassLoader("get");
            return Collections.singleton(new NoopP());
        }

        public void close(@Nullable Throwable th) throws Exception {
            putClassLoader("close");
        }

        private void putClassLoader(String str) {
            classLoaders.computeIfAbsent(str, str2 -> {
                return new ArrayList();
            }).add(Thread.currentThread().getContextClassLoader());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/deployment/JetClassLoaderTest$SourceP.class */
    public static class SourceP extends AbstractProcessor {
        private volatile boolean emitted;
        private volatile boolean emittedWm;
        private volatile boolean restored;

        private SourceP() {
            this.emitted = false;
            this.emittedWm = false;
            this.restored = false;
        }

        public boolean complete() {
            if (!this.emitted) {
                this.emitted = tryEmit(1);
                return false;
            }
            if (this.emittedWm) {
                return this.restored;
            }
            this.emittedWm = tryEmit(new Watermark(System.currentTimeMillis()));
            return false;
        }

        public boolean finishSnapshotRestore() {
            this.restored = true;
            return super.finishSnapshotRestore();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/deployment/JetClassLoaderTest$TargetP.class */
    public static class TargetP extends AbstractProcessor {
        private volatile boolean received;
        private volatile boolean restored;
        private static Map<String, List<ClassLoader>> classLoaders = new HashMap();

        private TargetP() {
            this.received = false;
            this.restored = false;
        }

        private void putClassLoader(String str) {
            classLoaders.computeIfAbsent(str, str2 -> {
                return new ArrayList();
            }).add(Thread.currentThread().getContextClassLoader());
        }

        public boolean isCooperative() {
            putClassLoader("isCooperative");
            return super.isCooperative();
        }

        protected void init(@Nonnull Processor.Context context) throws Exception {
            putClassLoader("init");
            super.init(context);
        }

        public void process(int i, @Nonnull Inbox inbox) {
            putClassLoader("process");
            super.process(i, inbox);
        }

        public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
            putClassLoader("tryProcessWatermark");
            return super.tryProcessWatermark(watermark);
        }

        public boolean tryProcess() {
            putClassLoader("tryProcess");
            return super.tryProcess();
        }

        public boolean completeEdge(int i) {
            putClassLoader("completeEdge");
            return super.completeEdge(i);
        }

        public boolean complete() {
            putClassLoader("complete");
            return this.restored && this.received;
        }

        public boolean saveToSnapshot() {
            getOutbox().offerToSnapshot(1, 1);
            putClassLoader("saveToSnapshot");
            return super.saveToSnapshot();
        }

        public boolean snapshotCommitPrepare() {
            putClassLoader("snapshotCommitPrepare");
            return super.snapshotCommitPrepare();
        }

        public boolean snapshotCommitFinish(boolean z) {
            putClassLoader("snapshotCommitFinish");
            return super.snapshotCommitFinish(z);
        }

        protected void restoreFromSnapshot(@Nonnull Object obj, @Nonnull Object obj2) {
            putClassLoader("restoreFromSnapshot");
        }

        public boolean finishSnapshotRestore() {
            this.restored = true;
            putClassLoader("finishSnapshotRestore");
            return super.finishSnapshotRestore();
        }

        public void close() throws Exception {
            putClassLoader("close");
            super.close();
        }

        public boolean closeIsCooperative() {
            putClassLoader("closeIsCooperative");
            return super.closeIsCooperative();
        }

        protected boolean tryProcess(int i, @Nonnull Object obj) throws Exception {
            this.received = true;
            return true;
        }
    }

    @Test
    public void when_jobCompleted_then_classLoaderShutDown() {
        DAG dag = new DAG();
        dag.newVertex("v", () -> {
            return new LeakClassLoaderP();
        }).localParallelism(1);
        createHazelcastInstance(smallInstanceWithResourceUploadConfig()).getJet().newJob(dag).join();
        Assert.assertTrue("The classloader should have been shutdown after job completion", LeakClassLoaderP.classLoader.isShutdown());
    }

    @Test
    public void when_processorCalled_then_contextClassLoaderSet() {
        DAG dag = new DAG();
        dag.edge(Edge.between(dag.newVertex("v1", () -> {
            return new SourceP();
        }).localParallelism(1), dag.newVertex("v2", () -> {
            return new TargetP();
        }).localParallelism(1)));
        HazelcastInstance createHazelcastInstance = createHazelcastInstance(smallInstanceWithResourceUploadConfig());
        Job newJob = createHazelcastInstance.getJet().newJob(dag, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE).setSnapshotIntervalMillis(1L));
        assertJobStatusEventually(newJob, JobStatus.RUNNING);
        newJob.suspend();
        assertJobStatusEventually(newJob, JobStatus.SUSPENDED);
        newJob.resume();
        newJob.join();
        assertClassLoaders(TargetP.classLoaders);
        assertClassLoaderForAllMethodsChecked(Processor.class, TargetP.classLoaders);
    }

    @Test
    public void when_processorSupplierCalled_then_contextClassLoaderSet() {
        DAG dag = new DAG();
        dag.newVertex("v", new LeakClassLoaderPS()).localParallelism(1);
        createHazelcastInstance(smallInstanceWithResourceUploadConfig()).getJet().newJob(dag).join();
        assertClassLoaders(LeakClassLoaderPS.classLoaders);
        Assertions.assertThat(LeakClassLoaderPS.classLoaders).containsKeys(new String[]{"init", "get", "close"});
    }

    @Test
    public void when_processorMetaSupplierCalled_then_contextClassLoaderSet() {
        DAG dag = new DAG();
        dag.newVertex("v", new LeakClassLoaderPMS()).localParallelism(1);
        createHazelcastInstance(smallInstanceWithResourceUploadConfig()).getJet().newJob(dag).join();
        assertClassLoaders(LeakClassLoaderPMS.classLoaders);
        Assertions.assertThat(LeakClassLoaderPMS.classLoaders).containsKeys(new String[]{"init", "get", "close"});
    }

    private void assertClassLoaderForAllMethodsChecked(Class<?> cls, Map<String, List<ClassLoader>> map) {
        for (Method method : cls.getMethods()) {
            if (!Modifier.isStatic(method.getModifiers())) {
                String name = method.getName();
                Assertions.assertThat(map).describedAs("method " + name + " not called", new Object[0]).containsKey(name);
            }
        }
    }

    private void assertClassLoaders(Map<String, List<ClassLoader>> map) {
        for (Map.Entry<String, List<ClassLoader>> entry : map.entrySet()) {
            Iterator<ClassLoader> it = entry.getValue().iterator();
            while (it.hasNext()) {
                ((ObjectAssert) Assertions.assertThat(it.next()).describedAs("expecting JetClassLoader for method " + entry.getKey(), new Object[0])).isInstanceOf(JetClassLoader.class);
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1582552581:
                if (implMethodName.equals("lambda$when_processorCalled_then_contextClassLoaderSet$e9599fe$1")) {
                    z = false;
                    break;
                }
                break;
            case 1617913537:
                if (implMethodName.equals("lambda$when_processorCalled_then_contextClassLoaderSet$e959a1d$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1770510569:
                if (implMethodName.equals("lambda$when_jobCompleted_then_classLoaderShutDown$fb1a34a4$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/deployment/JetClassLoaderTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new SourceP();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/deployment/JetClassLoaderTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new LeakClassLoaderP();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/deployment/JetClassLoaderTest") && serializedLambda.getImplMethodSignature().equals("()Lcom/hazelcast/jet/core/Processor;")) {
                    return () -> {
                        return new TargetP();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
