/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.SingleThreadAccessCheckingTypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.internal.InternalValueState;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.util.StringUtils;

class StateSnapshotTransformerTest {
    /** Keyed state backend (integer keys) whose snapshots are exercised by this helper. */
    private final CheckpointableKeyedStateBackend<Integer> backend;

    /** Stream factory passed to every snapshot call. */
    private final BlockerCheckpointStreamFactory streamFactory;

    /** Transformer factory registered with every state created by this helper. */
    private final StateSnapshotTransformer.StateSnapshotTransformFactory<?> snapshotTransformFactory;

    /**
     * Wires the helper to the backend under test and installs a thread-access-checking
     * snapshot transformer factory.
     *
     * @param backend keyed state backend to create states in and to snapshot
     * @param streamFactory stream factory the snapshots write to
     */
    StateSnapshotTransformerTest(CheckpointableKeyedStateBackend<Integer> backend, BlockerCheckpointStreamFactory streamFactory) {
        snapshotTransformFactory = SingleThreadAccessCheckingSnapshotTransformFactory.create();
        this.streamFactory = streamFactory;
        this.backend = backend;
    }

    /**
     * Runs two snapshots of the backend on two separate threads for each kind of test state
     * (value, list, map) and waits for both to finish.
     *
     * <p>Before the snapshots are taken, the state under test is filled with random values
     * for the keys 0 to 99. Every state is registered with the thread-access-checking
     * transformer factory, so a transformer touched by more than one thread makes the
     * corresponding snapshot fail.
     *
     * @throws Exception if state access or triggering a snapshot fails, if waiting for a
     *     runner thread is interrupted, or if one of the snapshot computations failed
     */
    void testNonConcurrentSnapshotTransformerAccess() throws Exception {
        List<TestState> testStates = Arrays.asList(new TestValueState(), new TestListState(), new TestMapState());
        for (TestState state : testStates) {
            for (int key = 0; key < 100; ++key) {
                // The backend is keyed by Integer, so the int is simply boxed here.
                backend.setCurrentKey(key);
                state.setToRandomValue();
            }

            CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation();
            // The snapshot result is never inspected, hence the wildcard instead of a raw type.
            RunnableFuture<?> snapshot1 = backend.snapshot(1L, 0L, streamFactory, checkpointOptions);
            RunnableFuture<?> snapshot2 = backend.snapshot(2L, 0L, streamFactory, checkpointOptions);

            // Start both runners before joining either so the two snapshots can overlap.
            Thread runner1 = new Thread(snapshot1, "snapshot1");
            runner1.start();
            Thread runner2 = new Thread(snapshot2, "snapshot2");
            runner2.start();

            runner1.join();
            runner2.join();

            // get() rethrows whatever went wrong while the snapshot was running.
            snapshot1.get();
            snapshot2.get();
        }
    }

    /**
     * Records the first thread that calls {@link #checkSingleThreadAccess()} and fails as
     * soon as any other thread calls it.
     */
    private static class SingleThreadAccessChecker implements Serializable {
        private static final long serialVersionUID = 131020282727167064L;

        /**
         * Thread that made the first call; {@code null} until then and never reset.
         *
         * <p>NOTE(review): {@link Thread} is not {@link Serializable}, so Java serialization
         * of a checker that has already recorded a thread would presumably fail. Confirm
         * that instances are only serialized before their first use.
         */
        private final AtomicReference<Thread> currentThreadRef = new AtomicReference<>();

        private SingleThreadAccessChecker() {
        }

        /**
         * Registers the calling thread as the owner on the first call and verifies on every
         * call that the caller is that owner.
         *
         * @throws AssertionError if the caller is not the thread recorded by the first call
         */
        void checkSingleThreadAccess() {
            final Thread caller = Thread.currentThread();
            // Only the very first caller can win this CAS; later calls leave the owner as is.
            currentThreadRef.compareAndSet(null, caller);
            if (!caller.equals(currentThreadRef.get())) {
                // Thrown explicitly instead of via the `assert` keyword so that the check
                // still fires when the JVM runs with assertions disabled.
                throw new AssertionError("Concurrent access from another thread");
            }
        }
    }

