package com.hazelcast.jet.pipeline;

import com.hazelcast.collection.IList;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.internal.util.UuidUtil;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.datamodel.WindowResult;
import com.hazelcast.jet.impl.JobRepository;
import com.hazelcast.jet.pipeline.SourceBuilder;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class})
/* loaded from: input_file:com/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest.class */
public class SourceBuilder_TopologyChangeTest extends JetTestSupport {
    private static volatile boolean stateRestored;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest$NumberGeneratorContext.class */
    public static final class NumberGeneratorContext implements Serializable {
        long startTime;
        int current;

        private NumberGeneratorContext() {
            this.startTime = System.nanoTime();
        }

        void restore(NumberGeneratorContext numberGeneratorContext) {
            this.startTime = numberGeneratorContext.startTime;
            this.current = numberGeneratorContext.current;
        }
    }

    @Test
    public void test_restartJob_nodeShutDown() {
        testTopologyChange(() -> {
            return createHazelcastInstance();
        }, hazelcastInstance -> {
            hazelcastInstance.shutdown();
        }, true);
    }

    @Test
    public void test_restartJob_nodeTerminated() {
        testTopologyChange(() -> {
            return createHazelcastInstance();
        }, hazelcastInstance -> {
            hazelcastInstance.getLifecycleService().terminate();
        }, false);
    }

    @Test
    public void test_restartJob_nodeAdded() {
        testTopologyChange(() -> {
            return null;
        }, hazelcastInstance -> {
            createHazelcastInstance();
        }, true);
    }

    private void testTopologyChange(Supplier<HazelcastInstance> supplier, Consumer<HazelcastInstance> consumer, boolean z) {
        stateRestored = false;
        StreamSource build = ((SourceBuilder.TimestampedStream) SourceBuilder.timestampedStream("src", context -> {
            return new NumberGeneratorContext();
        }).fillBufferFn((numberGeneratorContext, timestampedSourceBuffer) -> {
            long min = Math.min(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - numberGeneratorContext.startTime), numberGeneratorContext.current + 100);
            while (numberGeneratorContext.current < min) {
                timestampedSourceBuffer.add(Integer.valueOf(numberGeneratorContext.current), numberGeneratorContext.current);
                numberGeneratorContext.current++;
            }
        }).createSnapshotFn(numberGeneratorContext2 -> {
            System.out.println("Will save " + numberGeneratorContext2.current + " to snapshot");
            return numberGeneratorContext2;
        }).restoreSnapshotFn((numberGeneratorContext3, list) -> {
            if (!$assertionsDisabled && list.size() != 1) {
                throw new AssertionError();
            }
            numberGeneratorContext3.restore((NumberGeneratorContext) list.get(0));
            System.out.println("Restored " + numberGeneratorContext3.current + " from snapshot");
            stateRestored = true;
        })).build();
        Config smallInstanceConfig = smallInstanceConfig();
        smallInstanceConfig.getJetConfig().setScaleUpDelayMillis(1000L);
        HazelcastInstance createHazelcastInstance = createHazelcastInstance(smallInstanceConfig);
        HazelcastInstance hazelcastInstance = supplier.get();
        IList list2 = createHazelcastInstance.getList("result-" + UuidUtil.newUnsecureUuidString());
        Pipeline create = Pipeline.create();
        create.readFrom(build).withNativeTimestamps(0L).window(WindowDefinition.tumbling(100L)).aggregate(AggregateOperations.counting()).peek().writeTo(Sinks.list(list2));
        Job newJob = createHazelcastInstance.getJet().newJob(create, new JobConfig().setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE).setSnapshotIntervalMillis(500L));
        assertJobVisible(createHazelcastInstance, newJob, "test job");
        assertTrueEventually(() -> {
            Assert.assertFalse("result list is still empty", list2.isEmpty());
        });
        assertJobStatusEventually(newJob, JobStatus.RUNNING);
        waitForFirstSnapshot(new JobRepository(createHazelcastInstance), newJob.getId(), 10, false);
        Assert.assertFalse(stateRestored);
        int size = createHazelcastInstance.getCluster().getMembers().size();
        consumer.accept(hazelcastInstance);
        assertTrueEventually("cluster size should have changed", () -> {
            Assert.assertTrue(createHazelcastInstance.getCluster().getMembers().size() != size);
        });
        assertTrueEventually(() -> {
            Assert.assertTrue("restoreSnapshotFn was not called", stateRestored);
        });
        int size2 = list2.size();
        assertTrueEventually(() -> {
            Assert.assertTrue("no more results added to the list", list2.size() > size2);
        });
        cancelAndJoin(newJob);
        Iterator it = list2.iterator();
        for (int i = 0; i < list2.size(); i++) {
            WindowResult windowResult = (WindowResult) it.next();
            Assert.assertEquals(100L, ((Long) windowResult.result()).longValue());
            if (z) {
                Assert.assertEquals(i * 100, windowResult.start());
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -34074924:
                if (implMethodName.equals("lambda$testTopologyChange$f51ac2be$1")) {
                    z = false;
                    break;
                }
                break;
            case -34074923:
                if (implMethodName.equals("lambda$testTopologyChange$f51ac2be$2")) {
                    z = true;
                    break;
                }
                break;
            case 1679042330:
                if (implMethodName.equals("lambda$testTopologyChange$60a581c3$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1679042331:
                if (implMethodName.equals("lambda$testTopologyChange$60a581c3$2")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;)Lcom/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest$NumberGeneratorContext;")) {
                    return context -> {
                        return new NumberGeneratorContext();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest$NumberGeneratorContext;)Lcom/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest$NumberGeneratorContext;")) {
                    return numberGeneratorContext2 -> {
                        System.out.println("Will save " + numberGeneratorContext2.current + " to snapshot");
                        return numberGeneratorContext2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest$NumberGeneratorContext;Lcom/hazelcast/jet/pipeline/SourceBuilder$TimestampedSourceBuffer;)V")) {
                    return (numberGeneratorContext, timestampedSourceBuffer) -> {
                        long min = Math.min(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - numberGeneratorContext.startTime), numberGeneratorContext.current + 100);
                        while (numberGeneratorContext.current < min) {
                            timestampedSourceBuffer.add(Integer.valueOf(numberGeneratorContext.current), numberGeneratorContext.current);
                            numberGeneratorContext.current++;
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/pipeline/SourceBuilder_TopologyChangeTest$NumberGeneratorContext;Ljava/util/List;)V")) {
                    return (numberGeneratorContext3, list) -> {
                        if (!$assertionsDisabled && list.size() != 1) {
                            throw new AssertionError();
                        }
                        numberGeneratorContext3.restore((NumberGeneratorContext) list.get(0));
                        System.out.println("Restored " + numberGeneratorContext3.current + " from snapshot");
                        stateRestored = true;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !SourceBuilder_TopologyChangeTest.class.desiredAssertionStatus();
        stateRestored = false;
    }
}
