package com.hazelcast.jet.impl.execution;

import com.hazelcast.function.ComparatorEx;
import com.hazelcast.internal.util.concurrent.ConcurrentConveyor;
import com.hazelcast.internal.util.concurrent.OneToOneConcurrentArrayQueue;
import com.hazelcast.internal.util.concurrent.QueuedPipe;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import java.util.ArrayList;
import java.util.Arrays;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/ConcurrentInboundEdgeStreamTest.class */
public class ConcurrentInboundEdgeStreamTest {
    private static final Object senderGone = new Object();

    @Rule
    public ExpectedException exception = ExpectedException.none();
    private OneToOneConcurrentArrayQueue<Object> q1;
    private OneToOneConcurrentArrayQueue<Object> q2;
    private InboundEdgeStream stream;
    private ConcurrentConveyor<Object> conveyor;

    @Before
    public void setUp() {
        this.q1 = new OneToOneConcurrentArrayQueue<>(128);
        this.q2 = new OneToOneConcurrentArrayQueue<>(128);
        this.conveyor = ConcurrentConveyor.concurrentConveyor(senderGone, new QueuedPipe[]{this.q1, this.q2});
        this.stream = ConcurrentInboundEdgeStream.create(this.conveyor, 0, 0, false, "cies", (ComparatorEx) null);
    }

