package com.hazelcast.jet.pipeline;

import com.hazelcast.client.test.executor.tasks.SelectAllMembers;
import com.hazelcast.client.test.executor.tasks.SelectNoMembers;
import com.hazelcast.client.test.executor.tasks.SerializedCounterCallable;
import com.hazelcast.function.FunctionEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.accumulator.LongAccumulator;
import com.hazelcast.jet.aggregate.AggregateOperation1;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.core.DAG;
import com.hazelcast.jet.core.Edge;
import java.lang.invoke.SerializedLambda;
import java.util.List;
import java.util.function.Function;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/hazelcast/jet/pipeline/RebalanceStreamStageTest.class */
/**
 * Verifies how {@code rebalance()} and {@code rebalance(keyFn)} applied to a
 * {@link StreamStage} shape the edges of the DAG produced by the pipeline.
 * <p>
 * Every test builds a pipeline on top of the fixtures inherited from
 * {@link PipelineStreamTestSupport}, converts it to a {@link DAG} and inspects
 * the edges around the stage that follows the rebalancing call.
 */
public class RebalanceStreamStageTest extends PipelineStreamTestSupport {

    /** Name of the vertex whose outbound edge carries the rebalanced stream in the windowing tests. */
    private static final String TIMESTAMPS_VERTEX = "add-timestamps";

    /** Name of the vertex created for the {@code map} stage. */
    private static final String MAP_VERTEX = "map";

    /** Vertex-name suffix that the tests treat as proof of two-stage aggregation. */
    private static final String TWO_STAGE_SUFFIX = "sliding-window-prepare";

    /** Sums the integer items into a {@code long}; shared by all windowed aggregation tests. */
    private static final AggregateOperation1<Integer, LongAccumulator, Long> SUMMING =
            AggregateOperations.summingLong(Integer::intValue);

    /** Plain {@code rebalance()} before {@code map}: distributed, non-partitioned edge; output is intact. */
    @Test
    public void when_rebalanceAndMap_then_dagEdgeDistributed() {
        // Given
        List<Integer> input = sequence(itemCount);
        StreamStage<Integer> srcStage = streamStageFromList(input);
        FunctionEx<Integer, String> formatFn = i -> String.format("%04d-string", i);

        // When
        srcStage.rebalance().map(formatFn).writeTo(sink);

        // Then
        assertInboundMapEdge(false);
        execute();
        Assert.assertEquals(
                streamToString(input.stream(), formatFn),
                streamToString(sinkStreamOf(String.class), Function.identity()));
    }

    /** {@code rebalance(keyFn)} before {@code map}: distributed and partitioned edge. */
    @Test
    public void when_rebalanceByKeyAndMap_then_dagEdgePartitionedDistributed() {
        // Given
        StreamStage<Integer> srcStage = streamStageFromList(sequence(itemCount));

        // When
        srcStage.rebalance(i -> i)
                .map(i -> String.format("%04d-string", i))
                .writeTo(sink);

        // Then
        assertInboundMapEdge(true);
    }

    /** {@code peek()} placed before {@code rebalance()} must not change the resulting edge or the output. */
    @Test
    public void when_peekAndRebalanceAndMap_then_dagEdgeDistributed() {
        // Given
        List<Integer> input = sequence(itemCount);
        StreamStage<Integer> srcStage = streamStageFromList(input);
        FunctionEx<Integer, String> formatFn = i -> String.format("%04d-string", i);

        // When
        srcStage.peek().rebalance().map(formatFn).writeTo(sink);

        // Then
        assertInboundMapEdge(false);
        execute();
        Assert.assertEquals(
                streamToString(input.stream(), formatFn),
                streamToString(sinkStreamOf(String.class), Function.identity()));
    }

    /** {@code peek()} placed before {@code rebalance(keyFn)} must not change the resulting edge. */
    @Test
    public void when_peekAndRebalanceByKeyAndMap_then_dagEdgePartitionedDistributed() {
        // Given
        StreamStage<Integer> srcStage = streamStageFromList(sequence(itemCount));

        // When
        srcStage.peek()
                .rebalance(i -> i)
                .map(i -> String.format("%04d-string", i))
                .writeTo(sink);

        // Then
        assertInboundMapEdge(true);
    }

    /** Building {@code rebalance().peek().map(..)} is expected to fail with a {@link JetException}. */
    @Test(expected = JetException.class)
    public void when_rebalanceAndPeekAndMap_then_dagEdgeDistributed() {
        StreamStage<Integer> srcStage = streamStageFromList(sequence(itemCount));
        srcStage.rebalance()
                .peek()
                .map(i -> String.format("%04d-string", i));
    }

    /** Building {@code rebalance(keyFn).peek().map(..)} is expected to fail with a {@link JetException}. */
    @Test(expected = JetException.class)
    public void when_rebalanceByKeyAndPeekAndMap_then_dagEdgePartitionedDistributed() {
        StreamStage<Integer> srcStage = streamStageFromList(sequence(itemCount));
        srcStage.rebalance(i -> i)
                .peek()
                .map(i -> String.format("%04d-string", i));
    }

    /** Global windowed aggregation after {@code rebalance()}: non-partitioned distributed edge, two stages. */
    @Test
    public void when_rebalanceAndWindowAggregate_then_unicastDistributedEdgeAndTwoStageAggregation() {
        streamStageFromList(sequence(itemCount))
                .rebalance()
                .window(WindowDefinition.tumbling(1L))
                .aggregate(SUMMING)
                .writeTo(sink);

        assertTwoStageAggregation(false);
    }

