/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hudi.client.transaction;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.apache.hudi.client.transaction.lock.BaseZookeeperBasedLockProvider;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedImplicitBasePathLockProvider;
import org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.config.LockConfiguration;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.exception.HoodieLockException;
import org.apache.hudi.storage.StorageConfiguration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestZookeeperBasedLockProvider {
    private static final Logger LOG = LoggerFactory.getLogger(TestZookeeperBasedLockProvider.class);
    private static TestingServer server;
    private static CuratorFramework client;
    private static String basePath;
    private static String key;
    private static LockConfiguration zkConfWithZkBasePathAndLockKeyLock;
    private static LockConfiguration zkConfNoTableBasePathTableName;
    private static LockConfiguration zkConfWithTableBasePathTableName;
    private static LockConfiguration zkConfWithZkBasePathLockKeyTableInfo;

    @BeforeAll
    public static void setup() {
        while (server == null) {
            try {
                server = new TestingServer();
                CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
                client = builder.connectString(server.getConnectString()).retryPolicy((RetryPolicy)new RetryOneTime(1000)).build();
            }
            catch (Exception e) {
                LOG.error("Getting bind exception - retrying to allocate server");
                server = null;
            }
        }
        Properties properties = new Properties();
        properties.setProperty("hoodie.write.lock.zookeeper.url", server.getConnectString());
        properties.setProperty("hoodie.write.lock.wait_time_ms_between_retry", "1000");
        properties.setProperty("hoodie.write.lock.max_wait_time_ms_between_retry", "3000");
        properties.setProperty("hoodie.write.lock.client.num_retries", "3");
        properties.setProperty("hoodie.write.lock.num_retries", "3");
        properties.setProperty("hoodie.write.lock.zookeeper.session_timeout_ms", "10000");
        properties.setProperty("hoodie.write.lock.zookeeper.connection_timeout_ms", "10000");
        properties.setProperty("hoodie.write.lock.wait_time_ms", "1000");
        zkConfNoTableBasePathTableName = new LockConfiguration(properties);
        Properties propsWithTableInfo = (Properties)properties.clone();
        propsWithTableInfo.setProperty(HoodieCommonConfig.BASE_PATH.key(), "s3://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
        propsWithTableInfo.setProperty("hoodie.table.name", "ma_po_tofu_is_awesome");
        zkConfWithTableBasePathTableName = new LockConfiguration(propsWithTableInfo);
        properties.setProperty("hoodie.write.lock.zookeeper.base_path", basePath);
        properties.setProperty("hoodie.write.lock.zookeeper.lock_key", key);
        zkConfWithZkBasePathAndLockKeyLock = new LockConfiguration(properties);
        properties.setProperty(HoodieCommonConfig.BASE_PATH.key(), "s3://my-bucket-8b2a4b30/1718662238400/be715573/my_lake/my_table");
        properties.setProperty("hoodie.table.name", "ma_po_tofu_is_awesome");
        zkConfWithZkBasePathLockKeyTableInfo = new LockConfiguration(properties);
    }

    @AfterAll
    public static void tearDown() throws IOException {
        if (server != null) {
            server.close();
        }
        if (client != null) {
            client.close();
        }
    }

    public static Stream<Object> testDimensions() {
        return Stream.of(Arguments.of((Object[])new Object[]{zkConfWithTableBasePathTableName, ZookeeperBasedImplicitBasePathLockProvider.class}), Arguments.of((Object[])new Object[]{zkConfWithZkBasePathAndLockKeyLock, ZookeeperBasedLockProvider.class}), Arguments.of((Object[])new Object[]{zkConfWithZkBasePathLockKeyTableInfo, ZookeeperBasedImplicitBasePathLockProvider.class}));
    }

    @ParameterizedTest
    @MethodSource(value={"testDimensions"})
    void testAcquireLock(LockConfiguration lockConfig, Class<?> lockProviderClass) {
        BaseZookeeperBasedLockProvider zookeeperLP = (BaseZookeeperBasedLockProvider)ReflectionUtils.loadClass((String)lockProviderClass.getName(), (Class[])new Class[]{LockConfiguration.class, StorageConfiguration.class}, (Object[])new Object[]{lockConfig, null});
        Assertions.assertTrue((boolean)zookeeperLP.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig().getLong("hoodie.write.lock.wait_time_ms"), TimeUnit.MILLISECONDS));
        zookeeperLP.unlock();
    }

    public static Stream<Object> testBadDimensions() {
        return Stream.of(Arguments.of((Object[])new Object[]{zkConfNoTableBasePathTableName, ZookeeperBasedImplicitBasePathLockProvider.class}), Arguments.of((Object[])new Object[]{zkConfWithTableBasePathTableName, ZookeeperBasedLockProvider.class}));
    }

    @ParameterizedTest
    @MethodSource(value={"testBadDimensions"})
    void testBadLockConfig(LockConfiguration lockConfig, Class<?> lockProviderClass) {
        Exception ex = null;
        try {
            ReflectionUtils.loadClass((String)lockProviderClass.getName(), (Class[])new Class[]{LockConfiguration.class, StorageConfiguration.class}, (Object[])new Object[]{lockConfig, null});
        }
        catch (Exception e) {
            ex = e;
        }
        Assertions.assertEquals(IllegalArgumentException.class, ex.getCause().getCause().getClass());
    }

    @Test
    public void testUnLock() {
        ZookeeperBasedLockProvider zookeeperBasedLockProvider = new ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
        Assertions.assertTrue((boolean)zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig().getLong("hoodie.write.lock.wait_time_ms"), TimeUnit.MILLISECONDS));
        zookeeperBasedLockProvider.unlock();
        zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig().getLong("hoodie.write.lock.wait_time_ms"), TimeUnit.MILLISECONDS);
    }

    @Test
    public void testReentrantLock() {
        ZookeeperBasedLockProvider zookeeperBasedLockProvider = new ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
        Assertions.assertTrue((boolean)zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig().getLong("hoodie.write.lock.wait_time_ms"), TimeUnit.MILLISECONDS));
        try {
            zookeeperBasedLockProvider.tryLock(zkConfWithZkBasePathAndLockKeyLock.getConfig().getLong("hoodie.write.lock.wait_time_ms"), TimeUnit.MILLISECONDS);
            Assertions.fail();
        }
        catch (HoodieLockException hoodieLockException) {
            // empty catch block
        }
        zookeeperBasedLockProvider.unlock();
    }

    @Test
    public void testUnlockWithoutLock() {
        ZookeeperBasedLockProvider zookeeperBasedLockProvider = new ZookeeperBasedLockProvider(zkConfWithZkBasePathAndLockKeyLock, null);
        zookeeperBasedLockProvider.unlock();
    }

    static {
        basePath = "/hudi/test/lock";
        key = "table1";
    }
}

