/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.benchmark;

import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.function.Supplier;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;

/**
 * Helper that prepares keyed operator state at one parallelism and restores a repartitioned
 * slice of it into a single subtask at another parallelism.
 *
 * <p>Call order visible from this class:
 *
 * <ol>
 *   <li>{@link #setUp()} runs {@code parallelismBefore} test harnesses, feeds them the generated
 *       records, snapshots them and repackages the snapshots into one state object.
 *   <li>{@link #prepareStateForOperator(int)} repartitions that state for one subtask of the new
 *       parallelism and creates (but does not open) the harness for it.
 *   <li>{@link #rescale()} initializes the prepared harness from the repartitioned state.
 *   <li>{@link #closeOperator()} closes the harness of that subtask.
 *   <li>{@link #tearDown()} discards the state produced by {@link #setUp()}.
 * </ol>
 *
 * @param <KEY> type of the records; each record is used as its own key
 */
public class RescalingBenchmark<KEY> {
    private final int maxParallelism;
    private final int parallelismBefore;
    private final int parallelismAfter;
    private final int managedMemorySize;
    private final StateBackend stateBackend;
    private final CheckpointStorageAccess checkpointStorageAccess;

    /** Repackaged snapshot of all subtasks at {@code parallelismBefore}; set by {@link #setUp()}. */
    private OperatorSubtaskState stateForRescaling;

    /** Slice of {@link #stateForRescaling} assigned to the subtask that is being restored. */
    private OperatorSubtaskState stateForSubtask;

    /** Harness of the subtask that is being restored at {@code parallelismAfter}. */
    private KeyedOneInputStreamOperatorTestHarness<KEY, KEY, Void> subtaskHarness;

    private final StreamRecordGenerator<KEY> streamRecordGenerator;
    private final Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier;

    /**
     * Stores the configuration; no harness or state is created here.
     *
     * @param parallelismBefore parallelism used while the state is produced
     * @param parallelismAfter parallelism the state is repartitioned to
     * @param maxParallelism max parallelism (number of key groups) used for both phases
     * @param managedMemorySize managed memory size passed to every mock environment
     * @param stateBackend state backend installed on every harness
     * @param checkpointStorageAccess checkpoint storage installed on every mock environment
     * @param streamRecordGenerator source of the records and of the key type information
     * @param stateProcessFunctionSupplier creates one process function per harness
     */
    public RescalingBenchmark(
            int parallelismBefore,
            int parallelismAfter,
            int maxParallelism,
            int managedMemorySize,
            StateBackend stateBackend,
            CheckpointStorageAccess checkpointStorageAccess,
            StreamRecordGenerator<KEY> streamRecordGenerator,
            Supplier<KeyedProcessFunction<KEY, KEY, Void>> stateProcessFunctionSupplier) {
        this.parallelismBefore = parallelismBefore;
        this.parallelismAfter = parallelismAfter;
        this.maxParallelism = maxParallelism;
        this.managedMemorySize = managedMemorySize;
        this.stateBackend = stateBackend;
        this.checkpointStorageAccess = checkpointStorageAccess;
        this.streamRecordGenerator = streamRecordGenerator;
        this.stateProcessFunctionSupplier = stateProcessFunctionSupplier;
    }

    /**
     * Produces the state that later calls repartition.
     *
     * @throws Exception if creating, feeding, snapshotting or closing a harness fails
     */
    public void setUp() throws Exception {
        stateForRescaling = prepareState();
    }

    /**
     * Discards the state produced by {@link #setUp()}. Does nothing when {@link #setUp()} never
     * completed, so it is safe to call from cleanup code after a failed set-up.
     *
     * @throws IOException declared for callers; not thrown by the code visible here
     */
    public void tearDown() throws IOException {
        if (stateForRescaling != null) {
            stateForRescaling.discardState();
        }
    }

    /**
     * Initializes the prepared subtask harness from the repartitioned state. Requires a prior
     * call to {@link #prepareStateForOperator(int)}.
     *
     * @throws Exception if state initialization fails
     */
    public void rescale() throws Exception {
        subtaskHarness.initializeState(stateForSubtask);
    }

    /**
     * Closes the subtask harness created by {@link #prepareStateForOperator(int)}.
     *
     * @throws Exception if closing the harness fails
     */
    public void closeOperator() throws Exception {
        subtaskHarness.close();
    }

    /**
     * Repartitions the prepared state for one subtask of the new parallelism and creates the
     * harness that will restore it. The harness is set up but neither initialized nor opened.
     *
     * @param subtaskIndex index of the subtask at {@code parallelismAfter}
     * @throws Exception if the harness cannot be created or set up
     */
    public void prepareStateForOperator(int subtaskIndex) throws Exception {
        stateForSubtask =
                AbstractStreamOperatorTestHarness.repartitionOperatorState(
                        stateForRescaling,
                        maxParallelism,
                        parallelismBefore,
                        parallelismAfter,
                        subtaskIndex);
        subtaskHarness =
                getTestHarness(value -> value, maxParallelism, parallelismAfter, subtaskIndex);
        subtaskHarness.setStateBackend(stateBackend);
        subtaskHarness.setup();
    }

