package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.TestUtil;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.logging.ILogger;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mockito;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTaskletTest_Watermarks.class */
public class ProcessorTaskletTest_Watermarks {
    private static final int CALL_COUNT_LIMIT = 10;
    private List<MockInboundStream> instreams;
    private List<OutboundEdgeStream> outstreams;
    private ProcessorWithWatermarks processor;
    private Processor.Context context;
    private MockOutboundCollector snapshotCollector;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTaskletTest_Watermarks$ProcessorWithWatermarks.class */
    public static class ProcessorWithWatermarks implements Processor {
        int nullaryProcessCallCountdown;
        int processWatermarkCallCountdown;
        private Outbox outbox;

        private ProcessorWithWatermarks() {
        }

        public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
            this.outbox = outbox;
        }

        public void process(int i, @Nonnull Inbox inbox) {
            while (true) {
                Object peek = inbox.peek();
                if (peek == null) {
                    return;
                }
                if (this.outbox.offer(peek)) {
                    inbox.remove();
                }
            }
        }

        public boolean complete() {
            return true;
        }

        public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
            if (this.processWatermarkCallCountdown >= 0) {
                Assert.assertTrue(this.outbox.offer("wm(" + watermark.timestamp() + ")-" + this.processWatermarkCallCountdown));
                if (this.processWatermarkCallCountdown > 0) {
                    this.processWatermarkCallCountdown--;
                    return false;
                }
            }
            Assert.assertTrue(this.outbox.offer(watermark));
            return true;
        }

        public boolean tryProcess() {
            int i = this.nullaryProcessCallCountdown;
            this.nullaryProcessCallCountdown = i - 1;
            return i <= 0;
        }
    }

    @Before
    public void setUp() {
        this.processor = new ProcessorWithWatermarks();
        this.context = new TestProcessorContext();
        this.instreams = new ArrayList();
        this.outstreams = new ArrayList();
        this.snapshotCollector = new MockOutboundCollector(0);
    }

    @Test
    public void when_isCooperative_then_true() {
        Assert.assertTrue(createTasklet().isCooperative());
    }

    @Test
    public void when_singleInbound_then_watermarkForwardedImmediately() {
        ArrayList arrayList = new ArrayList(Arrays.asList(0, 1));
        arrayList.add(JetTestSupport.wm(123L));
        MockInboundStream mockInboundStream = new MockInboundStream(0, arrayList, arrayList.size());
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet());
        Assert.assertEquals(Arrays.asList(0, 1, "wm(123)-0", JetTestSupport.wm(123L)), mockOutboundStream.getBuffer());
    }

    @Test
    public void when_multipleInboundAndUnlimitedRetention_then_waitForWm() {
        List asList = Arrays.asList(0, 1, JetTestSupport.wm(100L), 2, 3);
        ArrayList arrayList = new ArrayList();
        MockInboundStream mockInboundStream = new MockInboundStream(0, asList, 1024);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, arrayList, 1024);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0);
        this.instreams.add(mockInboundStream);
        this.instreams.add(mockInboundStream2);
        this.outstreams.add(mockOutboundStream);
        ProcessorTasklet createTasklet = createTasklet();
        callUntil(createTasklet);
        Assert.assertEquals(Arrays.asList(0, 1, 2, 3), mockOutboundStream.getBuffer());
        mockOutboundStream.flush();
        callUntil(createTasklet);
        Assert.assertEquals(Collections.emptyList(), mockOutboundStream.getBuffer());
        mockInboundStream2.push(JetTestSupport.wm(99L));
        callUntil(createTasklet);
        Assert.assertEquals(Arrays.asList("wm(99)-0", JetTestSupport.wm(99L)), mockOutboundStream.getBuffer());
    }

    @Test
    public void when_processWatermarkReturnsFalse_then_calledAgain() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.singletonList(JetTestSupport.wm(100L)), 1000);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 128);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        ProcessorTasklet createTasklet = createTasklet();
        this.processor.processWatermarkCallCountdown = 2;
        callUntil(createTasklet);
        Assert.assertEquals(Arrays.asList("wm(100)-2", "wm(100)-1", "wm(100)-0", JetTestSupport.wm(100L)), mockOutboundStream.getBuffer());
    }

    @Test
    public void when_multipleWms_then_processed() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Arrays.asList(JetTestSupport.wm(100L), JetTestSupport.wm(101L)), 1000);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 128);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet());
        Assert.assertEquals(Arrays.asList("wm(100)-0", JetTestSupport.wm(100L), "wm(101)-0", JetTestSupport.wm(101L)), mockOutboundStream.getBuffer());
    }

    @Test
    public void when_allEdgesIdle_then_idleForwarded() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), 1000);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), 1000);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 128);
        this.instreams.add(mockInboundStream);
        this.instreams.add(mockInboundStream2);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet());
        Assert.assertEquals(Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), mockOutboundStream.getBuffer());
    }

    @Test
    public void when_allEdgesIdleAndThenRecover_then_usedInCoalescing() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), 1000);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), 1000);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 128);
        this.instreams.add(mockInboundStream);
        this.instreams.add(mockInboundStream2);
        this.outstreams.add(mockOutboundStream);
        ProcessorTasklet createTasklet = createTasklet();
        callUntil(createTasklet);
        Assert.assertEquals(Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), mockOutboundStream.getBuffer());
        mockOutboundStream.getBuffer().clear();
        mockInboundStream.push(JetTestSupport.wm(100L));
        mockInboundStream2.push(JetTestSupport.wm(101L));
        callUntil(createTasklet);
        Assert.assertEquals(Arrays.asList("wm(100)-0", JetTestSupport.wm(100L)), mockOutboundStream.getBuffer());
    }

    @Test
    public void when_oneEdgeIdle_then_excludedFromCoalescing() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.singletonList(JetTestSupport.wm(100L)), 1000);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), 1000);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 128);
        this.instreams.add(mockInboundStream);
        this.instreams.add(mockInboundStream2);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet());
        Assert.assertEquals(Arrays.asList("wm(100)-0", JetTestSupport.wm(100L)), mockOutboundStream.getBuffer());
    }

    @Test
    public void when_oneEdgeIdleAndThenRecovers_then_usedInCoalescing() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.singletonList(JetTestSupport.wm(100L)), 1000);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), 1000);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 128);
        this.instreams.add(mockInboundStream);
        this.instreams.add(mockInboundStream2);
        this.outstreams.add(mockOutboundStream);
        ProcessorTasklet createTasklet = createTasklet();
        callUntil(createTasklet);
        Assert.assertEquals(Arrays.asList("wm(100)-0", JetTestSupport.wm(100L)), mockOutboundStream.getBuffer());
        mockOutboundStream.getBuffer().clear();
        mockInboundStream2.push(JetTestSupport.wm(101L));
        callUntil(createTasklet);
        mockInboundStream.push(JetTestSupport.wm(102L));
        callUntil(createTasklet);
        Assert.assertEquals(Arrays.asList("wm(101)-0", JetTestSupport.wm(101L)), mockOutboundStream.getBuffer());
    }

    @Test
    public void when_oneEdgeWaitsForWmAndThenDone_then_wmForwarded() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.singletonList(JetTestSupport.wm(100L)), 1000);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, Collections.singletonList(DoneItem.DONE_ITEM), 1000);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 128);
        this.instreams.add(mockInboundStream);
        this.instreams.add(mockInboundStream2);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet());
        Assert.assertEquals(Arrays.asList("wm(100)-0", JetTestSupport.wm(100L)), mockOutboundStream.getBuffer());
    }

    private ProcessorTasklet createTasklet() {
        for (int i = 0; i < this.instreams.size(); i++) {
            this.instreams.get(i).setOrdinal(i);
        }
        SnapshotContext snapshotContext = new SnapshotContext((ILogger) Mockito.mock(ILogger.class), "test job", -1L, ProcessingGuarantee.EXACTLY_ONCE);
        snapshotContext.initTaskletCount(1, 1, 0);
        ProcessorTasklet processorTasklet = new ProcessorTasklet(this.context, TestUtil.DIRECT_EXECUTOR, new DefaultSerializationServiceBuilder().build(), this.processor, this.instreams, this.outstreams, snapshotContext, this.snapshotCollector, false);
        processorTasklet.init();
        return processorTasklet;
    }

    private static void callUntil(ProcessorTasklet processorTasklet) {
        int i = 0;
        while (true) {
            ProgressState call = processorTasklet.call();
            if (call == ProgressState.NO_PROGRESS) {
                return;
            }
            Assert.assertEquals("Failed to make progress", ProgressState.MADE_PROGRESS, call);
            i++;
            Assert.assertTrue(String.format("tasklet.call() invoked %d times without reaching %s. Last state was %s", 10, ProgressState.NO_PROGRESS, call), i < 10);
        }
    }
}
