package com.hazelcast.jet.core;

import com.hazelcast.function.SupplierEx;
import com.hazelcast.function.ToLongFunctionEx;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.impl.execution.WatermarkCoalescer;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/core/EventTimeMapperTest.class */
public class EventTimeMapperTest {
    private static final long LAG = 3;

    @Rule
    public ExpectedException exception = ExpectedException.none();

    @Test
    public void smokeTest() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(LAG), 1L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 2);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(1L), (Object) null, 0, Long.MIN_VALUE), new Object[0]);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(5L), (Object) null, 0, Long.MIN_VALUE), WatermarkCoalescer.IDLE_MESSAGE);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(5L), (Object) null, 0, Long.MIN_VALUE), new Object[0]);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(5L), 100L, 0, Long.MIN_VALUE), JetTestSupport.wm(97L), 100L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(5L), 90L, 0, Long.MIN_VALUE), 90L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(5L), 101L, 0, Long.MIN_VALUE), JetTestSupport.wm(98L), 101L);
    }

    @Test
    public void smokeTest_disabledIdleTimeout() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(LAG), 1L, 0L, 0L));
        eventTimeMapper.addPartitions(2);
        assertTraverser(eventTimeMapper.flatMapIdle(), new Object[0]);
        assertTraverser(eventTimeMapper.flatMapEvent(10L, 0, Long.MIN_VALUE), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(11L, 0, Long.MIN_VALUE), 11L);
        assertTraverser(eventTimeMapper.flatMapEvent(10L, 1, Long.MIN_VALUE), JetTestSupport.wm(7L), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(11L, 1, Long.MIN_VALUE), JetTestSupport.wm(8L), 11L);
        assertTraverser(eventTimeMapper.flatMapEvent(12L, 1, Long.MIN_VALUE), 12L);
        assertTraverser(eventTimeMapper.flatMapEvent(13L, 0, Long.MIN_VALUE), JetTestSupport.wm(9L), 13L);
    }

    @Test
    public void test_zeroPartitions() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(LAG), 1L, 0L, 0L));
        assertTraverser(eventTimeMapper.flatMapIdle(), WatermarkCoalescer.IDLE_MESSAGE);
        assertTraverser(eventTimeMapper.flatMapIdle(), new Object[0]);
        eventTimeMapper.addPartitions(1);
        assertTraverser(eventTimeMapper.flatMapIdle(), new Object[0]);
        assertTraverser(eventTimeMapper.flatMapEvent(10L, 0, Long.MIN_VALUE), JetTestSupport.wm(7L), 10L);
    }

    @Test
    public void when_idle_event_idle_then_twoIdleMessagesSent() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(LAG), 1L, 0L, 10L));
        eventTimeMapper.addPartitions(1);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 10L, 0, Long.MIN_VALUE), JetTestSupport.wm(7L), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(10L), (Object) null, 0, Long.MIN_VALUE), WatermarkCoalescer.IDLE_MESSAGE);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(10L), 10L, 0, Long.MIN_VALUE), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(10L), (Object) null, 0, Long.MIN_VALUE), new Object[0]);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(20L), (Object) null, 0, Long.MIN_VALUE), WatermarkCoalescer.IDLE_MESSAGE);
    }

    @Test
    public void when_eventInOneOfTwoPartitions_then_wmAndIdleMessageForwardedAfterTimeout() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(LAG), 1L, 0L, 10L));
        eventTimeMapper.addPartitions(ns(0L), 2);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 10L, 0, Long.MIN_VALUE), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(10L), (Object) null, 0, Long.MIN_VALUE), JetTestSupport.wm(7L), WatermarkCoalescer.IDLE_MESSAGE);
    }

    @Test
    public void when_noTimestampFnAndNoNativeTime_then_throw() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((ToLongFunctionEx) null, WatermarkPolicy.limitingLag(LAG), 1L, 0L, 10L));
        eventTimeMapper.addPartitions(ns(0L), 1);
        this.exception.expectMessage("Neither timestampFn nor nativeEventTime specified");
        eventTimeMapper.flatMapEvent(ns(0L), 10L, 0, Long.MIN_VALUE);
    }

    @Test
    public void when_noTimestampFn_then_useNativeTime() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((ToLongFunctionEx) null, WatermarkPolicy.limitingLag(LAG), 1L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 1);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(1L), 10L, 0, 11L), JetTestSupport.wm(8L), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(1L), 11L, 0, 12L), JetTestSupport.wm(9L), 11L);
    }

    @Test
    public void when_throttlingToMaxFrame_then_noWatermarksOutput() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(LAG), 0L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 1);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(1L), -10L, 0, 11L), -10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(1L), 10L, 0, 12L), 10L);
    }

    @Test
    public void when_restoredState_then_wmDoesNotGoBack() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(0L), 1L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 1);
        eventTimeMapper.restoreWatermark(0, 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 9L, 0, Long.MIN_VALUE), 9L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 10L, 0, Long.MIN_VALUE), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 11L, 0, Long.MIN_VALUE), JetTestSupport.wm(11L), 11L);
    }

    @Test
    public void when_twoActiveQueues_theLaggingOneRemoved_then_wmForwarded() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(0L), 1L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 2);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 10L, 0, Long.MIN_VALUE), 10L);
        assertTraverser(eventTimeMapper.removePartition(ns(0L), 1), JetTestSupport.wm(10L));
    }

    @Test
    public void when_twoActiveQueues_theAheadOneRemoved_then_noWmForwarded() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(0L), 1L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 2);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 10L, 0, Long.MIN_VALUE), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 11L, 1, Long.MIN_VALUE), JetTestSupport.wm(10L), 11L);
        assertTraverser(eventTimeMapper.removePartition(ns(0L), 1), new Object[0]);
    }

    @Test
    public void when_threePartitions_laggingOneRemoved_secondLaggingOneIdle_then_noWmForwarded() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, WatermarkPolicy.limitingLag(0L), 1L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 3);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 10L, 0, Long.MIN_VALUE), 10L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(1L), 11L, 1, Long.MIN_VALUE), 11L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(1L), 12L, 2, Long.MIN_VALUE), JetTestSupport.wm(10L), 12L);
        assertTraverser(eventTimeMapper.removePartition(ns(5L), 1), JetTestSupport.wm(12L));
    }

    @Test
    public void when_currentWmBeyondReportedEventTimestamp_then_eventNotLate() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, constantWmPolicy(42L), 1L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 1);
        assertTraverser(eventTimeMapper.flatMapEvent(41L, 0, Long.MIN_VALUE), JetTestSupport.wm(41L), 41L);
    }

    @Test
    public void when_currentWmBeyondReportedEventTimestamp_and_eventLate_then_wmDoesNotGoBack() {
        EventTimeMapper eventTimeMapper = new EventTimeMapper(EventTimePolicy.eventTimePolicy((v0) -> {
            return v0.longValue();
        }, constantWmPolicy(42L), 1L, 0L, 5L));
        eventTimeMapper.addPartitions(0L, 1);
        assertTraverser(eventTimeMapper.flatMapEvent(41L, 0, Long.MIN_VALUE), JetTestSupport.wm(41L), 41L);
        assertTraverser(eventTimeMapper.flatMapEvent(ns(0L), 40L, 0, Long.MIN_VALUE), 40L);
    }

    private static SupplierEx<WatermarkPolicy> constantWmPolicy(long j) {
        return () -> {
            return new WatermarkPolicy() { // from class: com.hazelcast.jet.core.EventTimeMapperTest.1
                public void reportEvent(long j2) {
                }

                public long getCurrentWatermark() {
                    return j;
                }
            };
        };
    }

    private <T> void assertTraverser(Traverser<T> traverser, T... tArr) {
        for (T t : tArr) {
            Assert.assertEquals(t, traverser.next());
        }
        Assert.assertNull(traverser.next());
    }

    private long ns(long j) {
        return TimeUnit.MILLISECONDS.toNanos(j);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 119244885:
                if (implMethodName.equals("longValue")) {
                    z = true;
                    break;
                }
                break;
            case 712293192:
                if (implMethodName.equals("lambda$constantWmPolicy$c57ec36f$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/core/EventTimeMapperTest") && serializedLambda.getImplMethodSignature().equals("(J)Lcom/hazelcast/jet/core/WatermarkPolicy;")) {
                    long longValue = ((Long) serializedLambda.getCapturedArg(0)).longValue();
                    return () -> {
                        return new WatermarkPolicy() { // from class: com.hazelcast.jet.core.EventTimeMapperTest.1
                            public void reportEvent(long j2) {
                            }

                            public long getCurrentWatermark() {
                                return longValue;
                            }
                        };
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("java/lang/Long") && serializedLambda.getImplMethodSignature().equals("()J")) {
                    return (v0) -> {
                        return v0.longValue();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
