package com.hazelcast.jet.impl.connector;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.SupplierEx;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.test.TestOutbox;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.core.test.TestSupport;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.map.impl.proxy.MapProxyImpl;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/connector/StreamEventJournalP_WmCoalescingTest.class */
public class StreamEventJournalP_WmCoalescingTest extends JetTestSupport {
    private static final int JOURNAL_CAPACITY = 10;
    private MapProxyImpl<Integer, Integer> map;
    private int[] partitionKeys;
    private HazelcastInstance instance;

    @Before
    public void setUp() {
        Config smallInstanceConfig = smallInstanceConfig();
        String randomMapName = randomMapName();
        MapConfig mapConfig = new MapConfig();
        mapConfig.setName(randomMapName);
        mapConfig.getEventJournalConfig().setCapacity(10).setEnabled(true);
        smallInstanceConfig.setProperty(ClusterProperty.PARTITION_COUNT.getName(), "2");
        smallInstanceConfig.addMapConfig(mapConfig);
        this.instance = createHazelcastInstance(smallInstanceConfig);
        this.map = this.instance.getMap(randomMapName);
        this.partitionKeys = new int[2];
        int i = 1;
        while (IntStream.of(this.partitionKeys).anyMatch(i2 -> {
            return i2 == 0;
        })) {
            this.partitionKeys[this.instance.getPartitionService().getPartition(Integer.valueOf(i)).getPartitionId()] = i;
            i++;
        }
    }

    @Test
    public void when_entryInEachPartition_then_wmForwarded() {
        this.map.put(Integer.valueOf(this.partitionKeys[0]), 10);
        this.map.put(Integer.valueOf(this.partitionKeys[1]), 10);
        TestSupport.verifyProcessor(createSupplier(Arrays.asList(0, 1), 5000L)).disableProgressAssertion().runUntilOutputMatches(60000L, 100L).disableSnapshots().hazelcastInstance(this.instance).expectOutput(Arrays.asList(wm(10L), 10, 10));
    }

