package com.hazelcast.jet.impl.processor;

import com.hazelcast.collection.IList;
import com.hazelcast.config.Config;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.function.PredicateEx;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.SimpleTestInClusterSupport;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.config.EdgeConfig;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.config.ProcessingGuarantee;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import com.hazelcast.jet.core.EventTimePolicy;
import com.hazelcast.jet.core.JobStatus;
import com.hazelcast.jet.core.TestUtil;
import com.hazelcast.jet.core.Vertex;
import com.hazelcast.jet.core.WatermarkPolicy;
import com.hazelcast.jet.core.processor.SinkProcessors;
import com.hazelcast.jet.core.processor.SourceProcessors;
import com.hazelcast.jet.function.TriFunction;
import com.hazelcast.jet.pipeline.JournalInitialPosition;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.ServiceFactories;
import com.hazelcast.jet.pipeline.ServiceFactory;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.Sources;
import com.hazelcast.map.IMap;
import com.hazelcast.test.HazelcastParametrizedRunner;
import com.hazelcast.test.HazelcastSerialParametersRunnerFactory;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for the asynchronous "transform using service" processors,
 * {@link AsyncTransformUsingServiceOrderedP} and
 * {@link AsyncTransformUsingServiceUnorderedP}.
 * <p>
 * Every test reads integers from a journaled map, maps them asynchronously on
 * an executor service and compares the contents of a sink list with the
 * expected output. The suite is parameterized by {@link #ordered}.
 */
@Parameterized.UseParametersRunnerFactory(HazelcastSerialParametersRunnerFactory.class)
@RunWith(HazelcastParametrizedRunner.class)
@Category({QuickTest.class, ParallelJVMTest.class})
public class AsyncTransformUsingServiceP_IntegrationTest extends SimpleTestInClusterSupport {

    /** Number of entries put into the journaled map before each test. */
    private static final int NUM_ITEMS = 100;
    /** Total number of entries in the journaled map for the stress tests. */
    private static final int STRESS_TEST_NUM_ITEMS = 10_000;
    /** Maximum number of concurrent async operations passed to the processor suppliers. */
    private static final int MAX_CONCURRENT_OPS = 4;
    /** Size of the thread pool backing the shared service. */
    private static final int EXECUTOR_THREADS = 8;
    /** Local parallelism of the async mapping vertex/stage. */
    private static final int LOCAL_PARALLELISM = 2;
    /** How many times the job is restarted in the "with restart" stress test. */
    private static final int RESTART_COUNT = 5;
    /** Timeout handed to {@code assertTrueEventually}; presumably seconds - confirm against the helper. */
    private static final long ASSERT_TIMEOUT = 240L;

    /** Selects the ordered ({@code true}) or unordered ({@code false}) processor in the DAG tests. */
    @Parameterized.Parameter
    public boolean ordered;

    private IMap<Integer, Integer> journaledMap;
    private ServiceFactory<?, ExecutorService> serviceFactory;
    private IList<Object> sinkList;
    private JobConfig jobConfig;

    /**
     * Supplies the values of {@link #ordered} the suite is executed with.
     *
     * @return {@code true} and {@code false}
     */
    @Parameterized.Parameters(name = "ordered={0}")
    public static Collection<Object> parameters() {
        return Arrays.asList(true, false);
    }

    /**
     * Starts a single-member cluster whose maps matching {@code journaledMap*}
     * have the event journal enabled.
     */
    @BeforeClass
    public static void beforeClass() {
        Config config = smallInstanceConfig();
        config.getMapConfig("journaledMap*")
              .getEventJournalConfig()
              .setEnabled(true)
              .setCapacity(100000);
        initialize(1, config);
    }

    /**
     * Creates a fresh source map holding the entries {@code i -> i} for
     * {@code 0 <= i < NUM_ITEMS}, a fresh sink list, the job configuration and
     * the shared executor-service factory.
     */
    @Before
    public void before() {
        journaledMap = instance().getMap(randomMapName("journaledMap"));
        journaledMap.putAll(identityEntries(0, NUM_ITEMS));
        sinkList = instance().getList(randomMapName("sinkList"));
        jobConfig = new JobConfig()
                .setProcessingGuarantee(ProcessingGuarantee.EXACTLY_ONCE)
                .setSnapshotIntervalMillis(0L);
        serviceFactory = ServiceFactories.sharedService(
                context -> Executors.newFixedThreadPool(EXECUTOR_THREADS),
                ExecutorService::shutdown);
    }

    @Test
    public void stressTest_noRestart() {
        stressTestInt(false);
    }

    @Test
    public void stressTest_withRestart_graceful() {
        stressTestInt(true);
    }

    /**
     * Runs a DAG-based job that flat-maps every source item to five strings
     * through the async processor under test and verifies the sink contents.
     *
     * @param restart if {@code true}, the job is restarted {@link #RESTART_COUNT}
     *                times while it is running
     */
    private void stressTestInt(boolean restart) {
        // Top the source map up to STRESS_TEST_NUM_ITEMS entries.
        journaledMap.putAll(identityEntries(NUM_ITEMS, STRESS_TEST_NUM_ITEMS));

        DAG dag = new DAG();
        Vertex source = dag.newVertex("source", TestUtil.throttle(
                SourceProcessors.streamMapP(
                        journaledMap.getName(),
                        PredicateEx.alwaysTrue(),
                        event -> event.getNewValue(),
                        JournalInitialPosition.START_FROM_OLDEST,
                        EventTimePolicy.eventTimePolicy(
                                item -> ((Integer) item).longValue(),
                                WatermarkPolicy.limitingLag(10L),
                                10L, 0L, 0L)),
                5000L));

        // Left raw on purpose: the precise type argument involves Traverser,
        // which this file does not import.
        @SuppressWarnings("rawtypes")
        BiFunctionEx flatMapAsyncFn = transformNotPartitionedFn(i ->
                Traversers.traverseItems(i + "-1", i + "-2", i + "-3", i + "-4", i + "-5"));

        @SuppressWarnings("unchecked")
        Vertex map = dag.newVertex("map", ordered
                ? AsyncTransformUsingServiceOrderedP.supplier(
                        serviceFactory, MAX_CONCURRENT_OPS, flatMapAsyncFn)
                : AsyncTransformUsingServiceUnorderedP.supplier(
                        serviceFactory, MAX_CONCURRENT_OPS, flatMapAsyncFn, FunctionEx.identity()))
                .localParallelism(LOCAL_PARALLELISM);
        Vertex sink = dag.newVertex("sink", SinkProcessors.writeListP(sinkList.getName()));

        // Both edges use explicit, small queue sizes (presumably to provoke
        // backpressure around the async mapper - confirm with the processor docs).
        EdgeConfig edgeToMapperConfig = new EdgeConfig().setQueueSize(128);
        EdgeConfig edgeFromMapperConfig = new EdgeConfig().setQueueSize(10);
        dag.edge(Edge.between(source, map).setConfig(edgeToMapperConfig))
           .edge(Edge.between(map, sink).setConfig(edgeFromMapperConfig));

        Job job = instance().getJet().newJob(dag, jobConfig);
        if (restart) {
            for (int i = 0; i < RESTART_COUNT; i++) {
                assertJobStatusEventually(job, JobStatus.RUNNING);
                sleepMillis(100);
                job.restart();
            }
        }
        assertResultEventually(
                i -> Stream.of(i + "-1", i + "-2", i + "-3", i + "-4", i + "-5"),
                STRESS_TEST_NUM_ITEMS);
    }

    @Test
    public void test_pipelineApi_mapNotPartitioned() {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.mapJournal(
                        journaledMap,
                        JournalInitialPosition.START_FROM_OLDEST,
                        event -> event.getNewValue(),
                        PredicateEx.alwaysTrue()))
                .withoutTimestamps()
                .mapUsingServiceAsync(serviceFactory, transformNotPartitionedFn(i -> i + "-1"))
                .setLocalParallelism(LOCAL_PARALLELISM)
                .writeTo(Sinks.list(sinkList));

        instance().getJet().newJob(pipeline, jobConfig);
        assertResultEventually(i -> Stream.of(i + "-1"), NUM_ITEMS);
    }

    @Test
    public void test_pipelineApi_mapPartitioned() {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(Sources.mapJournal(
                        journaledMap,
                        JournalInitialPosition.START_FROM_OLDEST,
                        event -> event.getNewValue(),
                        PredicateEx.alwaysTrue()))
                .withoutTimestamps()
                .groupingKey(i -> i % 10)
                .mapUsingServiceAsync(serviceFactory, transformPartitionedFn(i -> i + "-1"))
                .setLocalParallelism(LOCAL_PARALLELISM)
                .writeTo(Sinks.list(sinkList));

        instance().getJet().newJob(pipeline, jobConfig);
        assertResultEventually(i -> Stream.of(i + "-1"), NUM_ITEMS);
    }

    /**
     * Wraps {@code functionEx} into an async service function: the mapping is
     * submitted to the executor service, sleeps for a random 0-4 ms and then
     * completes the returned future with the mapped value.
     *
     * @param functionEx synchronous mapping applied to each item
     * @param <R>        type of the mapping result
     * @return a function of (executor, item) returning the future result
     */
    private <R> BiFunctionEx<ExecutorService, Integer, CompletableFuture<R>> transformNotPartitionedFn(
            FunctionEx<Integer, R> functionEx
    ) {
        return (executor, item) -> {
            CompletableFuture<R> future = new CompletableFuture<>();
            executor.submit(() -> {
                sleepMillis(ThreadLocalRandom.current().nextInt(5));
                return future.complete(functionEx.apply(item));
            });
            return future;
        };
    }

    /**
     * Keyed variant of {@link #transformNotPartitionedFn}. When assertions are
     * enabled it also checks that the key equals {@code item % 10}.
     *
     * @param functionEx synchronous mapping applied to each item
     * @param <R>        type of the mapping result
     * @return a function of (executor, key, item) returning the future result
     */
    private <R> TriFunction<ExecutorService, Integer, Integer, CompletableFuture<R>> transformPartitionedFn(
            FunctionEx<Integer, R> functionEx
    ) {
        return (executor, key, item) -> {
            assert key == item % 10 : "item=" + item + ", key=" + key;
            CompletableFuture<R> future = new CompletableFuture<>();
            executor.submit(() -> {
                sleepMillis(ThreadLocalRandom.current().nextInt(5));
                return future.complete(functionEx.apply(item));
            });
            return future;
        };
    }

    /**
     * Asserts that the sink list eventually holds exactly the strings produced
     * by applying {@code function} to every integer in {@code [0, i)}. Both
     * sides are sorted before comparison, so the arrival order is irrelevant.
     *
     * @param function maps a source item to its expected output strings
     * @param i        number of source items (exclusive upper bound)
     */
    private void assertResultEventually(Function<Integer, Stream<? extends String>> function, int i) {
        String expected = IntStream.range(0, i)
                .boxed()
                .flatMap(function)
                .sorted()
                .collect(Collectors.joining("\n"));
        assertTrueEventually(() -> {
            String actual = sinkList.stream()
                    .map(Object::toString)
                    .sorted()
                    .collect(Collectors.joining("\n"));
            Assert.assertEquals(expected, actual);
        }, ASSERT_TIMEOUT);
    }

    /**
     * Builds a map with the entries {@code i -> i} for every {@code i} in
     * {@code [fromInclusive, toExclusive)}.
     */
    private static Map<Integer, Integer> identityEntries(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive)
                .boxed()
                .collect(Collectors.toMap(Function.identity(), Function.identity()));
    }
}