    /**
     * Builds the state at {@code parallelismBefore} and always closes the harnesses used for it.
     *
     * <p>If building the state fails, that failure is rethrown and a failure while closing the
     * harnesses is attached to it as suppressed instead of replacing it.
     */
    private OperatorSubtaskState prepareState() throws Exception {
        // Generic array creation is not expressible in Java; the array never leaves this class
        // and only ever holds harnesses produced by getTestHarness.
        @SuppressWarnings("unchecked")
        KeyedOneInputStreamOperatorTestHarness<KEY, KEY, Void>[] harnessBefore =
                new KeyedOneInputStreamOperatorTestHarness[parallelismBefore];

        OperatorSubtaskState repackaged;
        try {
            repackaged = populateAndSnapshot(harnessBefore);
        } catch (Throwable failure) {
            try {
                closeHarnessArray(harnessBefore);
            } catch (Exception closeFailure) {
                // Guard against self-suppression, which Throwable rejects.
                if (closeFailure != failure) {
                    failure.addSuppressed(closeFailure);
                }
            }
            throw failure;
        }
        closeHarnessArray(harnessBefore);
        return repackaged;
    }

    /**
     * Opens one harness per subtask into {@code harnesses}, routes every generated record to the
     * subtask owning its key group, snapshots all subtasks and repackages the snapshots.
     *
     * <p>Harnesses are stored into the array as soon as they are created so that the caller can
     * close the ones that exist even when this method fails part-way.
     */
    private OperatorSubtaskState populateAndSnapshot(
            KeyedOneInputStreamOperatorTestHarness<KEY, KEY, Void>[] harnesses) throws Exception {
        for (int i = 0; i < parallelismBefore; i++) {
            harnesses[i] = getTestHarness(value -> value, maxParallelism, parallelismBefore, i);
            harnesses[i].setStateBackend(stateBackend);
            harnesses[i].setup();
            harnesses[i].open();
        }

        Iterator<StreamRecord<KEY>> records = streamRecordGenerator.generate();
        while (records.hasNext()) {
            StreamRecord<KEY> record = records.next();
            // The record value is its own key, so it decides the key group and the subtask.
            int keyGroupIndex =
                    KeyGroupRangeAssignment.assignToKeyGroup(record.getValue(), maxParallelism);
            int subtaskIndex =
                    KeyGroupRangeAssignment.computeOperatorIndexForKeyGroup(
                            maxParallelism, parallelismBefore, keyGroupIndex);
            harnesses[subtaskIndex].processElement(record);
        }

        OperatorSubtaskState[] subtaskState = new OperatorSubtaskState[parallelismBefore];
        for (int i = 0; i < parallelismBefore; i++) {
            subtaskState[i] = harnesses[i].snapshot(0L, 1L);
        }
        return AbstractStreamOperatorTestHarness.repackageState(subtaskState);
    }

    /**
     * Creates a harness around a fresh {@link KeyedProcessOperator} running in a mock environment
     * configured with this benchmark's managed memory size and checkpoint storage.
     */
    private KeyedOneInputStreamOperatorTestHarness<KEY, KEY, Void> getTestHarness(
            KeySelector<KEY, KEY> keySelector,
            int maxParallelism,
            int taskParallelism,
            int subtaskIdx)
            throws Exception {
        MockEnvironment env =
                new MockEnvironmentBuilder()
                        .setTaskName("RescalingTask")
                        .setManagedMemorySize(managedMemorySize)
                        .setMaxParallelism(maxParallelism)
                        .setParallelism(taskParallelism)
                        .setSubtaskIndex(subtaskIdx)
                        .build();
        env.setCheckpointStorageAccess(checkpointStorageAccess);
        return new KeyedOneInputStreamOperatorTestHarness<>(
                new KeyedProcessOperator<>(stateProcessFunctionSupplier.get()),
                keySelector,
                streamRecordGenerator.getTypeInformation(),
                env);
    }

    /**
     * Closes every non-null harness in the array. A failing {@code close()} does not stop the
     * remaining harnesses from being closed; the first failure is rethrown at the end with any
     * later failures attached as suppressed.
     */
    private void closeHarnessArray(KeyedOneInputStreamOperatorTestHarness<?, ?, ?>[] harnessArr)
            throws Exception {
        Exception firstFailure = null;
        for (KeyedOneInputStreamOperatorTestHarness<?, ?, ?> harness : harnessArr) {
            if (harness == null) {
                // Slot was never filled because set-up failed before reaching it.
                continue;
            }
            try {
                harness.close();
            } catch (Exception e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (e != firstFailure) {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Supplies the records used to build the state and the type information of their key.
     *
     * @param <T> record (and key) type
     */
    public interface StreamRecordGenerator<T> {
        /** Returns an iterator over the records to feed into the operator. */
        Iterator<StreamRecord<T>> generate();

        /** Returns the type information passed to the harness as key type. */
        TypeInformation<T> getTypeInformation();
    }
}