    @Test
    public void when_entryInOnePartition_then_wmForwardedAfterIdleTime() throws InterruptedException {
        this.map.put(Integer.valueOf(this.partitionKeys[0]), 11);
        this.map.put(Integer.valueOf(this.partitionKeys[1]), 11);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Future spawn = spawn(() -> {
            while (!Thread.interrupted()) {
                LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(500L));
                this.map.put(Integer.valueOf(this.partitionKeys[0]), 11);
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        TestSupport.verifyProcessor(createSupplier(Arrays.asList(0, 1), 5000L)).disableProgressAssertion().runUntilOutputMatches(60000L, 100L).disableSnapshots().hazelcastInstance(this.instance).outputChecker((list, list2) -> {
            return new HashSet(list).equals(new HashSet(list2));
        }).expectOutput(Arrays.asList(11, wm(11L)));
        spawn.cancel(true);
    }

    @Test
    public void when_allPartitionsIdle_then_idleMessageOutput() {
        TestSupport.verifyProcessor(createSupplier(Arrays.asList(0, 1), 500L)).disableProgressAssertion().runUntilOutputMatches(60000L, 100L).disableSnapshots().hazelcastInstance(this.instance).expectOutput(Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE));
    }

    @Test
    public void when_allPartitionsIdleAndThenRecover_then_wmOutput() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            Util.uncheckRun(() -> {
                countDownLatch.await();
                while (true) {
                    this.map.put(Integer.valueOf(this.partitionKeys[0]), 12);
                    Thread.sleep(100L);
                }
            });
        });
        thread.start();
        Processor processor = (Processor) createSupplier(Arrays.asList(0, 1), 2000L).get();
        TestOutbox testOutbox = new TestOutbox(new int[]{1024});
        Queue queue = testOutbox.queue(0);
        processor.init(testOutbox, new TestProcessorContext().setHazelcastInstance(this.instance));
        assertTrueEventually(() -> {
            processor.complete();
            if (WatermarkCoalescer.IDLE_MESSAGE.equals(queue.peek())) {
                countDownLatch.countDown();
            }
            Assert.assertEquals(Arrays.asList(WatermarkCoalescer.IDLE_MESSAGE, wm(12L), 12), queue.stream().distinct().collect(Collectors.toList()));
        });
        thread.interrupt();
        thread.join();
    }

    @Test
    public void test_nonFirstPartition() {
        this.map.put(Integer.valueOf(this.partitionKeys[1]), 13);
        TestSupport.verifyProcessor(createSupplier(Collections.singletonList(1), 5000L)).disableProgressAssertion().runUntilOutputMatches(60000L, 100L).disableSnapshots().hazelcastInstance(this.instance).expectOutput(Arrays.asList(wm(13L), 13, WatermarkCoalescer.IDLE_MESSAGE));
    }

    public SupplierEx<Processor> createSupplier(List<Integer> list, long j) {
        return () -> {
            return new StreamEventJournalP(this.map, list, eventJournalMapEvent -> {
                return true;
            }, (v0) -> {
                return v0.getNewValue();
            }, JournalInitialPosition.START_FROM_OLDEST, false, EventTimePolicy.eventTimePolicy((v0) -> {
                return v0.intValue();
            }, WatermarkPolicy.limitingLag(0L), 1L, 0L, j));
        };
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1193829727:
                if (implMethodName.equals("lambda$null$a6a3900f$1")) {
                    z = true;
                    break;
                }
                break;
            case -848611033:
                if (implMethodName.equals("getNewValue")) {
                    z = 3;
                    break;
                }
                break;
            case 135889156:
                if (implMethodName.equals("lambda$null$b5447638$1")) {
                    z = 4;
                    break;
                }
                break;
            case 556050114:
                if (implMethodName.equals("intValue")) {
                    z = 2;
                    break;
                }
                break;
            case 2084696756:
                if (implMethodName.equals("lambda$createSupplier$37c408e0$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalP_WmCoalescingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;J)Lcom/hazelcast/jet/core/Processor;")) {
                    StreamEventJournalP_WmCoalescingTest streamEventJournalP_WmCoalescingTest = (StreamEventJournalP_WmCoalescingTest) serializedLambda.getCapturedArg(0);
                    List list = (List) serializedLambda.getCapturedArg(1);
                    long longValue = ((Long) serializedLambda.getCapturedArg(2)).longValue();
                    return () -> {
                        return new StreamEventJournalP(this.map, list, eventJournalMapEvent -> {
                            return true;
                        }, (v0) -> {
                            return v0.getNewValue();
                        }, JournalInitialPosition.START_FROM_OLDEST, false, EventTimePolicy.eventTimePolicy((v0) -> {
                            return v0.intValue();
                        }, WatermarkPolicy.limitingLag(0L), 1L, 0L, longValue));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/function/RunnableEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("runEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalP_WmCoalescingTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/CountDownLatch;)V")) {
                    StreamEventJournalP_WmCoalescingTest streamEventJournalP_WmCoalescingTest2 = (StreamEventJournalP_WmCoalescingTest) serializedLambda.getCapturedArg(0);
                    CountDownLatch countDownLatch = (CountDownLatch) serializedLambda.getCapturedArg(1);
                    return () -> {
                        countDownLatch.await();
                        while (true) {
                            this.map.put(Integer.valueOf(this.partitionKeys[0]), 12);
                            Thread.sleep(100L);
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Integer") && serializedLambda.getImplMethodSignature().equals("()I")) {
                    return (v0) -> {
                        return v0.intValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/map/EventJournalMapEvent") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getNewValue();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/PredicateEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("testEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/connector/StreamEventJournalP_WmCoalescingTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/map/EventJournalMapEvent;)Z")) {
                    return eventJournalMapEvent -> {
                        return true;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
