package com.hazelcast.jet.impl.execution;

import com.hazelcast.collection.IList;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.JetTestSupport;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.TestProcessors;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.Watermark;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.test.HazelcastParallelParametersRunnerFactory;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@Parameterized.UseParametersRunnerFactory(HazelcastParallelParametersRunnerFactory.class)
@RunWith(HazelcastParametrizedRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/impl/execution/WatermarkCoalescer_IntegrationTest.class */
public class WatermarkCoalescer_IntegrationTest extends JetTestSupport {
    private static final String DONE_ITEM_STR = "DONE_ITEM";

    @Parameterized.Parameter
    public Mode mode;
    private DAG dag = new DAG();
    private HazelcastInstance instance;
    private IList<Object> sinkList;

    /* loaded from: input_file:com/hazelcast/jet/impl/execution/WatermarkCoalescer_IntegrationTest$ListSource.class */
    public static class ListSource extends AbstractProcessor {
        private final List<?> list;
        private int pos;
        private long nextItemAt = Long.MIN_VALUE;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/hazelcast/jet/impl/execution/WatermarkCoalescer_IntegrationTest$ListSource$Delay.class */
        public static final class Delay implements Serializable {
            final long millis;

            private Delay(long j) {
                this.millis = j;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:com/hazelcast/jet/impl/execution/WatermarkCoalescer_IntegrationTest$ListSource$SerializableWm.class */
        public static final class SerializableWm implements Serializable {
            final long timestamp;

            private SerializableWm(long j) {
                this.timestamp = j;
            }
        }

        public ListSource(List<?> list) {
            this.list = list;
        }

        public boolean complete() {
            if (this.nextItemAt != Long.MIN_VALUE && System.nanoTime() < this.nextItemAt) {
                return false;
            }
            this.nextItemAt = Long.MIN_VALUE;
            while (this.pos < this.list.size()) {
                Object obj = this.list.get(this.pos);
                if (obj instanceof SerializableWm) {
                    obj = new Watermark(((SerializableWm) obj).timestamp);
                } else {
                    if (obj instanceof Delay) {
                        getLogger().info("will wait " + ((Delay) obj).millis + " ms");
                        this.nextItemAt = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(((Delay) obj).millis);
                        this.pos++;
                        return false;
                    }
                    if (obj.equals(WatermarkCoalescer_IntegrationTest.DONE_ITEM_STR)) {
                        getLogger().info("returning true");
                        return true;
                    }
                }
                if (!tryEmit(obj)) {
                    return false;
                }
                getLogger().info("emitted " + obj);
                this.pos++;
            }
            return false;
        }

        public static ProcessorMetaSupplier supplier(List<Object> list) {
            List<Object> replaceWatermarks = replaceWatermarks(list);
            return ProcessorMetaSupplier.preferLocalParallelismOne(() -> {
                return new ListSource(replaceWatermarks);
            });
        }

        public static ProcessorMetaSupplier supplier(List<Object>... listArr) {
            for (int i = 0; i < listArr.length; i++) {
                listArr[i] = replaceWatermarks(listArr[i]);
            }
            return ProcessorMetaSupplier.preferLocalParallelismOne(i2 -> {
                return (List) Arrays.stream(listArr).map(ListSource::new).collect(Collectors.toList());
            });
        }

        private static List<Object> replaceWatermarks(List<Object> list) {
            ArrayList arrayList = new ArrayList();
            for (Object obj : list) {
                arrayList.add(obj instanceof Watermark ? new SerializableWm(((Watermark) obj).timestamp()) : obj);
            }
            return arrayList;
        }

        private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
            String implMethodName = serializedLambda.getImplMethodName();
            boolean z = -1;
            switch (implMethodName.hashCode()) {
                case 1263334861:
                    if (implMethodName.equals("lambda$supplier$c5e9e24f$1")) {
                        z = false;
                        break;
                    }
                    break;
                case 1449489363:
                    if (implMethodName.equals("lambda$supplier$f65ec1da$1")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/SupplierEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("getEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/execution/WatermarkCoalescer_IntegrationTest$ListSource") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/List;)Lcom/hazelcast/jet/core/Processor;")) {
                        List list = (List) serializedLambda.getCapturedArg(0);
                        return () -> {
                            return new ListSource(list);
                        };
                    }
                    break;
                case true:
                    if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/jet/core/ProcessorSupplier") && serializedLambda.getFunctionalInterfaceMethodName().equals("get") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(I)Ljava/util/Collection;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/impl/execution/WatermarkCoalescer_IntegrationTest$ListSource") && serializedLambda.getImplMethodSignature().equals("([Ljava/util/List;I)Ljava/util/Collection;")) {
                        List[] listArr = (List[]) serializedLambda.getCapturedArg(0);
                        return i2 -> {
                            return (List) Arrays.stream(listArr).map(ListSource::new).collect(Collectors.toList());
                        };
                    }
                    break;
            }
            throw new IllegalArgumentException("Invalid lambda deserialization");
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/impl/execution/WatermarkCoalescer_IntegrationTest$Mode.class */
    private enum Mode {
        TWO_EDGES,
        TWO_QUEUES
    }

    @Parameterized.Parameters(name = "{0}")
    public static Object[] parameters() {
        return Mode.values();
    }

    @Before
    public void before() {
        this.instance = super.createHazelcastInstance();
        this.sinkList = this.instance.getList("sinkList");
    }

    private static DAG createDag(Mode mode, List<Object> list, List<Object> list2) {
        DAG dag = new DAG();
        Vertex localParallelism = dag.newVertex("mapWmToString", TestProcessors.MapWatermarksToString.mapWatermarksToString(false)).localParallelism(1);
        dag.edge(Edge.between(localParallelism, dag.newVertex("sink", SinkProcessors.writeListP("sinkList")).localParallelism(1)));
        switch (mode) {
            case TWO_EDGES:
                Vertex localParallelism2 = dag.newVertex("edge1", ListSource.supplier(list)).localParallelism(1);
                Vertex localParallelism3 = dag.newVertex("edge2", ListSource.supplier(list2)).localParallelism(1);
                dag.edge(Edge.from(localParallelism2).to(localParallelism, 0));
                dag.edge(Edge.from(localParallelism3).to(localParallelism, 1));
                break;
            case TWO_QUEUES:
                dag.edge(Edge.between(dag.newVertex("edge", ListSource.supplier((List<Object>[]) new List[]{list, list2})).localParallelism(2), localParallelism));
                break;
            default:
                throw new IllegalArgumentException(mode.toString());
        }
        return dag;
    }

    @Test
    public void when_i1_active_i2_active_then_wmForwardedImmediately() {
        this.dag = createDag(this.mode, Collections.singletonList(wm(100L)), Collections.singletonList(wm(100L)));
        this.instance.getJet().newJob(this.dag);
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, this.sinkList.size());
        });
        Assert.assertEquals("wm(100)", this.sinkList.get(0));
    }

    @Test
    public void when_i1_active_i2_idle_then_wmForwardedImmediately() {
        this.dag = createDag(this.mode, Collections.singletonList(wm(100L)), Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE));
        this.instance.getJet().newJob(this.dag);
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, this.sinkList.size());
        });
        Assert.assertEquals("wm(100)", this.sinkList.get(0));
    }

    @Test
    public void when_i1_idle_i2_active_then_wmForwardedImmediately() {
        this.dag = createDag(this.mode, Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), Collections.singletonList(wm(100L)));
        this.instance.getJet().newJob(this.dag);
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, this.sinkList.size());
        });
        Assert.assertEquals("wm(100)", this.sinkList.get(0));
    }

    @Test
    public void when_i1_idle_i2_idle_then_idleMessageForwardedImmediately() {
        this.dag = createDag(this.mode, Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE), Collections.singletonList(WatermarkCoalescer.IDLE_MESSAGE));
        this.instance.getJet().newJob(this.dag);
        assertTrueAllTheTime(() -> {
            Assert.assertEquals(0L, this.sinkList.size());
        }, 3L);
    }

    @Test
    public void when_waitingForWmOnI2ButI2BecomesDone_then_wmFromI1Forwarded() {
        this.dag = createDag(this.mode, Collections.singletonList(wm(100L)), Arrays.asList(delay(500L), DONE_ITEM_STR));
        this.instance.getJet().newJob(this.dag);
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, this.sinkList.size());
        });
        Assert.assertEquals("wm(100)", this.sinkList.get(0));
    }

    @Test
    public void when_multipleWm_then_allForwarded() {
        this.dag = createDag(this.mode, Arrays.asList(wm(100L), delay(500L), wm(101L)), Arrays.asList(wm(100L), delay(500L), wm(101L)));
        this.instance.getJet().newJob(this.dag);
        assertTrueEventually(() -> {
            Assert.assertEquals(2L, this.sinkList.size());
        });
        Assert.assertEquals("wm(100)", this.sinkList.get(0));
        Assert.assertEquals("wm(101)", this.sinkList.get(1));
    }

    private ListSource.Delay delay(long j) {
        return new ListSource.Delay(j);
    }
}