    /** Global windowed aggregation after {@code rebalance(keyFn)}: partitioned distributed edge, two stages. */
    @Test
    public void when_rebalanceByKeyAndWindowAggregate_then_distributedPartitionedEdgeAndTwoStageAggregation() {
        streamStageFromList(sequence(itemCount))
                .rebalance(i -> 2 * i)
                .window(WindowDefinition.tumbling(1L))
                .aggregate(SUMMING)
                .writeTo(sink);

        assertTwoStageAggregation(true);
    }

    /** Grouped windowed aggregation after {@code rebalance()} must be single-stage. */
    @Test
    public void when_rebalanceAndWindowGroupAggregate_then_singleStageAggregation() {
        StreamStage<Integer> srcStage = streamStageFromList(sequence(itemCount));

        srcStage.rebalance()
                .window(WindowDefinition.tumbling(1L))
                .groupingKey(i -> i % 5)
                .aggregate(SUMMING)
                .writeTo(sink);

        assertSingleStageAggregation();
    }

    /** Grouped windowed aggregation after {@code rebalance(keyFn)} must be single-stage. */
    @Test
    public void when_rebalanceByKeyAndWindowGroupAggregate_then_singleStageAggregation() {
        StreamStage<Integer> srcStage = streamStageFromList(sequence(itemCount));
        // The rebalancing key (mod 3) deliberately differs from the grouping key (mod 5).
        FunctionEx<Integer, Integer> rebalanceKeyFn = i -> i % 3;

        srcStage.rebalance(rebalanceKeyFn)
                .window(WindowDefinition.tumbling(1L))
                .groupingKey(i -> i % 5)
                .aggregate(SUMMING)
                .writeTo(sink);

        assertSingleStageAggregation();
    }

    /**
     * Asserts that the first inbound edge of the {@code "map"} vertex is
     * distributed and that it carries a partitioner exactly when the stream
     * was rebalanced by key.
     *
     * @param rebalancedByKey {@code true} if the pipeline used {@code rebalance(keyFn)},
     *                        {@code false} if it used plain {@code rebalance()}
     */
    private void assertInboundMapEdge(boolean rebalancedByKey) {
        Edge srcToMap = p.toDag().getInboundEdges(MAP_VERTEX).get(0);
        Assert.assertTrue("Rebalancing should make the edge distributed", srcToMap.isDistributed());
        if (rebalancedByKey) {
            Assert.assertNotNull("Rebalanced by key, the edge must be partitioned", srcToMap.getPartitioner());
        } else {
            Assert.assertNull("Didn't rebalance by key, the edge must not be partitioned",
                    srcToMap.getPartitioner());
        }
    }

    /**
     * Asserts that the aggregation is single-stage: the first outbound edge of
     * {@code "add-timestamps"} is distributed, and the vertex reached by
     * following one more edge has no outbound edges of its own.
     * On failure the DAG is dumped to {@code System.err} in DOT format before
     * the assertion error is rethrown.
     */
    private void assertSingleStageAggregation() {
        DAG dag = p.toDag();
        try {
            Edge srcToAggregate = dag.getOutboundEdges(TIMESTAMPS_VERTEX).get(0);
            Assert.assertTrue("Outbound edge after rebalancing must be distributed",
                    srcToAggregate.isDistributed());
            Edge aggregateToNext = dag.getOutboundEdges(srcToAggregate.getDestName()).get(0);
            List<Edge> nextOutbound = dag.getOutboundEdges(aggregateToNext.getDestName());
            Assert.assertEquals("Aggregation after rebalancing must be single-stage", 0L, nextOutbound.size());
        } catch (AssertionError e) {
            System.err.println(dag.toDotString());
            throw e;
        }
    }

    /**
     * Asserts that the aggregation is two-stage: the first outbound edge of
     * {@code "add-timestamps"} is distributed and leads to a vertex whose name
     * ends with {@code "sliding-window-prepare"}, and the first outbound edge
     * of that vertex is both distributed and partitioned.
     * On failure the DAG is dumped to {@code System.err} in DOT format before
     * the assertion error is rethrown.
     *
     * @param rebalancedByKey whether the edge into the first aggregation stage
     *                        is expected to have a partitioner
     */
    private void assertTwoStageAggregation(boolean rebalancedByKey) {
        DAG dag = p.toDag();
        try {
            Edge srcToAggregate = dag.getOutboundEdges(TIMESTAMPS_VERTEX).get(0);
            String partitioningMessage = rebalancedByKey
                    ? "Edge to aggregation after rebalancing by key must be partitioned"
                    : "Rebalanced edge to global aggregation must be unicast";
            Assert.assertEquals(partitioningMessage, rebalancedByKey, srcToAggregate.getPartitioner() != null);
            Assert.assertTrue("Outbound edge after rebalancing must be distributed",
                    srcToAggregate.isDistributed());

            // endsWith() rather than substring(): a vertex name shorter than the suffix must surface
            // as an AssertionError (so the DAG gets dumped below), not as an index-out-of-bounds error.
            String firstStageName = srcToAggregate.getDestName();
            Assert.assertTrue("Aggregation must be two-stage; first-stage vertex was: " + firstStageName,
                    firstStageName.endsWith(TWO_STAGE_SUFFIX));

            Edge internalEdge = dag.getOutboundEdges(firstStageName).get(0);
            Assert.assertTrue("Internal aggregation edge must be distributed", internalEdge.isDistributed());
            Assert.assertNotNull("Internal aggregation edge must be partitioned", internalEdge.getPartitioner());
        } catch (AssertionError e) {
            System.err.println(dag.toDotString());
            throw e;
        }
    }
}