    @Test
    public void when_twoEmittersOneDoneFirst_then_madeProgress() {
        add(this.q1, 1, 2, DoneItem.DONE_ITEM);
        add(this.q2, 6);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1, 2, 6);
        add(this.q2, 7, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE, 7);
        drainAndAssert(ProgressState.WAS_ALREADY_DONE, new Object[0]);
    }

    @Test
    public void when_twoEmittersDrainedAtOnce_then_firstCallDone() {
        add(this.q1, 1, 2, DoneItem.DONE_ITEM);
        add(this.q2, 6, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE, 1, 2, 6);
    }

    @Test
    public void when_allEmittersInitiallyDone_then_firstCallDone() {
        this.q1.add(DoneItem.DONE_ITEM);
        this.q2.add(DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE, new Object[0]);
        drainAndAssert(ProgressState.WAS_ALREADY_DONE, new Object[0]);
    }

    @Test
    public void when_oneQueueIsIdle_then_otherHasProgress() {
        add(this.q2, WatermarkCoalescer.IDLE_MESSAGE);
        drainAndAssert(ProgressState.MADE_PROGRESS, new Object[0]);
        add(this.q1, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);
        add(this.q1, JetTestSupport.wm(2L));
        drainAndAssert(ProgressState.MADE_PROGRESS, JetTestSupport.wm(2L));
    }

    @Test
    public void when_oneEmitterWithNoProgress_then_noProgress() {
        add(this.q2, 1, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);
        drainAndAssert(ProgressState.NO_PROGRESS, new Object[0]);
        this.q1.add(DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE, new Object[0]);
        drainAndAssert(ProgressState.WAS_ALREADY_DONE, new Object[0]);
    }

    @Test
    public void when_receivingWatermarks_then_coalesce() {
        add(this.q1, JetTestSupport.wm(1L));
        add(this.q2, JetTestSupport.wm(2L));
        drainAndAssert(ProgressState.MADE_PROGRESS, JetTestSupport.wm(1L));
        add(this.q1, JetTestSupport.wm(3L));
        add(this.q2, JetTestSupport.wm(3L));
        drainAndAssert(ProgressState.MADE_PROGRESS, JetTestSupport.wm(2L), JetTestSupport.wm(3L));
    }

    @Test
    public void when_receivingBarriers_then_coalesce() {
        add(this.q1, barrier(0L));
        add(this.q2, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);
        add(this.q1, 2);
        add(this.q2, barrier(0L));
        drainAndAssert(ProgressState.MADE_PROGRESS, 2);
        drainAndAssert(ProgressState.MADE_PROGRESS, barrier(0L));
    }

    @Test
    public void when_receivingBarriers_then_waitForBarrier() {
        this.stream = ConcurrentInboundEdgeStream.create(this.conveyor, 0, 0, true, "cies", (ComparatorEx) null);
        add(this.q1, barrier(0L));
        add(this.q2, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);
        add(this.q1, 2);
        drainAndAssert(ProgressState.NO_PROGRESS, new Object[0]);
        add(this.q2, barrier(0L));
        drainAndAssert(ProgressState.MADE_PROGRESS, barrier(0L));
        drainAndAssert(ProgressState.MADE_PROGRESS, 2);
    }

    @Test
    public void when_receivingBarriersWhileDone_then_coalesce() {
        this.stream = ConcurrentInboundEdgeStream.create(this.conveyor, 0, 0, true, "cies", (ComparatorEx) null);
        add(this.q1, 1, barrier(0L));
        add(this.q2, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, barrier(0L));
        add(this.q1, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.DONE, new Object[0]);
    }

    @Test
    public void when_receiveOnlyBarrierAndDoneItemFromSameQueue_then_coalesce() {
        add(this.q1, 1, barrier(0L), DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, new Object[0]);
        add(this.q2, barrier(0L));
        drainAndAssert(ProgressState.MADE_PROGRESS, barrier(0L));
    }

    @Test
    public void when_barrierAndWmInQueues_then_notReordered() {
        add(this.q1, JetTestSupport.wm(1L));
        add(this.q2, barrier(0L));
        drainAndAssert(ProgressState.MADE_PROGRESS, new Object[0]);
        Assert.assertEquals(0L, this.q1.size());
        Assert.assertEquals(0L, this.q2.size());
        add(this.q1, barrier(0L));
        add(this.q2, JetTestSupport.wm(1L));
        drainAndAssert(ProgressState.MADE_PROGRESS, barrier(0L));
        drainAndAssert(ProgressState.MADE_PROGRESS, JetTestSupport.wm(1L));
    }

    @Test
    public void when_barrierAndDone_then_barrierEmitted() {
        add(this.q1, barrier(0L), DoneItem.DONE_ITEM);
        add(this.q2, barrier(0L), DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.MADE_PROGRESS, barrier(0L));
        drainAndAssert(ProgressState.DONE, new Object[0]);
    }

    @Test
    public void when_oneQueueDone_then_theOtherWorks() {
        add(this.q1, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.MADE_PROGRESS, new Object[0]);
        add(this.q2, barrier(0L));
        drainAndAssert(ProgressState.MADE_PROGRESS, barrier(0L));
        add(this.q2, JetTestSupport.wm(0L));
        drainAndAssert(ProgressState.MADE_PROGRESS, JetTestSupport.wm(0L));
    }

    @Test
    public void when_nonSpecificBroadcastItems_then_drainedInOneBatch() {
        BroadcastEntry broadcastEntry = new BroadcastEntry("k", "v");
        add(this.q1, broadcastEntry);
        add(this.q1, broadcastEntry);
        drainAndAssert(ProgressState.MADE_PROGRESS, broadcastEntry, broadcastEntry);
    }

    @Test
    public void when_wmInOneQueueAndTheOtherDoneLater_then_wmEmitted_v1() {
        add(this.q1, JetTestSupport.wm(1L));
        add(this.q2, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.MADE_PROGRESS, JetTestSupport.wm(1L));
    }

    @Test
    public void when_wmInOneQueueAndTheOtherDoneLater_then_wmEmitted_v2() {
        add(this.q1, JetTestSupport.wm(1L));
        drainAndAssert(ProgressState.MADE_PROGRESS, new Object[0]);
        add(this.q2, DoneItem.DONE_ITEM);
        drainAndAssert(ProgressState.MADE_PROGRESS, JetTestSupport.wm(1L));
    }

    @Test
    public void test_wmInQ1AndItemInQ2InSingleDrain() {
        add(this.q2, JetTestSupport.wm(1L));
        drainAndAssert(ProgressState.MADE_PROGRESS, new Object[0]);
        add(this.q1, JetTestSupport.wm(1L));
        add(this.q2, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, JetTestSupport.wm(1L));
    }

    @Test
    public void test_barrierInQ1AndItemInQ2InSingleDrain() {
        add(this.q2, barrier(1L));
        drainAndAssert(ProgressState.MADE_PROGRESS, new Object[0]);
        add(this.q1, barrier(1L));
        add(this.q2, 1);
        drainAndAssert(ProgressState.MADE_PROGRESS, barrier(1L));
        drainAndAssert(ProgressState.MADE_PROGRESS, 1);
    }

    private void drainAndAssert(ProgressState progressState, Object... objArr) {
        ArrayList arrayList = new ArrayList();
        InboundEdgeStream inboundEdgeStream = this.stream;
        arrayList.getClass();
        Assert.assertEquals("progressState", progressState, inboundEdgeStream.drainTo(arrayList::add));
        Assert.assertEquals(Arrays.asList(objArr), arrayList);
    }

    private void add(OneToOneConcurrentArrayQueue<Object> oneToOneConcurrentArrayQueue, Object... objArr) {
        oneToOneConcurrentArrayQueue.addAll(Arrays.asList(objArr));
    }

    private SnapshotBarrier barrier(long j) {
        return new SnapshotBarrier(j, false);
    }
}
