package com.hazelcast.jet.impl.execution;

import com.hazelcast.config.Config;
import com.hazelcast.config.EventJournalConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.datamodel.KeyedWindowResult;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.SinkBuilder;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.jet.pipeline.WindowDefinition;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

/**
 * Stress test that keeps restarting an at-least-once streaming job while two
 * producers feed its journal source at different rates.
 *
 * <p>Each producer writes {@link #COUNT} entries per timestamp under its own
 * key, so every 1-unit window for a key must aggregate to exactly
 * {@link #COUNT}. The sink throws on any other count, which would fail the job;
 * the test finally asserts that the job did not end up failed, completed or
 * suspended.
 *
 * <p>NOTE(review): judging by the class name this presumably guards the
 * watermark coalescer's behaviour around the terminal snapshot taken on a
 * graceful restart - confirm against the original issue.
 */
@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class WatermarkCoalescer_TerminalSnapshotTest extends JetTestSupport {

    /** Partition count of the member; also used as the local parallelism of the pipeline stages. */
    private static final int PARTITION_COUNT = 2;

    /** Number of entries each producer writes per timestamp, and thus the expected count per window. */
    private static final int COUNT = 10;

    private HazelcastInstance instance;
    private IMap<String, Integer> sourceMap;

    /**
     * Starts a single member with {@link #PARTITION_COUNT} partitions and an
     * event journal enabled on every map, then obtains the source map.
     */
    @Before
    public void setUp() {
        Config config = smallInstanceConfig();
        config.getMapConfig("*")
              .setEventJournalConfig(new EventJournalConfig()
                      .setCapacity(1_000_000)
                      .setEnabled(true));
        config.setProperty(ClusterProperty.PARTITION_COUNT.getName(), String.valueOf(PARTITION_COUNT));

        instance = createHazelcastInstance(config);
        sourceMap = instance.getMap("test");
    }

    /**
     * Runs the job for 20 seconds under continuous restarts and producer load,
     * then verifies that the job is neither failed, completed nor suspended.
     */
    @Test
    public void test() throws Exception {
        // One key per partition so that each producer feeds a different partition.
        String key0 = generateKeyForPartition(instance, 0);
        String key1 = generateKeyForPartition(instance, 1);

        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.mapJournal(sourceMap, JournalInitialPosition.START_FROM_OLDEST))
                // the entry value doubles as the event timestamp
                .withTimestamps(Map.Entry::getValue, 0)
                .setLocalParallelism(PARTITION_COUNT)
                .groupingKey(Map.Entry::getKey)
                .window(WindowDefinition.sliding(1, 1))
                .aggregate(AggregateOperations.counting())
                .setLocalParallelism(PARTITION_COUNT)
                .writeTo(SinkBuilder.sinkBuilder("throwing", ctx -> "")
                        .<KeyedWindowResult<String, Long>>receiveFn((ctx, windowResult) -> {
                            // Any window whose count differs from COUNT fails the job.
                            if (windowResult.result() != COUNT) {
                                throw new RuntimeException("Received unexpected item " + windowResult
                                        + ", expected count is " + COUNT);
                            }
                        })
                        .build());

        JobConfig jobConfig = new JobConfig().setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE);
        Job job = instance.getJet().newJob(pipeline, jobConfig);

        List<Future<?>> futures = new ArrayList<>();

        // Restarter: whenever the job is running again, restart it, then pause for 2 s.
        futures.add(spawn(() -> {
            while (true) {
                assertJobStatusEventually(job, JobStatus.RUNNING);
                System.out.println("============RESTARTING JOB=========");
                job.restart();
                Thread.sleep(2000L);
            }
        }));
        // Two producers with different pauses between timestamps.
        futures.add(spawn(() -> producer(key0, 1)));
        futures.add(spawn(() -> producer(key1, 2)));

        sleepSeconds(20);

        for (Future<?> future : futures) {
            future.cancel(true);
            try {
                future.get();
                Assert.fail("Exception was expected");
            } catch (CancellationException expected) {
                // The background tasks never finish on their own; cancellation is the only expected outcome.
            }
        }

        JobStatus status = job.getStatus();
        Assert.assertTrue("job should not be completed, status=" + status,
                status != JobStatus.FAILED
                        && status != JobStatus.COMPLETED
                        && status != JobStatus.SUSPENDED);
    }

    /**
     * Endlessly writes to the source map: for each successive timestamp value
     * (starting at 0) it sets {@code key} to that value {@link #COUNT} times and
     * then parks for {@code parkMillis} milliseconds.
     *
     * @param key        map key to write under
     * @param parkMillis pause, in milliseconds, after each batch of writes
     */
    private void producer(String key, int parkMillis) {
        long parkNanos = TimeUnit.MILLISECONDS.toNanos(parkMillis);
        for (int timestamp = 0; ; timestamp++) {
            for (int written = 0; written < COUNT; written++) {
                sourceMap.set(key, timestamp);
            }
            LockSupport.parkNanos(parkNanos);
        }
    }
}
