/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.filesystem;

import java.io.DataInputStream;
import java.io.InputStream;
import java.util.HashMap;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.runtime.state.KvStateSnapshot;
import org.apache.flink.runtime.state.filesystem.AbstractFileState;
import org.apache.flink.runtime.state.filesystem.FsHeapKvState;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;

public class FsHeapKvStateSnapshot<K, V>
extends AbstractFileState
implements KvStateSnapshot<K, V, FsStateBackend> {
    private static final long serialVersionUID = 1L;
    private final String keySerializerClassName;
    private final String valueSerializerClassName;

    public FsHeapKvStateSnapshot(TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, Path filePath) {
        super(filePath);
        this.keySerializerClassName = keySerializer.getClass().getName();
        this.valueSerializerClassName = valueSerializer.getClass().getName();
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public FsHeapKvState<K, V> restoreState(FsStateBackend stateBackend, TypeSerializer<K> keySerializer, TypeSerializer<V> valueSerializer, V defaultValue, ClassLoader classLoader) throws Exception {
        if (!keySerializer.getClass().getName().equals(this.keySerializerClassName)) throw new IllegalArgumentException("Cannot restore the state from the snapshot with the given serializers. State (K/V) was serialized with (" + this.valueSerializerClassName + "/" + this.keySerializerClassName + ")");
        if (!valueSerializer.getClass().getName().equals(this.valueSerializerClassName)) {
            throw new IllegalArgumentException("Cannot restore the state from the snapshot with the given serializers. State (K/V) was serialized with (" + this.valueSerializerClassName + "/" + this.keySerializerClassName + ")");
        }
        try (FSDataInputStream inStream = stateBackend.getFileSystem().open(this.getFilePath());){
            InputViewDataInputStreamWrapper inView = new InputViewDataInputStreamWrapper(new DataInputStream((InputStream)inStream));
            int numEntries = inView.readInt();
            HashMap<Object, Object> stateMap = new HashMap<Object, Object>(numEntries);
            for (int i = 0; i < numEntries; ++i) {
                Object key = keySerializer.deserialize((DataInputView)inView);
                Object value = valueSerializer.deserialize((DataInputView)inView);
                stateMap.put(key, value);
            }
            FsHeapKvState<K, V> fsHeapKvState = new FsHeapKvState<K, V>(keySerializer, valueSerializer, defaultValue, stateMap, stateBackend);
            return fsHeapKvState;
        }
        catch (Exception e) {
            throw new Exception("Failed to restore state from file system", e);
        }
    }
}

