/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.RunnableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.StateObjectCollection;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.HeapBroadcastState;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupedInternalPriorityQueue;
import org.apache.flink.runtime.state.KeyedStateBackendParametersImpl;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateBackendParametersImpl;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.PartitionableListState;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SharedStateRegistryImpl;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.testutils.statemigration.TestType;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Assumptions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.io.TempDir;

public abstract class StateBackendMigrationTestBase<B extends StateBackend> {
    @TempDir
    protected Path tempFolder;
    private CheckpointStorageLocation checkpointStorageLocation;
    private MockEnvironment env;

    protected abstract B getStateBackend() throws Exception;

    protected CheckpointStorage getCheckpointStorage() throws Exception {
        return new JobManagerCheckpointStorage();
    }

    protected boolean supportsKeySerializerCheck() {
        return true;
    }

    @BeforeEach
    void before() {
        this.env = MockEnvironment.builder().build();
    }

    @AfterEach
    void after() {
        IOUtils.closeQuietly((AutoCloseable)this.env);
    }

    @TestTemplate
    void testKeyedValueStateMigration() throws Exception {
        String stateName = "test-name";
        this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @TestTemplate
    void testKeyedValueStateSerializerReconfiguration() throws Exception {
        String stateName = "test-name";
        this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @TestTemplate
    void testKeyedValueStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        String stateName = "test-name";
        Assertions.assertThatThrownBy(() -> this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ValueStateDescriptor<TestType>)new ValueStateDescriptor("test-name", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()))).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeyedValueStateUpgrade(ValueStateDescriptor<TestType> initialAccessDescriptor, ValueStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, initialAccessDescriptor);
            backend.setCurrentKey((Object)1);
            valueState.update((Object)new TestType("foo", 1456));
            backend.setCurrentKey((Object)2);
            valueState.update((Object)new TestType("bar", 478));
            backend.setCurrentKey((Object)3);
            valueState.update((Object)new TestType("hello", 189));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, newAccessDescriptorAfterRestore);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((Object)((Object)((TestType)((Object)valueState.value())))).isEqualTo((Object)new TestType("foo", 1456));
            valueState.update((Object)new TestType("newValue1", 751));
            backend.setCurrentKey((Object)2);
            Assertions.assertThat((Object)((Object)((TestType)((Object)valueState.value())))).isEqualTo((Object)new TestType("bar", 478));
            valueState.update((Object)new TestType("newValue2", 167));
            backend.setCurrentKey((Object)3);
            Assertions.assertThat((Object)((Object)((TestType)((Object)valueState.value())))).isEqualTo((Object)new TestType("hello", 189));
            valueState.update((Object)new TestType("newValue3", 444));
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @TestTemplate
    void testKeyedListStateMigration() throws Exception {
        String stateName = "test-name";
        this.testKeyedListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @TestTemplate
    void testKeyedListStateSerializerReconfiguration() throws Exception {
        String stateName = "test-name";
        this.testKeyedListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @TestTemplate
    void testKeyedListStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        String stateName = "test-name";
        Assertions.assertThatThrownBy(() -> this.testKeyedListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("test-name", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()))).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeyedListStateUpgrade(ListStateDescriptor<TestType> initialAccessDescriptor, ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            ListState listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, initialAccessDescriptor);
            backend.setCurrentKey((Object)1);
            listState.add((Object)new TestType("key-1", 1));
            listState.add((Object)new TestType("key-1", 2));
            listState.add((Object)new TestType("key-1", 3));
            backend.setCurrentKey((Object)2);
            listState.add((Object)new TestType("key-2", 1));
            backend.setCurrentKey((Object)3);
            listState.add((Object)new TestType("key-3", 1));
            listState.add((Object)new TestType("key-3", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            listState = (ListState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, newAccessDescriptorAfterRestore);
            backend.setCurrentKey((Object)1);
            Iterator iterable1 = ((Iterable)listState.get()).iterator();
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterable1.next())))).isEqualTo((Object)new TestType("key-1", 1));
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterable1.next())))).isEqualTo((Object)new TestType("key-1", 2));
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterable1.next())))).isEqualTo((Object)new TestType("key-1", 3));
            Assertions.assertThat(iterable1).isExhausted();
            listState.add((Object)new TestType("new-key-1", 123));
            backend.setCurrentKey((Object)2);
            Iterator iterable2 = ((Iterable)listState.get()).iterator();
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterable2.next())))).isEqualTo((Object)new TestType("key-2", 1));
            Assertions.assertThat(iterable2).isExhausted();
            listState.add((Object)new TestType("new-key-2", 456));
            backend.setCurrentKey((Object)3);
            Iterator iterable3 = ((Iterable)listState.get()).iterator();
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterable3.next())))).isEqualTo((Object)new TestType("key-3", 1));
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterable3.next())))).isEqualTo((Object)new TestType("key-3", 2));
            Assertions.assertThat(iterable3).isExhausted();
            listState.add((Object)new TestType("new-key-3", 777));
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @TestTemplate
    void testKeyedMapStateAsIs() throws Exception {
        String stateName = "test-name";
        this.testKeyedMapStateUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()));
    }

    @TestTemplate
    void testKeyedMapStateStateMigration() throws Exception {
        String stateName = "test-name";
        this.testKeyedMapStateUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @TestTemplate
    void testKeyedMapStateSerializerReconfiguration() throws Exception {
        String stateName = "test-name";
        this.testKeyedMapStateUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @TestTemplate
    void testKeyedMapStateRegistrationFailsIfNewStateSerializerIsIncompatible() {
        String stateName = "test-name";
        Assertions.assertThatThrownBy(() -> this.testKeyedMapStateUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("test-name", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()))).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    private Iterator<Map.Entry<Integer, TestType>> sortedIterator(Iterator<Map.Entry<Integer, TestType>> iterator) {
        TreeSet<Map.Entry> set = new TreeSet<Map.Entry>(Comparator.comparing(Map.Entry::getKey));
        iterator.forEachRemaining(set::add);
        return set.iterator();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeyedMapStateUpgrade(MapStateDescriptor<Integer, TestType> initialAccessDescriptor, MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            MapState mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, initialAccessDescriptor);
            backend.setCurrentKey((Object)1);
            mapState.put((Object)1, (Object)new TestType("key-1", 1));
            mapState.put((Object)2, (Object)new TestType("key-1", 2));
            mapState.put((Object)3, (Object)new TestType("key-1", 3));
            backend.setCurrentKey((Object)2);
            mapState.put((Object)1, (Object)new TestType("key-2", 1));
            backend.setCurrentKey((Object)3);
            mapState.put((Object)1, (Object)new TestType("key-3", 1));
            mapState.put((Object)2, (Object)new TestType("key-3", 2));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            mapState = (MapState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, newAccessDescriptorAfterRestore);
            backend.setCurrentKey((Object)1);
            Iterator<Map.Entry<Integer, TestType>> iterable1 = this.sortedIterator(mapState.iterator());
            Map.Entry<Integer, TestType> actual = iterable1.next();
            Assertions.assertThat((Integer)actual.getKey()).isOne();
            Assertions.assertThat((Object)((Object)actual.getValue())).isEqualTo((Object)new TestType("key-1", 1));
            actual = iterable1.next();
            Assertions.assertThat((Integer)actual.getKey()).isEqualTo(2);
            Assertions.assertThat((Object)((Object)actual.getValue())).isEqualTo((Object)new TestType("key-1", 2));
            actual = iterable1.next();
            Assertions.assertThat((Integer)actual.getKey()).isEqualTo(3);
            Assertions.assertThat((Object)((Object)actual.getValue())).isEqualTo((Object)new TestType("key-1", 3));
            Assertions.assertThat(iterable1).isExhausted();
            mapState.put((Object)123, (Object)new TestType("new-key-1", 123));
            backend.setCurrentKey((Object)2);
            Iterator iterable2 = mapState.iterator();
            actual = (Map.Entry<Integer, TestType>)iterable2.next();
            Assertions.assertThat((Integer)((Integer)actual.getKey())).isOne();
            Assertions.assertThat((Object)((Object)((TestType)((Object)actual.getValue())))).isEqualTo((Object)new TestType("key-2", 1));
            Assertions.assertThat((Iterator)iterable2).isExhausted();
            mapState.put((Object)456, (Object)new TestType("new-key-2", 456));
            backend.setCurrentKey((Object)3);
            Iterator<Map.Entry<Integer, TestType>> iterable3 = this.sortedIterator(mapState.iterator());
            actual = iterable3.next();
            Assertions.assertThat((Integer)actual.getKey()).isOne();
            Assertions.assertThat((Object)((Object)actual.getValue())).isEqualTo((Object)new TestType("key-3", 1));
            actual = iterable3.next();
            Assertions.assertThat((Integer)actual.getKey()).isEqualTo(2);
            Assertions.assertThat((Object)((Object)actual.getValue())).isEqualTo((Object)new TestType("key-3", 2));
            Assertions.assertThat(iterable3).isExhausted();
            mapState.put((Object)777, (Object)new TestType("new-key-3", 777));
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @TestTemplate
    void testPriorityQueueStateCreationFailsIfNewSerializerIsNotCompatible() throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        try {
            KeyGroupedInternalPriorityQueue internalPriorityQueue = backend.create("testPriorityQueue", (TypeSerializer)new TestType.V1TestTypeSerializer());
            internalPriorityQueue.add((Object)new TestType("key-1", 123));
            internalPriorityQueue.add((Object)new TestType("key-2", 346));
            internalPriorityQueue.add((Object)new TestType("key-1", 777));
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            CheckpointableKeyedStateBackend restoredBackend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            Assertions.assertThatThrownBy(() -> restoredBackend.create("testPriorityQueue", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer())).hasCauseInstanceOf(StateMigrationException.class);
        }
        finally {
            backend.dispose();
        }
    }

    @TestTemplate
    void testStateBackendRestoreFailsIfNewKeySerializerRequiresMigration() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsKeySerializerCheck()).isTrue();
        Assertions.assertThatThrownBy(() -> this.testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.V2TestTypeSerializer())).hasCauseInstanceOf(StateMigrationException.class);
    }

    @TestTemplate
    void testStateBackendRestoreSucceedsIfNewKeySerializerRequiresReconfiguration() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsKeySerializerCheck()).isTrue();
        this.testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.ReconfigurationRequiringTestTypeSerializer());
    }

    @TestTemplate
    void testStateBackendRestoreFailsIfNewKeySerializerIsIncompatible() throws Exception {
        Assumptions.assumeThat((boolean)this.supportsKeySerializerCheck()).isTrue();
        Assertions.assertThatThrownBy(() -> this.testKeySerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.IncompatibleTestTypeSerializer())).hasCauseInstanceOf(StateMigrationException.class);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testKeySerializerUpgrade(TypeSerializer<TestType> initialKeySerializer, TypeSerializer<TestType> newKeySerializer) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend<TestType> backend = this.createKeyedBackend(initialKeySerializer);
        String stateName = "test-name";
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("test-name", Integer.class);
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)new TestType("foo", 123));
            valueState.update((Object)1);
            backend.setCurrentKey((Object)new TestType("bar", 456));
            valueState.update((Object)5);
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend(newKeySerializer, snapshot);
            valueState = (ValueState)backend.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)CustomVoidNamespaceSerializer.INSTANCE, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)new TestType("foo", 123));
            Assertions.assertThat((int)((Integer)valueState.value())).isOne();
            backend.setCurrentKey((Object)new TestType("bar", 456));
            Assertions.assertThat((int)((Integer)valueState.value())).isEqualTo(5);
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @TestTemplate
    void testKeyedStateRegistrationFailsIfNewNamespaceSerializerRequiresMigration() throws Exception {
        Assertions.assertThatThrownBy(() -> this.testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.V2TestTypeSerializer())).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    @TestTemplate
    void testKeyedStateRegistrationSucceedsIfNewNamespaceSerializerRequiresReconfiguration() throws Exception {
        this.testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.ReconfigurationRequiringTestTypeSerializer());
    }

    @TestTemplate
    void testKeyedStateRegistrationFailsIfNewNamespaceSerializerIsIncompatible() throws Exception {
        Assertions.assertThatThrownBy(() -> this.testNamespaceSerializerUpgrade(new TestType.V1TestTypeSerializer(), new TestType.IncompatibleTestTypeSerializer())).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testNamespaceSerializerUpgrade(TypeSerializer<TestType> initialNamespaceSerializer, TypeSerializer<TestType> newNamespaceSerializerAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        SharedStateRegistryImpl sharedStateRegistry = new SharedStateRegistryImpl();
        CheckpointableKeyedStateBackend backend = this.createKeyedBackend((TypeSerializer)IntSerializer.INSTANCE);
        String stateName = "test-name";
        try {
            ValueStateDescriptor kvId = new ValueStateDescriptor("test-name", Integer.class);
            ValueState valueState = (ValueState)backend.getPartitionedState((Object)new TestType("namespace", 123), initialNamespaceSerializer, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            valueState.update((Object)10);
            backend.setCurrentKey((Object)5);
            valueState.update((Object)50);
            KeyedStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            backend.dispose();
            backend = this.restoreKeyedBackend((TypeSerializer)IntSerializer.INSTANCE, snapshot);
            valueState = (ValueState)backend.getPartitionedState((Object)new TestType("namespace", 123), newNamespaceSerializerAfterRestore, (StateDescriptor)kvId);
            backend.setCurrentKey((Object)1);
            Assertions.assertThat((int)((Integer)valueState.value())).isEqualTo(10);
            valueState.update((Object)10);
            backend.setCurrentKey((Object)5);
            Assertions.assertThat((int)((Integer)valueState.value())).isEqualTo(50);
            snapshot = this.runSnapshot(backend.snapshot(2L, 3L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()), (SharedStateRegistry)sharedStateRegistry);
            snapshot.discardState();
        }
        finally {
            backend.dispose();
        }
    }

    @TestTemplate
    void testOperatorParitionableListStateMigration() throws Exception {
        String stateName = "partitionable-list-state";
        this.testOperatorPartitionableListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @TestTemplate
    void testOperatorParitionableListStateSerializerReconfiguration() throws Exception {
        String stateName = "partitionable-list-state";
        this.testOperatorPartitionableListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @TestTemplate
    void testOperatorParitionableListStateRegistrationFailsIfNewSerializerIsIncompatible() throws Exception {
        String stateName = "partitionable-list-state";
        Assertions.assertThatThrownBy(() -> this.testOperatorPartitionableListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("partitionable-list-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()))).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testOperatorPartitionableListStateUpgrade(ListStateDescriptor<TestType> initialAccessDescriptor, ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        try {
            ListState state = backend.getListState(initialAccessDescriptor);
            state.add((Object)new TestType("foo", 13));
            state.add((Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            state = backend.getListState(newAccessDescriptorAfterRestore);
            TypeSerializer internalListCopySerializer = ((PartitionableListState)state).getInternalListCopySerializer().getElementSerializer();
            TypeSerializer previousSerializer = initialAccessDescriptor.getElementSerializer();
            TypeSerializer newSerializerForRestoredState = newAccessDescriptorAfterRestore.getElementSerializer();
            this.internalCopySerializerTest(previousSerializer, newSerializerForRestoredState, internalListCopySerializer);
            Iterator iterator = ((Iterable)state.get()).iterator();
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterator.next())))).isEqualTo((Object)new TestType("foo", 13));
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterator.next())))).isEqualTo((Object)new TestType("bar", 278));
            Assertions.assertThat(iterator).isExhausted();
            state.add((Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    @TestTemplate
    void testOperatorUnionListStateMigration() throws Exception {
        String stateName = "union-list-state";
        this.testOperatorUnionListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @TestTemplate
    void testOperatorUnionListStateSerializerReconfiguration() throws Exception {
        String stateName = "union-list-state";
        this.testOperatorUnionListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @TestTemplate
    void testOperatorUnionListStateRegistrationFailsIfNewSerializerIsIncompatible() {
        String stateName = "union-list-state";
        Assertions.assertThatThrownBy(() -> this.testOperatorUnionListStateUpgrade((ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.V1TestTypeSerializer()), (ListStateDescriptor<TestType>)new ListStateDescriptor("union-list-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()))).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testOperatorUnionListStateUpgrade(ListStateDescriptor<TestType> initialAccessDescriptor, ListStateDescriptor<TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        try {
            ListState state = backend.getUnionListState(initialAccessDescriptor);
            state.add((Object)new TestType("foo", 13));
            state.add((Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            state = backend.getUnionListState(newAccessDescriptorAfterRestore);
            TypeSerializer internalListCopySerializer = ((PartitionableListState)state).getInternalListCopySerializer().getElementSerializer();
            TypeSerializer previousSerializer = initialAccessDescriptor.getElementSerializer();
            TypeSerializer newSerializerForRestoredState = newAccessDescriptorAfterRestore.getElementSerializer();
            this.internalCopySerializerTest(previousSerializer, newSerializerForRestoredState, internalListCopySerializer);
            Iterator iterator = ((Iterable)state.get()).iterator();
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterator.next())))).isEqualTo((Object)new TestType("foo", 13));
            Assertions.assertThat((Object)((Object)((TestType)((Object)iterator.next())))).isEqualTo((Object)new TestType("bar", 278));
            Assertions.assertThat(iterator).isExhausted();
            state.add((Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    @TestTemplate
    void testBroadcastStateValueMigration() throws Exception {
        String stateName = "broadcast-state";
        this.testBroadcastStateValueUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V2TestTypeSerializer()));
    }

    @TestTemplate
    void testBroadcastStateKeyMigration() throws Exception {
        String stateName = "broadcast-state";
        this.testBroadcastStateKeyUpgrade((MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V1TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE), (MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V2TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE));
    }

    @TestTemplate
    void testBroadcastStateValueSerializerReconfiguration() throws Exception {
        String stateName = "broadcast-state";
        this.testBroadcastStateValueUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer()));
    }

    @TestTemplate
    void testBroadcastStateKeySerializerReconfiguration() throws Exception {
        String stateName = "broadcast-state";
        this.testBroadcastStateKeyUpgrade((MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V1TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE), (MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.ReconfigurationRequiringTestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE));
    }

    @TestTemplate
    void testBroadcastStateRegistrationFailsIfNewValueSerializerIsIncompatible() {
        String stateName = "broadcast-state";
        Assertions.assertThatThrownBy(() -> this.testBroadcastStateValueUpgrade((MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.V1TestTypeSerializer()), (MapStateDescriptor<Integer, TestType>)new MapStateDescriptor("broadcast-state", (TypeSerializer)IntSerializer.INSTANCE, (TypeSerializer)new TestType.IncompatibleTestTypeSerializer()))).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    @TestTemplate
    void testBroadcastStateRegistrationFailsIfNewKeySerializerIsIncompatible() {
        String stateName = "broadcast-state";
        Assertions.assertThatThrownBy(() -> this.testBroadcastStateKeyUpgrade((MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.V1TestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE), (MapStateDescriptor<TestType, Integer>)new MapStateDescriptor("broadcast-state", (TypeSerializer)new TestType.IncompatibleTestTypeSerializer(), (TypeSerializer)IntSerializer.INSTANCE))).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testBroadcastStateValueUpgrade(MapStateDescriptor<Integer, TestType> initialAccessDescriptor, MapStateDescriptor<Integer, TestType> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        try {
            BroadcastState state = backend.getBroadcastState(initialAccessDescriptor);
            state.put((Object)3, (Object)new TestType("foo", 13));
            state.put((Object)5, (Object)new TestType("bar", 278));
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            state = backend.getBroadcastState(newAccessDescriptorAfterRestore);
            MapSerializer internalMapCopySerializer = ((HeapBroadcastState)state).getInternalMapCopySerializer();
            MapSerializer previousSerializer = new MapSerializer(initialAccessDescriptor.getKeySerializer(), internalMapCopySerializer.getValueSerializer());
            MapSerializer newSerializerForRestoredState = new MapSerializer(newAccessDescriptorAfterRestore.getKeySerializer(), newAccessDescriptorAfterRestore.getValueSerializer());
            this.internalCopySerializerTest((TypeSerializer)previousSerializer, (TypeSerializer)newSerializerForRestoredState, (TypeSerializer)internalMapCopySerializer);
            Assertions.assertThat((Object)((Object)((TestType)((Object)state.get((Object)3))))).isEqualTo((Object)new TestType("foo", 13));
            Assertions.assertThat((Object)((Object)((TestType)((Object)state.get((Object)5))))).isEqualTo((Object)new TestType("bar", 278));
            state.put((Object)17, (Object)new TestType("new-entry", 777));
        }
        finally {
            backend.dispose();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testBroadcastStateKeyUpgrade(MapStateDescriptor<TestType, Integer> initialAccessDescriptor, MapStateDescriptor<TestType, Integer> newAccessDescriptorAfterRestore) throws Exception {
        CheckpointStreamFactory streamFactory = this.createStreamFactory();
        OperatorStateBackend backend = this.createOperatorStateBackend();
        try {
            BroadcastState state = backend.getBroadcastState(initialAccessDescriptor);
            state.put((Object)new TestType("foo", 13), (Object)3);
            state.put((Object)new TestType("bar", 278), (Object)5);
            OperatorStateHandle snapshot = this.runSnapshot(backend.snapshot(1L, 2L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation()));
            backend.dispose();
            backend = this.restoreOperatorStateBackend(snapshot);
            state = backend.getBroadcastState(newAccessDescriptorAfterRestore);
            Assertions.assertThat((Integer)((Integer)state.get((Object)new TestType("foo", 13)))).isEqualTo(3);
            Assertions.assertThat((Integer)((Integer)state.get((Object)new TestType("bar", 278)))).isEqualTo(5);
            state.put((Object)new TestType("new-entry", 777), (Object)17);
        }
        finally {
            backend.dispose();
        }
    }

    void internalCopySerializerTest(TypeSerializer previousSerializer, TypeSerializer newSerializerForRestoredState, TypeSerializer internalCopySerializer) {
        StateSerializerProvider testProvider = StateSerializerProvider.fromPreviousSerializerSnapshot((TypeSerializerSnapshot)previousSerializer.snapshotConfiguration());
        testProvider.registerNewSerializerForRestoredState(newSerializerForRestoredState);
        Assertions.assertThat(internalCopySerializer.getClass()).isEqualTo(testProvider.currentSchemaSerializer().getClass());
    }

    @TestTemplate
    void testStateMigrationAfterChangingTTL() throws Exception {
        String stateName = "test-ttl";
        ValueStateDescriptor initialAccessDescriptor = new ValueStateDescriptor("test-ttl", (TypeSerializer)new TestType.V1TestTypeSerializer());
        initialAccessDescriptor.enableTimeToLive(StateTtlConfig.newBuilder((Duration)Duration.ofDays(1L)).build());
        ValueStateDescriptor newAccessDescriptorAfterRestore = new ValueStateDescriptor("test-ttl", (TypeSerializer)new TestType.V2TestTypeSerializer());
        newAccessDescriptorAfterRestore.enableTimeToLive(StateTtlConfig.newBuilder((Duration)Duration.ofDays(2L)).build());
        this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)initialAccessDescriptor, (ValueStateDescriptor<TestType>)newAccessDescriptorAfterRestore);
    }

    @TestTemplate
    void testStateMigrationAfterChangingTTLFromEnablingToDisabling() {
        String stateName = "test-ttl";
        ValueStateDescriptor initialAccessDescriptor = new ValueStateDescriptor("test-ttl", (TypeSerializer)new TestType.V1TestTypeSerializer());
        initialAccessDescriptor.enableTimeToLive(StateTtlConfig.newBuilder((Duration)Duration.ofDays(1L)).build());
        ValueStateDescriptor newAccessDescriptorAfterRestore = new ValueStateDescriptor("test-ttl", (TypeSerializer)new TestType.V2TestTypeSerializer());
        Assertions.assertThatThrownBy(() -> this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)initialAccessDescriptor, (ValueStateDescriptor<TestType>)newAccessDescriptorAfterRestore)).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(IllegalStateException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(IllegalStateException.class)});
    }

    @TestTemplate
    void testStateMigrationAfterChangingTTLFromDisablingToEnabling() {
        String stateName = "test-ttl";
        ValueStateDescriptor initialAccessDescriptor = new ValueStateDescriptor("test-ttl", (TypeSerializer)new TestType.V1TestTypeSerializer());
        ValueStateDescriptor newAccessDescriptorAfterRestore = new ValueStateDescriptor("test-ttl", (TypeSerializer)new TestType.V2TestTypeSerializer());
        newAccessDescriptorAfterRestore.enableTimeToLive(StateTtlConfig.newBuilder((Duration)Duration.ofDays(1L)).build());
        Assertions.assertThatThrownBy(() -> this.testKeyedValueStateUpgrade((ValueStateDescriptor<TestType>)initialAccessDescriptor, (ValueStateDescriptor<TestType>)newAccessDescriptorAfterRestore)).satisfiesAnyOf(new ThrowingConsumer[]{e -> Assertions.assertThat((Throwable)e).isInstanceOf(StateMigrationException.class), e -> Assertions.assertThat((Throwable)e).hasCauseInstanceOf(StateMigrationException.class)});
    }

    private CheckpointStreamFactory createStreamFactory() throws Exception {
        if (this.checkpointStorageLocation == null) {
            CheckpointStorageAccess checkpointStorageAccess = this.getCheckpointStorage().createCheckpointStorage(new JobID());
            checkpointStorageAccess.initializeBaseLocationsForCheckpoint();
            this.env.setCheckpointStorageAccess(checkpointStorageAccess);
            this.checkpointStorageLocation = checkpointStorageAccess.initializeLocationForCheckpoint(1L);
        }
        return this.checkpointStorageLocation;
    }

    private <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer) throws Exception {
        return this.createKeyedBackend(keySerializer, this.env);
    }

    private <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, Environment env) throws Exception {
        return this.createKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), env);
    }

    private <K> CheckpointableKeyedStateBackend<K> createKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, Environment env) throws Exception {
        B stateBackend = this.getStateBackend();
        JobID jobID = new JobID();
        TaskKvStateRegistry kvStateRegistry = env.getTaskKvStateRegistry();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        CheckpointableKeyedStateBackend backend = stateBackend.createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl(env, jobID, "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry, TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), Collections.emptyList(), cancelStreamRegistry));
        return backend;
    }

    private <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state) throws Exception {
        return this.restoreKeyedBackend(keySerializer, state, this.env);
    }

    private <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, KeyedStateHandle state, Environment env) throws Exception {
        return this.restoreKeyedBackend(keySerializer, 10, new KeyGroupRange(0, 9), Collections.singletonList(state), env);
    }

    private <K> CheckpointableKeyedStateBackend<K> restoreKeyedBackend(TypeSerializer<K> keySerializer, int numberOfKeyGroups, KeyGroupRange keyGroupRange, List<KeyedStateHandle> state, Environment env) throws Exception {
        B stateBackend = this.getStateBackend();
        JobID jobID = new JobID();
        TaskKvStateRegistry kvStateRegistry = env.getTaskKvStateRegistry();
        CloseableRegistry cancelStreamRegistry = new CloseableRegistry();
        CheckpointableKeyedStateBackend backend = stateBackend.createKeyedStateBackend((StateBackend.KeyedStateBackendParameters)new KeyedStateBackendParametersImpl(env, jobID, "test_op", keySerializer, numberOfKeyGroups, keyGroupRange, kvStateRegistry, TtlTimeProvider.DEFAULT, (MetricGroup)new UnregisteredMetricsGroup(), state, cancelStreamRegistry));
        return backend;
    }

    private KeyedStateHandle runSnapshot(RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshotRunnableFuture, SharedStateRegistry sharedStateRegistry) throws Exception {
        SnapshotResult snapshotResult;
        KeyedStateHandle jobManagerOwnedSnapshot;
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        if ((jobManagerOwnedSnapshot = (KeyedStateHandle)(snapshotResult = (SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot()) != null) {
            jobManagerOwnedSnapshot.registerSharedStates(sharedStateRegistry, 0L);
        }
        return jobManagerOwnedSnapshot;
    }

    private OperatorStateBackend createOperatorStateBackend() throws Exception {
        return this.getStateBackend().createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl((Environment)this.env, "test_op", Collections.emptyList(), new CloseableRegistry()));
    }

    private OperatorStateBackend createOperatorStateBackend(Collection<OperatorStateHandle> state) throws Exception {
        return this.getStateBackend().createOperatorStateBackend((StateBackend.OperatorStateBackendParameters)new OperatorStateBackendParametersImpl((Environment)this.env, "test_op", state, new CloseableRegistry()));
    }

    private OperatorStateBackend restoreOperatorStateBackend(OperatorStateHandle state) throws Exception {
        OperatorStateBackend operatorStateBackend = this.createOperatorStateBackend((Collection<OperatorStateHandle>)StateObjectCollection.singleton((StateObject)state));
        return operatorStateBackend;
    }

    private OperatorStateHandle runSnapshot(RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshotRunnableFuture) throws Exception {
        if (!snapshotRunnableFuture.isDone()) {
            snapshotRunnableFuture.run();
        }
        return (OperatorStateHandle)((SnapshotResult)snapshotRunnableFuture.get()).getJobManagerOwnedSnapshot();
    }

    public static class CustomVoidNamespaceSerializerSnapshot
    implements TypeSerializerSnapshot<VoidNamespace> {
        public TypeSerializer<VoidNamespace> restoreSerializer() {
            return new CustomVoidNamespaceSerializer();
        }

        public TypeSerializerSchemaCompatibility<VoidNamespace> resolveSchemaCompatibility(TypeSerializerSnapshot<VoidNamespace> oldSerializerSnapshot) {
            return TypeSerializerSchemaCompatibility.compatibleAsIs();
        }

        public void writeSnapshot(DataOutputView out) throws IOException {
        }

        public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
        }

        public boolean equals(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializerSnapshot;
        }

        public int hashCode() {
            return 0;
        }

        public int getCurrentVersion() {
            return 0;
        }
    }

    public static class CustomVoidNamespaceSerializer
    extends TypeSerializer<VoidNamespace> {
        private static final long serialVersionUID = 1L;
        public static final CustomVoidNamespaceSerializer INSTANCE = new CustomVoidNamespaceSerializer();

        public boolean isImmutableType() {
            return true;
        }

        public VoidNamespace createInstance() {
            return VoidNamespace.get();
        }

        public VoidNamespace copy(VoidNamespace from) {
            return VoidNamespace.get();
        }

        public VoidNamespace copy(VoidNamespace from, VoidNamespace reuse) {
            return VoidNamespace.get();
        }

        public int getLength() {
            return 0;
        }

        public void serialize(VoidNamespace record, DataOutputView target) throws IOException {
            target.write(0);
        }

        public VoidNamespace deserialize(DataInputView source) throws IOException {
            source.readByte();
            return VoidNamespace.get();
        }

        public VoidNamespace deserialize(VoidNamespace reuse, DataInputView source) throws IOException {
            source.readByte();
            return VoidNamespace.get();
        }

        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.write((int)source.readByte());
        }

        public TypeSerializer<VoidNamespace> duplicate() {
            return this;
        }

        public boolean equals(Object obj) {
            return obj instanceof CustomVoidNamespaceSerializer;
        }

        public int hashCode() {
            return ((Object)((Object)this)).getClass().hashCode();
        }

        public TypeSerializerSnapshot<VoidNamespace> snapshotConfiguration() {
            return new CustomVoidNamespaceSerializerSnapshot();
        }
    }
}

