package com.hazelcast.jet.pipeline;

import com.hazelcast.collection.IList;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.processor.Processors;
import com.hazelcast.map.IMap;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.topic.impl.reliable.ReliableTopicDestroyTest;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
/* loaded from: input_file:com/hazelcast/jet/pipeline/StreamSourceStageTest.class */
public class StreamSourceStageTest extends StreamSourceStageTestBase {
    @BeforeClass
    public static void beforeClass1() {
        IMap map = instance.getMap("journaledMap");
        map.put(1, 1);
        map.put(2, 2);
    }

    private static StreamSource<Integer> createSourceJournal() {
        return Sources.mapJournal("journaledMap", JournalInitialPosition.START_FROM_OLDEST, (v0) -> {
            return v0.getKey();
        }, PredicateEx.alwaysTrue());
    }

    private static StreamSource<Integer> createTimestampedSourceBuilder() {
        return SourceBuilder.timestampedStream("s", context -> {
            return new boolean[1];
        }).fillBufferFn((zArr, timestampedSourceBuffer) -> {
            if (zArr[0]) {
                return;
            }
            timestampedSourceBuffer.add(1, 1L);
            timestampedSourceBuffer.add(2, 2L);
            zArr[0] = true;
        }).build();
    }

    private static StreamSource<Integer> createPlainSourceBuilder() {
        return SourceBuilder.stream("s", context -> {
            return new boolean[1];
        }).fillBufferFn((zArr, sourceBuffer) -> {
            if (zArr[0]) {
                return;
            }
            sourceBuffer.add(1);
            sourceBuffer.add(2);
            zArr[0] = true;
        }).build();
    }

    @Test
    public void test_timestampedSourceBuilder_withoutTimestamps() {
        test(createTimestampedSourceBuilder(), this.withoutTimestampsFn, Arrays.asList(2L, 3L), null);
    }

    @Test
    public void test_timestampedSourceBuilder_withNativeTimestamps() {
        test(createTimestampedSourceBuilder(), this.withNativeTimestampsFn, Arrays.asList(1L, 2L), null);
    }

    @Test
    public void test_timestampedSourceBuilder_withTimestamps() {
        test(createTimestampedSourceBuilder(), this.withTimestampsFn, Arrays.asList(2L, 3L), null);
    }

    @Test
    public void test_plainSourceBuilder_withoutTimestamps() {
        test(createPlainSourceBuilder(), this.withoutTimestampsFn, Arrays.asList(2L, 3L), null);
    }

    @Test
    public void test_plainSourceBuilder_withNativeTimestamps() {
        test(createPlainSourceBuilder(), this.withNativeTimestampsFn, Collections.emptyList(), "The source doesn't support native timestamps");
    }

    @Test
    public void test_plainSourceBuilder_withTimestamps() {
        test(createPlainSourceBuilder(), this.withTimestampsFn, Arrays.asList(2L, 3L), null);
    }

    @Test
    public void test_sourceJournal_withoutTimestamps() {
        test(createSourceJournal(), this.withoutTimestampsFn, Arrays.asList(2L, 3L), null);
    }

    @Test
    public void test_sourceJournal_withNativeTimestamps() {
        test(createSourceJournal(), this.withNativeTimestampsFn, Collections.emptyList(), "The source doesn't support native timestamps");
    }

    @Test
    public void test_sourceJournal_withTimestamps() {
        test(createSourceJournal(), this.withTimestampsFn, Arrays.asList(2L, 3L), null);
    }

    @Test
    public void test_withTimestampsButTimestampsNotUsed() {
        IList list = instance.getList("sinkList");
        Pipeline create = Pipeline.create();
        create.readFrom(createSourceJournal()).withIngestionTimestamps().writeTo(Sinks.list(list));
        instance.getJet().newJob(create);
        assertTrueEventually(() -> {
            Assert.assertEquals(Arrays.asList(1, 2), new ArrayList((Collection) list));
        }, 5L);
    }

