/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.zookeeper;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.zookeeper.StateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;
import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.framework.api.BackgroundCallback;
import org.apache.flink.shaded.org.apache.curator.framework.api.CuratorEvent;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

public class ZooKeeperStateHandleStoreITCase
extends TestLogger {
    private static final ZooKeeperTestEnvironment ZooKeeper = new ZooKeeperTestEnvironment(1);

    @AfterClass
    public static void tearDown() throws Exception {
        if (ZooKeeper != null) {
            ZooKeeper.shutdown();
        }
    }

    @Before
    public void cleanUp() throws Exception {
        ZooKeeper.deleteAll();
    }

    @Test
    public void testAdd() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)longStateStorage);
        String pathInZooKeeper = "/testAdd";
        Long state = 1239712317L;
        store.add("/testAdd", (Serializable)state);
        Assert.assertEquals((long)1L, (long)store.getAll().size());
        Assert.assertEquals((Object)state, (Object)store.get("/testAdd").getState(null));
        Stat stat = (Stat)ZooKeeper.getClient().checkExists().forPath("/testAdd");
        Assert.assertNotNull((Object)stat);
        Assert.assertEquals((long)0L, (long)stat.getEphemeralOwner());
        Long actual = (Long)((StateHandle)InstantiationUtil.deserializeObject((byte[])((byte[])ZooKeeper.getClient().getData().forPath("/testAdd")), (ClassLoader)ClassLoader.getSystemClassLoader())).getState(null);
        Assert.assertEquals((Object)state, (Object)actual);
    }

    @Test
    public void testAddWithCreateMode() throws Exception {
        LongStateStorage longStateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)longStateStorage);
        Long state = 3457347234L;
        CreateMode[] modes = CreateMode.values();
        for (int i = 0; i < modes.length; ++i) {
            CreateMode mode = modes[i];
            state = state + (long)i;
            String pathInZooKeeper = "/testAddWithCreateMode" + mode.name();
            store.add(pathInZooKeeper, (Serializable)state, mode);
            if (mode.isSequential()) {
                List paths = (List)ZooKeeper.getClient().getChildren().forPath("/");
                for (String p : paths) {
                    if (!p.startsWith("testAddWithCreateMode" + mode.name())) continue;
                    pathInZooKeeper = "/" + p;
                    break;
                }
            }
            Assert.assertEquals((long)(i + 1), (long)store.getAll().size());
            Assert.assertEquals((Object)state, (Object)longStateStorage.getStateHandles().get(i).getState(null));
            Stat stat = (Stat)ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper);
            Assert.assertNotNull((Object)stat);
            if (mode.isEphemeral()) {
                Assert.assertTrue((stat.getEphemeralOwner() != 0L ? 1 : 0) != 0);
            } else {
                Assert.assertEquals((long)0L, (long)stat.getEphemeralOwner());
            }
            Long actual = (Long)((StateHandle)InstantiationUtil.deserializeObject((byte[])((byte[])ZooKeeper.getClient().getData().forPath(pathInZooKeeper)), (ClassLoader)ClassLoader.getSystemClassLoader())).getState(null);
            Assert.assertEquals((Object)state, (Object)actual);
        }
    }

    @Test(expected=Exception.class)
    public void testAddAlreadyExistingPath() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        ZooKeeper.getClient().create().forPath("/testAddAlreadyExistingPath");
        store.add("/testAddAlreadyExistingPath", (Serializable)Long.valueOf(1L));
    }

    @Test
    public void testAddDiscardStateHandleAfterFailure() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        CuratorFramework client = (CuratorFramework)Mockito.spy((Object)ZooKeeper.getClient());
        Mockito.when((Object)client.create()).thenThrow(new Throwable[]{new RuntimeException("Expected test Exception.")});
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(client, (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testAddDiscardStateHandleAfterFailure";
        Long state = 81282227L;
        try {
            store.add("/testAddDiscardStateHandleAfterFailure", (Serializable)state);
            Assert.fail((String)"Did not throw expected exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assert.assertEquals((long)1L, (long)stateHandleProvider.getStateHandles().size());
        Assert.assertEquals((Object)state, (Object)stateHandleProvider.getStateHandles().get(0).getState(null));
        Assert.assertEquals((long)1L, (long)stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls());
    }

    @Test
    public void testReplace() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testReplace";
        Long initialState = 30968470898L;
        Long replaceState = 88383776661L;
        store.add("/testReplace", (Serializable)initialState);
        store.replace("/testReplace", 0, (Serializable)replaceState);
        Assert.assertEquals((long)2L, (long)stateHandleProvider.getStateHandles().size());
        Assert.assertEquals((Object)initialState, (Object)stateHandleProvider.getStateHandles().get(0).getState(null));
        Assert.assertEquals((Object)replaceState, (Object)stateHandleProvider.getStateHandles().get(1).getState(null));
        Stat stat = (Stat)ZooKeeper.getClient().checkExists().forPath("/testReplace");
        Assert.assertNotNull((Object)stat);
        Assert.assertEquals((long)0L, (long)stat.getEphemeralOwner());
        Long actual = (Long)((StateHandle)InstantiationUtil.deserializeObject((byte[])((byte[])ZooKeeper.getClient().getData().forPath("/testReplace")), (ClassLoader)ClassLoader.getSystemClassLoader())).getState(null);
        Assert.assertEquals((Object)replaceState, (Object)actual);
    }

    @Test(expected=Exception.class)
    public void testReplaceNonExistingPath() throws Exception {
        LongStateStorage stateStorage = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateStorage);
        store.replace("/testReplaceNonExistingPath", 0, (Serializable)Long.valueOf(1L));
    }

    @Test
    public void testReplaceDiscardStateHandleAfterFailure() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        CuratorFramework client = (CuratorFramework)Mockito.spy((Object)ZooKeeper.getClient());
        Mockito.when((Object)client.setData()).thenThrow(new Throwable[]{new RuntimeException("Expected test Exception.")});
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(client, (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testReplaceDiscardStateHandleAfterFailure";
        Long initialState = 30968470898L;
        Long replaceState = 88383776661L;
        store.add("/testReplaceDiscardStateHandleAfterFailure", (Serializable)initialState);
        try {
            store.replace("/testReplaceDiscardStateHandleAfterFailure", 0, (Serializable)replaceState);
            Assert.fail((String)"Did not throw expected exception");
        }
        catch (Exception exception) {
            // empty catch block
        }
        Assert.assertEquals((long)2L, (long)stateHandleProvider.getStateHandles().size());
        Assert.assertEquals((Object)initialState, (Object)stateHandleProvider.getStateHandles().get(0).getState(null));
        Assert.assertEquals((Object)replaceState, (Object)stateHandleProvider.getStateHandles().get(1).getState(null));
        Assert.assertEquals((long)1L, (long)stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls());
        Long actual = (Long)((StateHandle)InstantiationUtil.deserializeObject((byte[])((byte[])ZooKeeper.getClient().getData().forPath("/testReplaceDiscardStateHandleAfterFailure")), (ClassLoader)ClassLoader.getSystemClassLoader())).getState(null);
        Assert.assertEquals((Object)initialState, (Object)actual);
    }

    @Test
    public void testGetAndExists() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testGetAndExists";
        Long state = 311222268470898L;
        Assert.assertEquals((long)-1L, (long)store.exists("/testGetAndExists"));
        store.add("/testGetAndExists", (Serializable)state);
        StateHandle actual = store.get("/testGetAndExists");
        Assert.assertEquals((Object)state, (Object)actual.getState(null));
        Assert.assertTrue((store.exists("/testGetAndExists") >= 0 ? 1 : 0) != 0);
    }

    @Test(expected=Exception.class)
    public void testGetNonExistingPath() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        store.get("/testGetNonExistingPath");
    }

    @Test
    public void testGetAll() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testGetAll";
        HashSet<Long> expected = new HashSet<Long>();
        expected.add(311222268470898L);
        expected.add(132812888L);
        expected.add(27255442L);
        expected.add(11122233124L);
        Iterator iterator = expected.iterator();
        while (iterator.hasNext()) {
            long val = (Long)iterator.next();
            store.add("/testGetAll", (Serializable)Long.valueOf(val), CreateMode.PERSISTENT_SEQUENTIAL);
        }
        for (Tuple2 val : store.getAll()) {
            Assert.assertTrue((boolean)expected.remove(((StateHandle)val.f0).getState(null)));
        }
        Assert.assertEquals((long)0L, (long)expected.size());
    }

    @Test
    public void testGetAllSortedByName() throws Exception {
        Long[] expected;
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testGetAllSortedByName";
        Long[] longArray = expected = new Long[]{311222268470898L, 132812888L, 27255442L, 11122233124L};
        int n = longArray.length;
        for (int i = 0; i < n; ++i) {
            long val = longArray[i];
            store.add("/testGetAllSortedByName", (Serializable)Long.valueOf(val), CreateMode.PERSISTENT_SEQUENTIAL);
        }
        List actual = store.getAllSortedByName();
        Assert.assertEquals((long)expected.length, (long)actual.size());
        for (int i = 0; i < expected.length; ++i) {
            Assert.assertEquals((Object)expected[i], (Object)((StateHandle)((Tuple2)actual.get((int)i)).f0).getState(null));
        }
    }

    @Test
    public void testRemove() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testRemove";
        Long state = 27255442L;
        store.add("/testRemove", (Serializable)state);
        store.remove("/testRemove");
        Assert.assertEquals((long)0L, (long)((List)ZooKeeper.getClient().getChildren().forPath("/")).size());
    }

    @Test
    public void testRemoveWithCallback() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testRemoveWithCallback";
        Long state = 27255442L;
        store.add("/testRemoveWithCallback", (Serializable)state);
        final CountDownLatch sync = new CountDownLatch(1);
        BackgroundCallback callback = (BackgroundCallback)Mockito.mock(BackgroundCallback.class);
        ((BackgroundCallback)Mockito.doAnswer((Answer)new Answer<Void>(){

            public Void answer(InvocationOnMock invocation) throws Throwable {
                sync.countDown();
                return null;
            }
        }).when((Object)callback)).processResult((CuratorFramework)Matchers.eq((Object)ZooKeeper.getClient()), (CuratorEvent)Matchers.any(CuratorEvent.class));
        store.remove("/testRemoveWithCallback", callback);
        Assert.assertEquals((long)0L, (long)((List)ZooKeeper.getClient().getChildren().forPath("/")).size());
        sync.await();
        ((BackgroundCallback)Mockito.verify((Object)callback, (VerificationMode)Mockito.times((int)1))).processResult((CuratorFramework)Matchers.eq((Object)ZooKeeper.getClient()), (CuratorEvent)Matchers.any(CuratorEvent.class));
    }

    @Test
    public void testRemoveAndDiscardState() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testDiscard";
        Long state = 27255442L;
        store.add("/testDiscard", (Serializable)state);
        store.removeAndDiscardState("/testDiscard");
        Assert.assertEquals((long)0L, (long)((List)ZooKeeper.getClient().getChildren().forPath("/")).size());
    }

    @Test
    public void testRemoveAndDiscardAllState() throws Exception {
        LongStateStorage stateHandleProvider = new LongStateStorage();
        ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore(ZooKeeper.getClient(), (StateStorageHelper)stateHandleProvider);
        String pathInZooKeeper = "/testDiscardAll";
        HashSet<Long> expected = new HashSet<Long>();
        expected.add(311222268470898L);
        expected.add(132812888L);
        expected.add(27255442L);
        expected.add(11122233124L);
        Iterator iterator = expected.iterator();
        while (iterator.hasNext()) {
            long val = (Long)iterator.next();
            store.add("/testDiscardAll", (Serializable)Long.valueOf(val), CreateMode.PERSISTENT_SEQUENTIAL);
        }
        store.removeAndDiscardAllState();
        Assert.assertEquals((long)0L, (long)((List)ZooKeeper.getClient().getChildren().forPath("/")).size());
    }

    private static class LongStateHandle
    implements StateHandle<Long> {
        private static final long serialVersionUID = -3555329254423838912L;
        private final Long state;
        private int numberOfDiscardCalls;

        public LongStateHandle(Long state) {
            this.state = state;
        }

        public Long getState(ClassLoader ignored) throws Exception {
            return this.state;
        }

        public void discardState() throws Exception {
            ++this.numberOfDiscardCalls;
        }

        public int getNumberOfDiscardCalls() {
            return this.numberOfDiscardCalls;
        }
    }

    private static class LongStateStorage
    implements StateStorageHelper<Long> {
        private final List<LongStateHandle> stateHandles = new ArrayList<LongStateHandle>();

        private LongStateStorage() {
        }

        public StateHandle<Long> store(Long state) throws Exception {
            LongStateHandle stateHandle = new LongStateHandle(state);
            this.stateHandles.add(stateHandle);
            return stateHandle;
        }

        List<LongStateHandle> getStateHandles() {
            return this.stateHandles;
        }
    }
}

