/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.benchmark;

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.state.benchmark.RescalingBenchmark;
import org.apache.flink.state.benchmark.RescalingBenchmarkBuilder;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class RescalingBenchmarkTest
extends TestLogger {
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Test
    public void testScalingOut() throws Exception {
        RescalingBenchmark<Integer> benchmark = new RescalingBenchmarkBuilder().setMaxParallelism(128).setParallelismBefore(1).setParallelismAfter(2).setManagedMemorySize(0x20000000).setCheckpointStorageAccess(new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI()).createCheckpointStorage(new JobID())).setStateBackend((StateBackend)new EmbeddedRocksDBStateBackend(true)).setStreamRecordGenerator(new IntegerRecordGenerator()).setStateProcessFunctionSupplier(() -> new TestKeyedFunction()).build();
        benchmark.setUp();
        benchmark.prepareStateForOperator(0);
        benchmark.rescale();
        benchmark.closeOperator();
        benchmark.tearDown();
    }

    @Test
    public void testScalingIn() throws Exception {
        RescalingBenchmark<Integer> benchmark = new RescalingBenchmarkBuilder().setMaxParallelism(128).setParallelismBefore(2).setParallelismAfter(1).setManagedMemorySize(0x20000000).setCheckpointStorageAccess(new FileSystemCheckpointStorage(temporaryFolder.newFolder().toURI()).createCheckpointStorage(new JobID())).setStateBackend((StateBackend)new EmbeddedRocksDBStateBackend(true)).setStreamRecordGenerator(new IntegerRecordGenerator()).setStateProcessFunctionSupplier(() -> new TestKeyedFunction()).build();
        benchmark.setUp();
        benchmark.prepareStateForOperator(0);
        benchmark.rescale();
        benchmark.closeOperator();
        benchmark.tearDown();
    }

    private static class TestKeyedFunction
    extends KeyedProcessFunction<Integer, Integer, Void> {
        private static final long serialVersionUID = 1L;
        private ValueState<Integer> randomState;

        private TestKeyedFunction() {
        }

        public void open(OpenContext openContext) throws Exception {
            super.open(openContext);
            this.randomState = this.getRuntimeContext().getState(new ValueStateDescriptor("RandomState", Integer.class));
        }

        public void processElement(Integer value, KeyedProcessFunction.Context ctx, Collector<Void> out) throws Exception {
            this.randomState.update((Object)ThreadLocalRandom.current().nextInt());
        }
    }

    private static class IntegerRecordGenerator
    implements RescalingBenchmark.StreamRecordGenerator<Integer> {
        private final int numberOfKeys = 1000;
        private int count = 0;

        private IntegerRecordGenerator() {
        }

        @Override
        public Iterator<StreamRecord<Integer>> generate() {
            return new Iterator<StreamRecord<Integer>>(){

                @Override
                public boolean hasNext() {
                    return count < 1000;
                }

                @Override
                public StreamRecord<Integer> next() {
                    count = count + 1;
                    return new StreamRecord((Object)ThreadLocalRandom.current().nextInt(), 0L);
                }
            };
        }

        @Override
        public TypeInformation getTypeInformation() {
            return BasicTypeInfo.INT_TYPE_INFO;
        }
    }
}

