package com.hazelcast.jet.impl.execution;

import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/WatermarkCoalescerTest.class */
public class WatermarkCoalescerTest {

    @Rule
    public ExpectedException exception = ExpectedException.none();
    private WatermarkCoalescer wc = WatermarkCoalescer.create(2);

    @Test
    public void when_nothingHappened_then_noWm() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.checkWmHistory());
    }

    @Test
    public void when_bothInputsHaveWm_then_forwarded() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 1L));
        Assert.assertEquals(1L, this.wc.topObservedWm());
        Assert.assertEquals(Long.MIN_VALUE, this.wc.coalescedWm());
        Assert.assertEquals(1L, this.wc.observeWm(1, 2L));
        Assert.assertEquals(2L, this.wc.observeWm(0, 3L));
        Assert.assertEquals(3L, this.wc.observeWm(1, 4L));
        Assert.assertEquals(4L, this.wc.topObservedWm());
        Assert.assertEquals(3L, this.wc.coalescedWm());
    }

    @Test
    public void when_i1RecoversFromIdleByEvent_then_wmFromI1Coalesced() {
        when_i1RecoversFromIdle_then_wmFromI1Coalesced("event");
    }

    @Test
    public void when_i1RecoversFromIdleByWatermark_then_wmFromI1Coalesced() {
        when_i1RecoversFromIdle_then_wmFromI1Coalesced("wm");
    }

    private void when_i1RecoversFromIdle_then_wmFromI1Coalesced(String str) {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(11L, this.wc.observeWm(1, 11L));
        if (str.equals("wm")) {
            Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 11L));
        } else {
            this.wc.observeEvent(0);
        }
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(1, 12L));
        Assert.assertEquals(12L, this.wc.observeWm(0, 13L));
        Assert.assertEquals(Long.MIN_VALUE, this.wc.checkWmHistory());
    }

    @Test
    public void when_i1Idle_i2HasWm_then_forwardedImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(100L, this.wc.observeWm(1, 100L));
    }

    @Test
    public void when_i1HasWm_i2Idle_then_forwardedImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 100L));
        Assert.assertEquals(100L, this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
    }

    @Test
    public void when_i1_active_i2_active_then_wmForwardedImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 100L));
        Assert.assertEquals(100L, this.wc.observeWm(1, 101L));
        Assert.assertEquals(101L, this.wc.observeWm(0, 101L));
    }

    @Test
    public void when_i1_active_i2_idle_then_wmForwardedImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 100L));
        Assert.assertEquals(100L, this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
    }

    @Test
    public void when_i1_idle_i2_active_then_wmForwardedImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(100L, this.wc.observeWm(1, 100L));
    }

    @Test
    public void when_i1_activeNoWm_i2_idle_then_noWmToForward() {
        this.wc.observeEvent(0);
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(Long.MIN_VALUE, this.wc.checkWmHistory());
        Assert.assertEquals(Long.MIN_VALUE, this.wc.checkWmHistory());
    }

    @Test
    public void when_i1_idle_i2_activeNoWm_then_wmForwardedAfterADelay() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        this.wc.observeEvent(1);
        Assert.assertEquals(Long.MIN_VALUE, this.wc.checkWmHistory());
        Assert.assertEquals(Long.MIN_VALUE, this.wc.checkWmHistory());
    }

    @Test
    public void when_i1_idle_i2_idle_then_idleMessageForwardedImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE.timestamp(), this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
    }

    @Test
    public void when_i1_active_i2_done_then_forwardImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 100L));
        Assert.assertEquals(100L, this.wc.queueDone(1));
    }

    @Test
    public void when_i1_done_i2_active_then_forwardImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(0));
        Assert.assertEquals(100L, this.wc.observeWm(1, 100L));
    }

    @Test
    public void when_i1_idle_i2_done_i1_recovers_then_idleMessageForwardedImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE.timestamp(), this.wc.queueDone(1));
        Assert.assertEquals(10L, this.wc.observeWm(0, 10L));
    }

    @Test
    public void when_i1_done_i2_idleAndRecovers_then_wmsForwardedImmediately() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(0));
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE.timestamp(), this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(10L, this.wc.observeWm(1, 10L));
    }

    @Test
    public void when_duplicateIdleMessage_then_processed() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
    }

    @Test
    public void when_allIdleAndDuplicateIdleMessage_then_processed() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE.timestamp(), this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
    }

    @Test
    public void when_allDone_then_noMaxValueEmitted() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(0));
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(1));
    }

    @Test
    public void when_twoInputsIdle_then_singleIdleMessage() {
        this.wc = WatermarkCoalescer.create(3);
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp());
    }

    @Test
    public void when_duplicateDoneCall_then_error() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(0));
        this.exception.expectMessage("Duplicate");
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(0));
    }

    @Test
    public void when_wmAfterDone_then_error() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(0));
        this.exception.expectMessage("not monotonically increasing");
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 0L));
    }

    @Test
    public void when_idleMessageAfterDone_then_error() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(0));
        this.exception.expectMessage("not monotonically increasing");
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
    }

    @Test
    public void when_wmGoesBack_then_error() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 10L));
        this.exception.expectMessage("not monotonically increasing");
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 9L));
    }

    @Test
    public void when_allInputsHadWms_allBecomeIdle_theLessAheadBecomesIdleLater_then_topWmForwarded() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 10L));
        Assert.assertEquals(10L, this.wc.observeWm(1, 11L));
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(11L, this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE.timestamp(), this.wc.checkWmHistory());
    }

    @Test
    public void when_allInputsHadWms_aheadOnesBecomeIdle_behindOneIsDone_then_topWmForwarded() {
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(0, 10L));
        Assert.assertEquals(10L, this.wc.observeWm(1, 11L));
        Assert.assertEquals(Long.MIN_VALUE, this.wc.observeWm(1, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(11L, this.wc.queueDone(0));
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE.timestamp(), this.wc.checkWmHistory());
    }

    @Test
    public void test_singleInput() {
        this.wc = WatermarkCoalescer.create(1);
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE.timestamp(), this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(10L, this.wc.observeWm(0, 10L));
        Assert.assertEquals(11L, this.wc.observeWm(0, 11L));
        Assert.assertEquals(WatermarkCoalescer.IDLE_MESSAGE.timestamp(), this.wc.observeWm(0, WatermarkCoalescer.IDLE_MESSAGE.timestamp()));
        Assert.assertEquals(12L, this.wc.observeWm(0, 12L));
        Assert.assertEquals(Long.MIN_VALUE, this.wc.queueDone(0));
    }
}
