package com.hazelcast.jet.impl.execution;

import com.hazelcast.internal.serialization.impl.DefaultSerializationServiceBuilder;
import com.hazelcast.jet.core.Inbox;
import com.hazelcast.jet.core.Outbox;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.TestUtil;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.test.TestProcessorContext;
import com.hazelcast.jet.impl.util.ProgressState;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nonnull;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.mockito.Mockito;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTaskletTest.class */
public class ProcessorTaskletTest {
    private static final int MOCK_INPUT_SIZE = 10;
    private static final int CALL_COUNT_LIMIT = 10;
    private List<Object> mockInput;
    private List<MockInboundStream> instreams;
    private List<OutboundEdgeStream> outstreams;
    private PassThroughProcessor processor;
    private Processor.Context context;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/hazelcast/jet/impl/execution/ProcessorTaskletTest$PassThroughProcessor.class */
    public static class PassThroughProcessor implements Processor {
        int nullaryProcessCallCountdown;
        int itemsToEmitInComplete;
        int itemsToEmitInEachCompleteEdge;
        boolean completeReturnedTrue;
        Set<Integer> completeEdgeReturnedTrue;
        private int itemsToEmitInThisCompleteEdge;
        private Outbox outbox;
        private CountDownLatch doneLatch;

        private PassThroughProcessor() {
            this.completeEdgeReturnedTrue = new HashSet();
            this.doneLatch = new CountDownLatch(0);
        }

        public void init(@Nonnull Outbox outbox, @Nonnull Processor.Context context) {
            this.outbox = outbox;
        }

        public void process(int i, @Nonnull Inbox inbox) {
            while (true) {
                Object peek = inbox.peek();
                if (peek == null || !this.outbox.offer(peek)) {
                    return;
                } else {
                    inbox.remove();
                }
            }
        }

        public boolean tryProcessWatermark(@Nonnull Watermark watermark) {
            return this.outbox.offer(watermark);
        }

        public boolean complete() {
            Assert.assertFalse(this.completeReturnedTrue);
            if (this.itemsToEmitInComplete > 0 && this.outbox.offer("completing")) {
                this.itemsToEmitInComplete--;
            }
            boolean z = this.itemsToEmitInComplete == 0;
            this.completeReturnedTrue = z;
            return z;
        }

        public boolean completeEdge(int i) {
            Assert.assertFalse(this.completeEdgeReturnedTrue.contains(Integer.valueOf(i)));
            if (this.itemsToEmitInThisCompleteEdge == 0) {
                this.itemsToEmitInThisCompleteEdge = this.itemsToEmitInEachCompleteEdge;
            }
            if (this.itemsToEmitInThisCompleteEdge > 0 && this.outbox.offer("completedEdge=" + i)) {
                this.itemsToEmitInThisCompleteEdge--;
            }
            if (this.itemsToEmitInThisCompleteEdge != 0) {
                return false;
            }
            this.completeEdgeReturnedTrue.add(Integer.valueOf(i));
            return true;
        }

        public boolean tryProcess() {
            int i = this.nullaryProcessCallCountdown;
            this.nullaryProcessCallCountdown = i - 1;
            return i <= 0;
        }

        public void close() throws InterruptedException {
            this.doneLatch.await();
        }
    }

    @Before
    public void setUp() {
        this.mockInput = (List) IntStream.range(0, 10).boxed().collect(Collectors.toList());
        this.processor = new PassThroughProcessor();
        this.context = new TestProcessorContext();
        this.instreams = new ArrayList();
        this.outstreams = new ArrayList();
    }

    @Test
    public void when_isCooperative_then_true() {
        Assert.assertTrue(createTasklet().isCooperative());
    }

