package com.hazelcast.jet.core.metrics;

import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.function.Functions;
import com.hazelcast.jet.Job;
import com.hazelcast.jet.TestInClusterSupport;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.config.JobConfig;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.test.TestSources;
import com.hazelcast.test.annotation.ParallelJVMTest;
import com.hazelcast.test.annotation.QuickTest;
import java.lang.invoke.SerializedLambda;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Locale;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Checks the {@link JobMetrics} exposed by batch jobs after they have run.
 *
 * <p>Every test runs a word-count style pipeline over a single line of text
 * and compares the per-vertex {@code emittedCount} / {@code receivedCount}
 * metrics with the word counts computed directly from that text.
 */
@Category({QuickTest.class, ParallelJVMTest.class})
public class JobMetrics_BatchTest extends TestInClusterSupport {

    /** Job configuration with storing of metrics after job completion enabled. */
    static final JobConfig JOB_CONFIG_WITH_METRICS = new JobConfig().setStoreMetricsAfterJobCompletion(true);

    // Values of the "vertex" tag used to select the measurements of each pipeline stage.
    private static final String SOURCE_VERTEX = "items";
    private static final String FLAT_MAP_AND_FILTER_VERTEX = "fused(flat-map, filter)";
    private static final String GROUP_AND_AGGREGATE_PREPARE_VERTEX = "group-and-aggregate-prepare";
    private static final String GROUP_AND_AGGREGATE_VERTEX = "group-and-aggregate";
    private static final String SINK_VERTEX = "mapSink(counts)";

    /** Default input line fed to the pipeline; contains repeated words on purpose. */
    private static final String COMMON_TEXT = "look at some common text here and uncommon text here";

    @Test
    public void when_jobCompleted_then_metricsExist() {
        Job job = execute(createPipeline(), JOB_CONFIG_WITH_METRICS);

        assertMetrics(job.getMetrics());
    }

    @Test
    public void when_storeMetricsAfterJobCompletionDisabled_then_metricsEmpty() {
        // A default JobConfig is used here, i.e. one without setStoreMetricsAfterJobCompletion(true).
        Job job = execute(createPipeline(), new JobConfig());

        Assert.assertEquals("non-empty metrics", JobMetrics.empty(), job.getMetrics());
    }

    @Test
    public void when_memberAddedAfterJobFinished_then_metricsNotAffected() {
        Job job = execute(createPipeline(), JOB_CONFIG_WITH_METRICS);
        HazelcastInstance extraMember = factory.newHazelcastInstance(prepareConfig());
        try {
            assertClusterSizeEventually(3, hz());
            assertMetrics(job.getMetrics());
        } finally {
            // Always remove the extra member so that the cluster is back at
            // 2 members, whether or not the assertions above passed.
            extraMember.shutdown();
            assertClusterSizeEventually(2, hz());
        }
    }

    @Test
    public void when_memberRemovedAfterJobFinished_then_metricsNotAffected() {
        Pipeline pipeline = createPipeline();
        HazelcastInstance extraMember = factory.newHazelcastInstance(prepareConfig());
        Job job;
        try {
            assertClusterSizeEventually(3, hz());
            job = execute(pipeline, JOB_CONFIG_WITH_METRICS);
        } finally {
            // Shut the extra member down exactly once, even if the job could not be run.
            extraMember.shutdown();
        }

        assertClusterSizeEventually(2, hz());
        assertMetrics(job.getMetrics());
    }

    @Test
    public void when_twoDifferentPipelines_then_haveDifferentMetrics() {
        String otherText = "look at some common text here and here";
        Pipeline firstPipeline = createPipeline();
        Pipeline secondPipeline = createPipeline(otherText);

        // Submit both jobs before waiting for either of them.
        Job firstJob = hz().getJet().newJob(firstPipeline, JOB_CONFIG_WITH_METRICS);
        Job secondJob = hz().getJet().newJob(secondPipeline, JOB_CONFIG_WITH_METRICS);
        firstJob.join();
        secondJob.join();

        Assert.assertNotEquals(firstJob.getMetrics(), secondJob.getMetrics());
        assertMetrics(firstJob.getMetrics());
        assertMetrics(secondJob.getMetrics(), otherText);
    }

