/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph.failover;

import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.runtime.executiongraph.failover.LogicalPipelinedRegionComputeUtil;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.topology.DefaultLogicalTopology;
import org.apache.flink.runtime.jobgraph.topology.LogicalVertex;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class LogicalPipelinedRegionComputeUtilTest {
    LogicalPipelinedRegionComputeUtilTest() {
    }

    @Test
    void testIsolatedVertices() {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        Set<Set<LogicalVertex>> regions = LogicalPipelinedRegionComputeUtilTest.computePipelinedRegions(v1, v2, v3);
        LogicalPipelinedRegionComputeUtilTest.checkRegionSize(regions, 3, 1, 1, 1);
    }

    @Test
    void testVariousResultPartitionTypesBetweenVertices() {
        this.testThreeVerticesConnectSequentially(ResultPartitionType.BLOCKING, ResultPartitionType.PIPELINED, 2, 1, 2);
        this.testThreeVerticesConnectSequentially(ResultPartitionType.BLOCKING, ResultPartitionType.BLOCKING, 3, 1, 1, 1);
        this.testThreeVerticesConnectSequentially(ResultPartitionType.PIPELINED, ResultPartitionType.PIPELINED, 1, 3);
    }

    private void testThreeVerticesConnectSequentially(ResultPartitionType resultPartitionType1, ResultPartitionType resultPartitionType2, int numOfRegions, int ... regionSizes) {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, resultPartitionType1);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, resultPartitionType2);
        Set<Set<LogicalVertex>> regions = LogicalPipelinedRegionComputeUtilTest.computePipelinedRegions(v1, v2, v3);
        LogicalPipelinedRegionComputeUtilTest.checkRegionSize(regions, numOfRegions, regionSizes);
    }

    @Test
    void testTwoInputsMergesIntoOne() {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        JobVertex v4 = new JobVertex("v4");
        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        Set<Set<LogicalVertex>> regions = LogicalPipelinedRegionComputeUtilTest.computePipelinedRegions(v1, v2, v3, v4);
        LogicalPipelinedRegionComputeUtilTest.checkRegionSize(regions, 2, 3, 1);
    }

    @Test
    void testOneInputSplitsIntoTwo() {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        JobVertex v4 = new JobVertex("v4");
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        Set<Set<LogicalVertex>> regions = LogicalPipelinedRegionComputeUtilTest.computePipelinedRegions(v1, v2, v3, v4);
        LogicalPipelinedRegionComputeUtilTest.checkRegionSize(regions, 2, 3, 1);
    }

    @Test
    void testDiamondWithMixedPipelinedAndBlockingEdges() {
        JobVertex v1 = new JobVertex("v1");
        JobVertex v2 = new JobVertex("v2");
        JobVertex v3 = new JobVertex("v3");
        JobVertex v4 = new JobVertex("v4");
        v2.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v3.connectNewDataSetAsInput(v1, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        Set<Set<LogicalVertex>> regions = LogicalPipelinedRegionComputeUtilTest.computePipelinedRegions(v1, v2, v3, v4);
        LogicalPipelinedRegionComputeUtilTest.checkRegionSize(regions, 1, 4);
    }

    private static Set<Set<LogicalVertex>> computePipelinedRegions(JobVertex ... vertices) {
        DefaultLogicalTopology topology = DefaultLogicalTopology.fromTopologicallySortedJobVertices(Arrays.asList(vertices));
        return LogicalPipelinedRegionComputeUtil.computePipelinedRegions((Iterable)topology.getVertices());
    }

    private static void checkRegionSize(Set<Set<LogicalVertex>> regions, int numOfRegions, int ... sizes) {
        Assertions.assertThat(regions).hasSize(numOfRegions);
        Assertions.assertThat(regions.stream().map(Set::size).collect(Collectors.toList())).containsExactlyInAnyOrderElementsOf((Iterable)Arrays.stream(sizes).boxed().collect(Collectors.toList()));
    }
}

