/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.plugin.PluginManager;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager;
import org.apache.flink.runtime.state.changelog.ChangelogStateHandle;
import org.apache.flink.runtime.state.changelog.StateChangelogHandleReader;
import org.apache.flink.runtime.state.changelog.StateChangelogStorage;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageFactory;
import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link TaskExecutorStateChangelogStoragesManager}.
 *
 * <p>The tests request changelog storages per {@link JobID} and check how the returned instances
 * relate to each other, and whether they are closed on release or shutdown. Tests that need to
 * observe {@code close()} install a test-only {@link PluginManager} into
 * {@link StateChangelogStorageLoader} and reset it in a {@code finally} block so that a failing
 * assertion does not leave the test plugin manager installed for subsequent tests.
 */
public class TaskExecutorStateChangelogStoragesManagerTest {

    /** Requesting a storage twice for one job yields equal storages; another job gets a different one. */
    @Test
    public void testDuplicatedAllocation() throws IOException {
        TaskExecutorStateChangelogStoragesManager manager = new TaskExecutorStateChangelogStoragesManager();
        try {
            Configuration configuration = new Configuration();

            JobID jobId1 = new JobID(1L, 1L);
            StateChangelogStorage<?> storage1 = manager.stateChangelogStorageForJob(jobId1, configuration);
            StateChangelogStorage<?> storage2 = manager.stateChangelogStorageForJob(jobId1, configuration);
            Assert.assertEquals(storage1, storage2);

            JobID jobId2 = new JobID(1L, 2L);
            StateChangelogStorage<?> storage3 = manager.stateChangelogStorageForJob(jobId2, configuration);
            Assert.assertNotEquals(storage1, storage3);
        } finally {
            manager.shutdown();
        }
    }

    /** Releasing a job closes its storage, and a later request for that job yields a different storage. */
    @Test
    public void testReleaseForJob() throws IOException {
        StateChangelogStorageLoader.initialize(TestStateChangelogStorageFactory.PLUGIN_MANAGER);
        try {
            TaskExecutorStateChangelogStoragesManager manager = new TaskExecutorStateChangelogStoragesManager();
            try {
                Configuration configuration = new Configuration();
                configuration.set(
                        CheckpointingOptions.STATE_CHANGE_LOG_STORAGE,
                        TestStateChangelogStorageFactory.IDENTIFIER);

                JobID jobId1 = new JobID(1L, 1L);
                TestStateChangelogStorage storage1 =
                        asTestStorage(manager.stateChangelogStorageForJob(jobId1, configuration));
                Assert.assertFalse(storage1.closed);

                manager.releaseStateChangelogStorageForJob(jobId1);
                Assert.assertTrue(storage1.closed);

                StateChangelogStorage<?> storage2 = manager.stateChangelogStorageForJob(jobId1, configuration);
                Assert.assertNotEquals(storage1, storage2);
            } finally {
                manager.shutdown();
            }
        } finally {
            // Always undo the global loader initialization, even when an assertion failed.
            StateChangelogStorageLoader.initialize(null);
        }
    }

    /**
     * The outcome of the first request for a job is what later requests for that job observe,
     * regardless of the configuration passed in by those later requests.
     */
    @Test
    public void testConsistencyAmongTask() throws IOException {
        TaskExecutorStateChangelogStoragesManager manager = new TaskExecutorStateChangelogStoragesManager();
        try {
            Configuration configuration = new Configuration();

            // First request for job 1 uses an unknown identifier and is expected to yield null ...
            configuration.set(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE, "invalid");
            JobID jobId1 = new JobID(1L, 1L);
            StateChangelogStorage<?> storage1 = manager.stateChangelogStorageForJob(jobId1, configuration);
            Assert.assertNull(storage1);

            // ... and switching to the default identifier afterwards must not change that for job 1.
            configuration.set(
                    CheckpointingOptions.STATE_CHANGE_LOG_STORAGE,
                    CheckpointingOptions.STATE_CHANGE_LOG_STORAGE.defaultValue());
            StateChangelogStorage<?> storage2 = manager.stateChangelogStorageForJob(jobId1, configuration);
            Assert.assertNull(storage2);

            // First request for job 2 uses the default identifier and is expected to yield a storage ...
            JobID jobId2 = new JobID(1L, 2L);
            StateChangelogStorage<?> storage3 = manager.stateChangelogStorageForJob(jobId2, configuration);
            Assert.assertNotNull(storage3);

            // ... and a later request with an unknown identifier must still see that same storage.
            configuration.set(CheckpointingOptions.STATE_CHANGE_LOG_STORAGE, "invalid");
            StateChangelogStorage<?> storage4 = manager.stateChangelogStorageForJob(jobId2, configuration);
            Assert.assertNotNull(storage4);
            Assert.assertEquals(storage3, storage4);
        } finally {
            manager.shutdown();
        }
    }

