package com.hazelcast.jet.core;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.Observable;
import com.hazelcast.jet.function.Observer;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.StreamStage;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/ObservableShutdownTest.class */
public class ObservableShutdownTest extends JetTestSupport {
    private static final int MEMBER_COUNT = 3;
    private HazelcastInstance[] members;
    private HazelcastInstance client;
    private Observable<Long> memberObservable;
    private Observable<Long> clientObservable;
    private TestObserver memberObserver;
    private TestObserver clientObserver;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/core/ObservableShutdownTest$TestObserver.class */
    public static final class TestObserver implements Observer<Long> {
        private final AtomicInteger values;

        private TestObserver() {
            this.values = new AtomicInteger();
        }

        public void onNext(@Nonnull Long l) {
            this.values.incrementAndGet();
        }

        public void onError(@Nonnull Throwable th) {
            Assert.fail("Errors aren't expected: " + th.getMessage());
        }

        public void onComplete() {
            Assert.fail("Completions aren't expected");
        }

        int getNoOfValues() {
            return this.values.get();
        }
    }

    @Before
    public void before() {
        this.members = createHazelcastInstances(3);
        this.client = createHazelcastClient();
        this.memberObserver = new TestObserver();
        this.memberObservable = this.members[this.members.length - 1].getJet().newObservable();
        this.memberObservable.addObserver(this.memberObserver);
        this.clientObserver = new TestObserver();
        this.clientObservable = this.client.getJet().newObservable();
        this.clientObservable.addObserver(this.clientObserver);
    }

    @Test
    public void when_jetInstanceIsShutDown_then_ObservablesStopReceivingEvents() {
        Pipeline create = Pipeline.create();
        StreamStage map = create.readFrom(TestSources.itemStream(100)).withoutTimestamps().map((v0) -> {
            return v0.sequence();
        });
        map.writeTo(Sinks.observable(this.clientObservable));
        map.writeTo(Sinks.observable(this.memberObservable));
        Job newJob = this.client.getJet().newJob(create);
        assertTrueEventually(() -> {
            Assert.assertTrue(this.clientObserver.getNoOfValues() > 10);
        });
        assertTrueEventually(() -> {
            Assert.assertTrue(this.memberObserver.getNoOfValues() > 10);
        });
        this.client.shutdown();
        assertObserverStopsReceivingValues(this.clientObserver);
        long id = newJob.getId();
        this.members[this.members.length - 1].shutdown();
        assertJobStatusEventually(this.members[0].getJet().getJob(id), JobStatus.RUNNING);
        assertObserverStopsReceivingValues(this.memberObserver);
    }

    private void assertObserverStopsReceivingValues(TestObserver testObserver) {
        assertTrueEventually(() -> {
            int noOfValues = testObserver.getNoOfValues();
            TimeUnit.MILLISECONDS.sleep(1000L);
            Assert.assertEquals(noOfValues, testObserver.getNoOfValues());
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1349547969:
                if (implMethodName.equals("sequence")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/test/SimpleEvent") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.sequence();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