    @Test
    public void when_singleInstreamAndOutstream_then_outstreamGetsAll() {
        this.mockInput.add(DoneItem.DONE_ITEM);
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput, this.mockInput.size());
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet(), ProgressState.DONE);
        Assert.assertEquals(this.mockInput, mockOutboundStream.getBuffer());
    }

    @Test
    public void when_oneInstreamAndTwoOutstreams_then_allOutstreamsGetAllItems() {
        this.mockInput.add(DoneItem.DONE_ITEM);
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput, this.mockInput.size());
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0);
        MockOutboundStream mockOutboundStream2 = new MockOutboundStream(1);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        this.outstreams.add(mockOutboundStream2);
        callUntil(createTasklet(), ProgressState.DONE);
        Assert.assertEquals(this.mockInput, mockOutboundStream.getBuffer());
        Assert.assertEquals(this.mockInput, mockOutboundStream2.getBuffer());
    }

    @Test
    public void when_instreamChunked_then_processAllEventually() {
        this.mockInput.add(DoneItem.DONE_ITEM);
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput, 4);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet(), ProgressState.DONE);
        Assert.assertEquals(this.mockInput, mockOutboundStream.getBuffer());
    }

    @Test
    public void when_3instreams_then_pushAllIntoOutstream() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput.subList(0, 4), 4);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, this.mockInput.subList(4, 8), 4);
        MockInboundStream mockInboundStream3 = new MockInboundStream(0, this.mockInput.subList(8, 10), 4);
        mockInboundStream.push(DoneItem.DONE_ITEM);
        mockInboundStream2.push(DoneItem.DONE_ITEM);
        mockInboundStream3.push(DoneItem.DONE_ITEM);
        this.instreams.addAll(Arrays.asList(mockInboundStream, mockInboundStream2, mockInboundStream3));
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet(), ProgressState.DONE);
        this.mockInput.add(DoneItem.DONE_ITEM);
        Assert.assertEquals(new HashSet(this.mockInput), new HashSet(mockOutboundStream.getBuffer()));
    }

    @Test
    public void when_outstreamRefusesItem_then_noProgress() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, this.mockInput, this.mockInput.size());
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 1);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        callUntil(createTasklet(), ProgressState.NO_PROGRESS);
        Assert.assertTrue(mockOutboundStream.getBuffer().equals(this.mockInput.subList(0, 1)));
    }

    @Test
    public void when_inboxEmpty_then_nullaryProcessCalled() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.emptyList(), 1);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        ProcessorTasklet createTasklet = createTasklet();
        this.processor.nullaryProcessCallCountdown = 1;
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        Assert.assertTrue("Expected: nullaryProcessCallCountdown<=0, was " + this.processor.nullaryProcessCallCountdown, this.processor.nullaryProcessCallCountdown <= 0);
    }

    @Test
    public void when_completeReturnsFalse_then_retried() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Collections.singletonList(DoneItem.DONE_ITEM), 1);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 1);
        this.instreams.add(mockInboundStream);
        this.outstreams.add(mockOutboundStream);
        ProcessorTasklet createTasklet = createTasklet();
        this.processor.itemsToEmitInComplete = 2;
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        mockOutboundStream.flush();
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        mockOutboundStream.flush();
        callUntil(createTasklet, ProgressState.DONE);
        Assert.assertTrue(this.processor.itemsToEmitInComplete <= 0);
    }

    @Test
    public void when_differentPriorities_then_respected() {
        MockInboundStream mockInboundStream = new MockInboundStream(0, Arrays.asList(1, 2, DoneItem.DONE_ITEM), 1);
        MockInboundStream mockInboundStream2 = new MockInboundStream(0, Arrays.asList(3, 4, DoneItem.DONE_ITEM), 1);
        MockInboundStream mockInboundStream3 = new MockInboundStream(1, Arrays.asList(5, 6, DoneItem.DONE_ITEM), 1);
        MockInboundStream mockInboundStream4 = new MockInboundStream(1, Arrays.asList(7, 8, DoneItem.DONE_ITEM), 1);
        MockOutboundStream mockOutboundStream = new MockOutboundStream(0, 1);
        this.instreams.add(mockInboundStream);
        this.instreams.add(mockInboundStream2);
        this.instreams.add(mockInboundStream3);
        this.instreams.add(mockInboundStream4);
        this.outstreams.add(mockOutboundStream);
        ProcessorTasklet createTasklet = createTasklet();
        this.processor.itemsToEmitInComplete = 1;
        this.processor.itemsToEmitInEachCompleteEdge = 1;
        List asList = Arrays.asList(1, 3, 2, 4, "completedEdge=0", "completedEdge=1", 5, 7, 6, 8, "completedEdge=2", "completedEdge=3");
        ArrayList arrayList = new ArrayList();
        boolean z = true;
        while (arrayList.size() < asList.size()) {
            callUntil(createTasklet, ProgressState.MADE_PROGRESS);
            if (mockOutboundStream.getBuffer().isEmpty() && z) {
                z = false;
            } else {
                z = true;
                Assert.assertEquals("Expected 1 item after " + arrayList, 1L, mockOutboundStream.getBuffer().size());
                arrayList.add(mockOutboundStream.getBuffer().remove(0));
            }
        }
        Assert.assertEquals(asList, arrayList);
    }

    @Test
    public void when_closeBlocked_then_waitUntilDone() {
        this.processor.doneLatch = new CountDownLatch(1);
        ProcessorTasklet createTasklet = createTasklet(ForkJoinPool.commonPool());
        callUntil(createTasklet, ProgressState.NO_PROGRESS);
        this.processor.doneLatch.countDown();
        HazelcastTestSupport.assertTrueEventually(() -> {
            Assert.assertEquals(ProgressState.DONE, createTasklet.call());
        }, 2L);
    }

    private ProcessorTasklet createTasklet() {
        return createTasklet(TestUtil.DIRECT_EXECUTOR);
    }

    private ProcessorTasklet createTasklet(ExecutorService executorService) {
        for (int i = 0; i < this.instreams.size(); i++) {
            this.instreams.get(i).setOrdinal(i);
        }
        ProcessorTasklet processorTasklet = new ProcessorTasklet(this.context, executorService, new DefaultSerializationServiceBuilder().build(), this.processor, this.instreams, this.outstreams, (SnapshotContext) Mockito.mock(SnapshotContext.class), new MockOutboundCollector(10), false);
        processorTasklet.init();
        return processorTasklet;
    }

    private static void callUntil(Tasklet tasklet, ProgressState progressState) {
        int i = 0;
        while (true) {
            ProgressState call = tasklet.call();
            if (call == progressState) {
                return;
            }
            Assert.assertEquals("Failed to make progress", ProgressState.MADE_PROGRESS, call);
            i++;
            Assert.assertTrue(String.format("tasklet.call() invoked %d times without reaching %s. Last state was %s", 10, progressState, call), i < 10);
        }
    }
}
