/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.scheduler.adaptivebatch;

import java.util.Arrays;
import java.util.Collections;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.runtime.scheduler.adaptivebatch.BlockingResultInfo;
import org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismDecider;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Before;
import org.junit.Test;

public class DefaultVertexParallelismDeciderTest {
    private static final long BYTE_256_MB = 0x10000000L;
    private static final long BYTE_512_MB = 0x20000000L;
    private static final long BYTE_1_GB = 0x40000000L;
    private static final long BYTE_8_GB = 0x200000000L;
    private static final long BYTE_1_TB = 0x10000000000L;
    private static final int MAX_PARALLELISM = 100;
    private static final int MIN_PARALLELISM = 3;
    private static final int DEFAULT_SOURCE_PARALLELISM = 10;
    private static final long DATA_VOLUME_PER_TASK = 0x40000000L;
    private DefaultVertexParallelismDecider decider;

    @Before
    public void before() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM, 100);
        configuration.setInteger(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MIN_PARALLELISM, 3);
        configuration.set(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_AVG_DATA_VOLUME_PER_TASK, (Object)new MemorySize(0x40000000L));
        configuration.setInteger(JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM, 10);
        this.decider = DefaultVertexParallelismDecider.from((Configuration)configuration);
    }

    @Test
    public void testNormalizedMaxAndMinParallelism() {
        MatcherAssert.assertThat((Object)this.decider.getMaxParallelism(), (Matcher)CoreMatchers.is((Object)64));
        MatcherAssert.assertThat((Object)this.decider.getMinParallelism(), (Matcher)CoreMatchers.is((Object)4));
    }

    @Test
    public void testSourceJobVertex() {
        int parallelism = this.decider.decideParallelismForVertex(Collections.emptyList());
        MatcherAssert.assertThat((Object)parallelism, (Matcher)CoreMatchers.is((Object)10));
    }

    @Test
    public void testNormalizeParallelismDownToPowerOf2() {
        BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(0x10000000L));
        BlockingResultInfo resultInfo2 = BlockingResultInfo.createFromNonBroadcastResult(Arrays.asList(0x10000000L, 0x200000000L));
        int parallelism = this.decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
        MatcherAssert.assertThat((Object)parallelism, (Matcher)CoreMatchers.is((Object)8));
    }

    @Test
    public void testNormalizeParallelismUpToPowerOf2() {
        BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(0x10000000L));
        BlockingResultInfo resultInfo2 = BlockingResultInfo.createFromNonBroadcastResult(Arrays.asList(0x40000000L, 0x200000000L));
        int parallelism = this.decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
        MatcherAssert.assertThat((Object)parallelism, (Matcher)CoreMatchers.is((Object)16));
    }

    @Test
    public void testInitiallyNormalizedParallelismIsLargerThanMaxParallelism() {
        BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(0x10000000L));
        BlockingResultInfo resultInfo2 = BlockingResultInfo.createFromNonBroadcastResult(Arrays.asList(0x200000000L, 0x10000000000L));
        int parallelism = this.decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
        MatcherAssert.assertThat((Object)parallelism, (Matcher)CoreMatchers.is((Object)64));
    }

    @Test
    public void testInitiallyNormalizedParallelismIsSmallerThanMinParallelism() {
        BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(0x10000000L));
        BlockingResultInfo resultInfo2 = BlockingResultInfo.createFromNonBroadcastResult(Arrays.asList(0x20000000L));
        int parallelism = this.decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
        MatcherAssert.assertThat((Object)parallelism, (Matcher)CoreMatchers.is((Object)4));
    }

    @Test
    public void testBroadcastRatioExceedsCapRatio() {
        BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(0x40000000L));
        BlockingResultInfo resultInfo2 = BlockingResultInfo.createFromNonBroadcastResult(Arrays.asList(0x200000000L));
        int parallelism = this.decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
        MatcherAssert.assertThat((Object)parallelism, (Matcher)CoreMatchers.is((Object)16));
    }

    @Test
    public void testNonBroadcastBytesCanNotDividedEvenly() {
        BlockingResultInfo resultInfo1 = BlockingResultInfo.createFromBroadcastResult(Arrays.asList(0x20000000L));
        BlockingResultInfo resultInfo2 = BlockingResultInfo.createFromNonBroadcastResult(Arrays.asList(0x10000000L, 0x200000000L));
        int parallelism = this.decider.decideParallelismForVertex(Arrays.asList(resultInfo1, resultInfo2));
        MatcherAssert.assertThat((Object)parallelism, (Matcher)CoreMatchers.is((Object)16));
    }
}