    @Test
    public void when_sparseItemsWithIngestionTimestamps_then_noExtraLatency() {
        IList list = instance.getList(randomMapName());
        IMap map = instance.getMap(randomMapName());
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.mapJournal(map, JournalInitialPosition.START_FROM_OLDEST)).withIngestionTimestamps().window(WindowDefinition.tumbling(1L)).aggregate(AggregateOperations.counting()).writeTo(Sinks.list(list));
        Job newJob = instance.getJet().newJob(create);
        Assert.assertEquals(0L, list.size());
        map.put(3, 3);
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, list.size());
        }, 10L);
        map.put(4, 4);
        assertTrueEventually(() -> {
            Assert.assertEquals(2L, list.size());
        }, 10L);
        newJob.cancel();
    }

    @Test
    @Category({NightlyTest.class})
    public void when_sparseItemsWithIngestionTimestamps_then_windowIsNotEmittedTooEarly() {
        IList list = instance.getList(randomMapName());
        IMap map = instance.getMap(randomMapName());
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.mapJournal(map, JournalInitialPosition.START_FROM_OLDEST)).withIngestionTimestamps().window(WindowDefinition.session(15000L)).aggregate(AggregateOperations.counting()).writeTo(Sinks.list(list));
        long nanoTime = System.nanoTime();
        Job newJob = instance.getJet().newJob(create);
        Assert.assertEquals(0L, list.size());
        map.put(5, 5);
        assertTrueEventually(() -> {
            Assert.assertEquals(1L, list.size());
        }, 30L);
        Assert.assertTrue(System.nanoTime() - nanoTime > TimeUnit.SECONDS.toNanos(15L));
        newJob.cancel();
    }

    @Test
    public void when_withTimestampsAndAddTimestamps_then_fail() {
        StreamStage withIngestionTimestamps = Pipeline.create().readFrom(Sources.mapJournal(ReliableTopicDestroyTest.RELIABLE_TOPIC_NAME, JournalInitialPosition.START_FROM_OLDEST)).withIngestionTimestamps();
        this.expectedException.expectMessage("This stage already has timestamps assigned to it");
        withIngestionTimestamps.addTimestamps(entry -> {
            return 0L;
        }, 0L);
    }

    @Test
    public void when_sourceHasPreferredLocalParallelism_then_lpMatchSource() {
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.streamFromProcessor("src", ProcessorMetaSupplier.of(11, ProcessorSupplier.of(Processors.noopP())))).withTimestamps(obj -> {
            return 0L;
        }, 0L).writeTo(Sinks.noop());
        DAG dag = create.toDag();
        Vertex vertex = (Vertex) Objects.requireNonNull(dag.getVertex("src"));
        Vertex vertex2 = (Vertex) Objects.requireNonNull(dag.getVertex("src-add-timestamps"));
        Assert.assertEquals(11, vertex.determineLocalParallelism(-1));
        Assert.assertEquals(11, vertex2.determineLocalParallelism(-1));
    }

    @Test
    public void when_sourceHasExplicitLocalParallelism_then_lpMatchSource() {
        Pipeline create = Pipeline.create();
        create.readFrom(Sources.streamFromProcessor("src", ProcessorMetaSupplier.of(ProcessorSupplier.of(Processors.noopP())))).withTimestamps(obj -> {
            return 0L;
        }, 0L).setLocalParallelism(11).writeTo(Sinks.noop());
        DAG dag = create.toDag();
        Vertex vertex = (Vertex) Objects.requireNonNull(dag.getVertex("src"));
        Vertex vertex2 = (Vertex) Objects.requireNonNull(dag.getVertex("src-add-timestamps"));
        Assert.assertEquals(11, vertex.getLocalParallelism());
        Assert.assertEquals(11, vertex2.getLocalParallelism());
    }

    @Test
    public void when_sourceHasNoPreferredLocalParallelism_then_lpMatchSource() {
        StreamSource streamFromProcessor = Sources.streamFromProcessor("src", ProcessorMetaSupplier.of(ProcessorSupplier.of(Processors.noopP())));
        Pipeline create = Pipeline.create();
        create.readFrom(streamFromProcessor).withTimestamps(obj -> {
            return 0L;
        }, 0L).writeTo(Sinks.noop());
        DAG dag = create.toDag();
        Vertex vertex = (Vertex) Objects.requireNonNull(dag.getVertex("src"));
        Vertex vertex2 = (Vertex) Objects.requireNonNull(dag.getVertex("src-add-timestamps"));
        Assert.assertEquals(vertex.determineLocalParallelism(-1), vertex2.determineLocalParallelism(-1));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -2134715839:
                if (implMethodName.equals("lambda$createPlainSourceBuilder$d116759$1")) {
                    z = 5;
                    break;
                }
                break;
            case -1963339857:
                if (implMethodName.equals("lambda$when_sourceHasExplicitLocalParallelism_then_lpMatchSource$daf930c1$1")) {
                    z = 8;
                    break;
                }
                break;
            case -1249358039:
                if (implMethodName.equals("getKey")) {
                    z = true;
                    break;
                }
                break;
            case -402068935:
                if (implMethodName.equals("lambda$when_withTimestampsAndAddTimestamps_then_fail$daf930c1$1")) {
                    z = 2;
                    break;
                }
                break;
            case -359077656:
                if (implMethodName.equals("lambda$createTimestampedSourceBuilder$9236d13e$1")) {
                    z = false;
                    break;
                }
                break;
            case 1071131837:
                if (implMethodName.equals("lambda$createPlainSourceBuilder$9236d13e$1")) {
                    z = 7;
                    break;
                }
                break;
            case 1144284342:
                if (implMethodName.equals("lambda$createTimestampedSourceBuilder$d116759$1")) {
                    z = 4;
                    break;
                }
                break;
            case 1456109550:
                if (implMethodName.equals("lambda$when_sourceHasPreferredLocalParallelism_then_lpMatchSource$daf930c1$1")) {
                    z = 6;
                    break;
                }
                break;
            case 1728859821:
                if (implMethodName.equals("lambda$when_sourceHasNoPreferredLocalParallelism_then_lpMatchSource$daf930c1$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/StreamSourceStageTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;)[Z")) {
                    return context -> {
                        return new boolean[1];
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 9 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/map/EventJournalMapEvent") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0.getKey();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/StreamSourceStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Map$Entry;)J")) {
                    return entry -> {
                        return 0L;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/StreamSourceStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)J")) {
                    return obj -> {
                        return 0L;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/StreamSourceStageTest") && serializedLambda.getImplMethodSignature().equals("([ZLcom/hazelcast/jet/pipeline/SourceBuilder$TimestampedSourceBuffer;)V")) {
                    return (zArr, timestampedSourceBuffer) -> {
                        if (zArr[0]) {
                            return;
                        }
                        timestampedSourceBuffer.add(1, 1L);
                        timestampedSourceBuffer.add(2, 2L);
                        zArr[0] = true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/BiConsumerEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("acceptEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/StreamSourceStageTest") && serializedLambda.getImplMethodSignature().equals("([ZLcom/hazelcast/jet/pipeline/SourceBuilder$SourceBuffer;)V")) {
                    return (zArr2, sourceBuffer) -> {
                        if (zArr2[0]) {
                            return;
                        }
                        sourceBuffer.add(1);
                        sourceBuffer.add(2);
                        zArr2[0] = true;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/StreamSourceStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)J")) {
                    return obj2 -> {
                        return 0L;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/FunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/StreamSourceStageTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/hazelcast/jet/core/Processor$Context;)[Z")) {
                    return context2 -> {
                        return new boolean[1];
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("com/hazelcast/function/ToLongFunctionEx") && serializedLambda.getFunctionalInterfaceMethodName().equals("applyAsLongEx") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)J") && serializedLambda.getImplClass().equals("com/hazelcast/jet/pipeline/StreamSourceStageTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)J")) {
                    return obj3 -> {
                        return 0L;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