    /** Shutting the manager down closes the storages of every job it handed out. */
    @Test
    public void testShutdown() throws IOException {
        StateChangelogStorageLoader.initialize(TestStateChangelogStorageFactory.PLUGIN_MANAGER);
        try {
            TaskExecutorStateChangelogStoragesManager manager = new TaskExecutorStateChangelogStoragesManager();
            Configuration configuration = new Configuration();
            configuration.set(
                    CheckpointingOptions.STATE_CHANGE_LOG_STORAGE,
                    TestStateChangelogStorageFactory.IDENTIFIER);

            JobID jobId1 = new JobID(1L, 1L);
            TestStateChangelogStorage storage1 =
                    asTestStorage(manager.stateChangelogStorageForJob(jobId1, configuration));
            Assert.assertFalse(storage1.closed);

            // Request the second storage for a *different* job so that shutdown is checked
            // against two distinct storages rather than the same instance twice.
            JobID jobId2 = new JobID(1L, 2L);
            TestStateChangelogStorage storage2 =
                    asTestStorage(manager.stateChangelogStorageForJob(jobId2, configuration));
            Assert.assertNotSame(storage1, storage2);
            Assert.assertFalse(storage2.closed);

            manager.shutdown();
            Assert.assertTrue(storage1.closed);
            Assert.assertTrue(storage2.closed);
        } finally {
            // Always undo the global loader initialization, even when an assertion failed.
            StateChangelogStorageLoader.initialize(null);
        }
    }

    /** Asserts that {@code storage} is a {@link TestStateChangelogStorage} and returns it as such. */
    private static TestStateChangelogStorage asTestStorage(StateChangelogStorage<?> storage) {
        Assert.assertTrue(storage instanceof TestStateChangelogStorage);
        return (TestStateChangelogStorage) storage;
    }

    /** Factory registered under {@link #IDENTIFIER} that creates {@link TestStateChangelogStorage}s. */
    private static class TestStateChangelogStorageFactory
    implements StateChangelogStorageFactory {
        /** Identifier reported by {@link #getIdentifier()}. */
        private static final String IDENTIFIER = "test-factory";

        /** Plugin manager whose {@code load} yields exactly one new instance of this factory. */
        private static final PluginManager PLUGIN_MANAGER = new PluginManager() {

            @Override
            public <P> Iterator<P> load(Class<P> service) {
                Preconditions.checkArgument(service.equals(StateChangelogStorageFactory.class));
                // Class#cast keeps the conversion to P checked instead of relying on an unchecked cast.
                return Collections.singletonList(service.cast(new TestStateChangelogStorageFactory())).iterator();
            }
        };

        private TestStateChangelogStorageFactory() {
        }

        @Override
        public String getIdentifier() {
            return IDENTIFIER;
        }

        @Override
        public StateChangelogStorage<?> createStorage(Configuration configuration) {
            return new TestStateChangelogStorage();
        }
    }

    /** Storage stub that only records whether {@link #close()} has been called. */
    private static class TestStateChangelogStorage
    implements StateChangelogStorage<ChangelogStateHandle> {
        /** Set to {@code true} by {@link #close()}; read directly by the enclosing test. */
        private boolean closed = false;

        private TestStateChangelogStorage() {
        }

        /** Not needed by these tests; always returns {@code null}. */
        @Override
        public StateChangelogWriter<ChangelogStateHandle> createWriter(String operatorID, KeyGroupRange keyGroupRange) {
            return null;
        }

        /** Not needed by these tests; always returns {@code null}. */
        @Override
        public StateChangelogHandleReader<ChangelogStateHandle> createReader() {
            return null;
        }

        @Override
        public void close() {
            this.closed = true;
        }
    }
}