    @Test
    public void when_twoDifferentJobsForTheSamePipeline_then_haveDifferentMetrics() {
        Pipeline pipeline = createPipeline();

        // Submit both jobs before waiting for either of them.
        Job firstJob = hz().getJet().newJob(pipeline, JOB_CONFIG_WITH_METRICS);
        Job secondJob = hz().getJet().newJob(pipeline, JOB_CONFIG_WITH_METRICS);
        firstJob.join();
        secondJob.join();

        Assert.assertNotEquals(firstJob.getMetrics(), secondJob.getMetrics());
        assertMetrics(firstJob.getMetrics());
        assertMetrics(secondJob.getMetrics());
    }

    /** Creates the word-count pipeline over {@link #COMMON_TEXT}. */
    private Pipeline createPipeline() {
        return createPipeline(COMMON_TEXT);
    }

    /**
     * Creates a pipeline that emits {@code text} as a single item, splits it
     * into lower-cased words, drops empty words, counts the occurrences of
     * each word and writes the counts to the map named {@code "counts"}.
     *
     * @param text the single line of text used as the pipeline's input
     * @return the assembled pipeline
     */
    private Pipeline createPipeline(String text) {
        Pipeline pipeline = Pipeline.create();
        pipeline.readFrom(TestSources.items(text))
                .flatMap(line -> Traversers.traverseArray(line.toLowerCase(Locale.ROOT).split("\\W+")))
                .filter(word -> !word.isEmpty())
                .groupingKey(Functions.wholeItem())
                .aggregate(AggregateOperations.counting())
                .writeTo(Sinks.map("counts"));
        return pipeline;
    }

    /** Asserts the metrics expected from a job that processed {@link #COMMON_TEXT}. */
    private void assertMetrics(JobMetrics jobMetrics) {
        assertMetrics(jobMetrics, COMMON_TEXT);
    }

    /**
     * Asserts that the per-vertex item counts in {@code jobMetrics} match the
     * word statistics of {@code text}.
     *
     * <p>The expected counts are computed by splitting {@code text} on
     * {@code \W+} only. Unlike the pipeline, this does not lower-case the
     * words or drop empty ones, so {@code text} has to be lower-case and has
     * to start with a word character for both computations to agree.
     *
     * @param jobMetrics metrics of the job under test; must not be {@code null}
     * @param text       the line of text the job's pipeline was created with
     */
    private void assertMetrics(JobMetrics jobMetrics, String text) {
        Assert.assertNotNull(jobMetrics);

        String[] words = text.split("\\W+");
        int wordCount = words.length;
        int uniqueWordCount = new HashSet<>(Arrays.asList(words)).size();

        // The source emits the whole text as one item.
        Assert.assertEquals(1L, sumValueFor(jobMetrics, SOURCE_VERTEX, "emittedCount"));
        Assert.assertEquals(1L, sumValueFor(jobMetrics, FLAT_MAP_AND_FILTER_VERTEX, "receivedCount"));
        // One item per word flows until the aggregation collapses duplicates.
        Assert.assertEquals(wordCount, sumValueFor(jobMetrics, FLAT_MAP_AND_FILTER_VERTEX, "emittedCount"));
        Assert.assertEquals(wordCount, sumValueFor(jobMetrics, GROUP_AND_AGGREGATE_PREPARE_VERTEX, "receivedCount"));
        // From here on, one item per distinct word.
        Assert.assertEquals(uniqueWordCount, sumValueFor(jobMetrics, GROUP_AND_AGGREGATE_PREPARE_VERTEX, "emittedCount"));
        Assert.assertEquals(uniqueWordCount, sumValueFor(jobMetrics, GROUP_AND_AGGREGATE_VERTEX, "receivedCount"));
        Assert.assertEquals(uniqueWordCount, sumValueFor(jobMetrics, GROUP_AND_AGGREGATE_VERTEX, "emittedCount"));
        Assert.assertEquals(uniqueWordCount, sumValueFor(jobMetrics, SINK_VERTEX, "receivedCount"));
    }

    /**
     * Sums the values of all measurements of the given metric whose
     * {@code "vertex"} tag equals the given vertex name.
     *
     * @param jobMetrics the metrics to search
     * @param vertex     expected value of the {@code "vertex"} tag
     * @param metric     name of the metric, e.g. {@code "emittedCount"}
     * @return the sum of the matching measurements' values
     */
    private long sumValueFor(JobMetrics jobMetrics, String vertex, String metric) {
        return jobMetrics.filter(MeasurementPredicates.tagValueEquals("vertex", vertex))
                .get(metric)
                .stream()
                .mapToLong(measurement -> measurement.value())
                .sum();
    }
}