    /**
     * Snapshot transformer factory that checks thread confinement on two levels: the factory
     * itself must always be called by the same thread, and every transformer it hands out
     * must always be called by the same thread. The transformers return values unchanged.
     *
     * @param <T> type of the deserialized state values
     */
    private static class SingleThreadAccessCheckingSnapshotTransformFactory<T>
            implements StateSnapshotTransformer.StateSnapshotTransformFactory<T> {
        /** Guards the factory methods of this instance. */
        private final SingleThreadAccessChecker singleThreadAccessChecker = new SingleThreadAccessChecker();

        private SingleThreadAccessCheckingSnapshotTransformFactory() {
        }

        /** Creates a new factory with its own, not yet bound, access checker. */
        static <T> StateSnapshotTransformer.StateSnapshotTransformFactory<T> create() {
            return new SingleThreadAccessCheckingSnapshotTransformFactory<>();
        }

        /** Checks the calling thread, then returns a fresh pass-through transformer. */
        public Optional<StateSnapshotTransformer<T>> createForDeserializedState() {
            singleThreadAccessChecker.checkSingleThreadAccess();
            return createStateSnapshotTransformer();
        }

        /** Checks the calling thread, then returns a fresh pass-through transformer. */
        public Optional<StateSnapshotTransformer<byte[]>> createForSerializedState() {
            singleThreadAccessChecker.checkSingleThreadAccess();
            return createStateSnapshotTransformer();
        }

        /**
         * Builds a transformer that owns a separate access checker and returns every value
         * it is given without modification.
         */
        private <V> Optional<StateSnapshotTransformer<V>> createStateSnapshotTransformer() {
            StateSnapshotTransformer<V> checkingTransformer = new StateSnapshotTransformer<V>() {
                // One checker per transformer instance, independent of the factory's checker.
                private final SingleThreadAccessChecker transformerAccessChecker = new SingleThreadAccessChecker();

                @Nullable
                public V filterOrTransform(@Nullable V value) {
                    transformerAccessChecker.checkSingleThreadAccess();
                    return value;
                }
            };
            return Optional.of(checkingTransformer);
        }
    }

    private class TestMapState
    extends TestState {
        private final InternalMapState<Integer, VoidNamespace, String, String> state;

        private TestMapState() throws Exception {
            this.state = (InternalMapState)StateSnapshotTransformerTest.this.backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new MapStateDescriptor("TestMapState", (TypeSerializer)StringSerializer.INSTANCE, (TypeSerializer)StringSerializer.INSTANCE), StateSnapshotTransformerTest.this.snapshotTransformFactory);
            this.state.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        }

        @Override
        void setToRandomValue() throws Exception {
            int length = this.rnd.nextInt(10);
            for (int i = 0; i < length; ++i) {
                this.state.put((Object)this.getRandomString(), (Object)this.getRandomString());
            }
        }
    }

    private class TestListState
    extends TestState {
        private final InternalListState<Integer, VoidNamespace, String> state;

        private TestListState() throws Exception {
            this.state = (InternalListState)StateSnapshotTransformerTest.this.backend.createOrUpdateInternalState((TypeSerializer)VoidNamespaceSerializer.INSTANCE, (StateDescriptor)new ListStateDescriptor("TestListState", (TypeSerializer)new SingleThreadAccessCheckingTypeSerializer((TypeSerializer)StringSerializer.INSTANCE)), StateSnapshotTransformerTest.this.snapshotTransformFactory);
            this.state.setCurrentNamespace((Object)VoidNamespace.INSTANCE);
        }

        @Override
        void setToRandomValue() throws Exception {
            int length = this.rnd.nextInt(10);
            for (int i = 0; i < length; ++i) {
                this.state.add((Object)this.getRandomString());
            }
        }
    }

    /**
     * Value state holding a single string, registered under the name
     * {@code "TestValueState"} with the enclosing test's snapshot transformer factory.
     */
    private class TestValueState extends TestState {
        private final InternalValueState<Integer, VoidNamespace, String> state;

        /**
         * Creates (or updates) the value state in the enclosing test's backend and selects
         * the void namespace.
         *
         * @throws Exception if the backend fails to create the state
         */
        private TestValueState() throws Exception {
            // Explicit type arguments replace the raw types so the call is checked by the compiler.
            this.state = backend.createOrUpdateInternalState(
                    VoidNamespaceSerializer.INSTANCE,
                    new ValueStateDescriptor<String>("TestValueState", StringSerializer.INSTANCE),
                    snapshotTransformFactory);
            this.state.setCurrentNamespace(VoidNamespace.INSTANCE);
        }

        /**
         * Overwrites the value with a random string.
         *
         * @throws Exception if writing to the state fails
         */
        @Override
        void setToRandomValue() throws Exception {
            state.update(getRandomString());
        }
    }

    /**
     * Common base of the state helpers: owns the random generator and defines the single
     * operation the test needs, writing a random value.
     */
    private abstract class TestState {
        /** Random source used for collection sizes and for the generated strings. */
        final Random rnd = new Random();

        private TestState() {
        }

        /**
         * Writes a random value into the underlying state.
         *
         * @throws Exception if the state access fails
         */
        abstract void setToRandomValue() throws Exception;

        /**
         * Returns a random string generated from {@link #rnd}.
         *
         * <p>NOTE(review): 5 and 10 are presumably the length bounds expected by
         * {@code StringUtils.getRandomString}. Confirm against that helper.
         */
        String getRandomString() {
            return StringUtils.getRandomString(rnd, 5, 10);
        }
    }
}

