/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.polardbx;

import com.github.dockerjava.api.model.ExposedPort;
import com.github.dockerjava.api.model.PortBinding;
import com.github.dockerjava.api.model.Ports;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.AbstractTestBase;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

/**
 * Base class for PolarDB-X source integration tests.
 *
 * <p>Before the test class runs it probes {@code HOST_NAME:PORT} over JDBC and, only when no
 * valid connection can be obtained, starts a PolarDB-X Docker container whose port {@link #PORT}
 * is bound to the same port on the host. It also provides helpers for loading DDL scripts,
 * collecting rows, polling test sinks and comparing results.
 */
public abstract class PolardbxSourceTestBase extends AbstractTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(PolardbxSourceTestBase.class);

    /** Matches a line with a trailing {@code --} comment; group 1 is the text before it. */
    private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

    protected static final Integer PORT = 8527;
    protected static final String HOST_NAME = "127.0.0.1";
    protected static final String USER_NAME = "polardbx_root";
    protected static final String PASSWORD = "123456";
    private static final String IMAGE_VERSION = "2.1.0";
    private static final DockerImageName POLARDBX_IMAGE =
            DockerImageName.parse("polardbx/polardb-x:" + IMAGE_VERSION);

    /**
     * PolarDB-X container with {@link #PORT} published on the identical host port, so that
     * {@link #getJdbcUrl()} works whether the server runs in this container or was already
     * listening on the host.
     */
    // The declared type stays raw on purpose so existing subclasses keep compiling unchanged.
    @SuppressWarnings("rawtypes")
    protected static final GenericContainer POLARDBX_CONTAINER =
            new GenericContainer<>(POLARDBX_IMAGE)
                    .withExposedPorts(PORT)
                    .withLogConsumer(new Slf4jLogConsumer(LOG))
                    .withStartupTimeout(Duration.ofMinutes(3))
                    .withCreateContainerCmdModifier(
                            c ->
                                    c.withPortBindings(
                                            new PortBinding(
                                                    Ports.Binding.bindPort(PORT),
                                                    new ExposedPort(PORT))));

    /**
     * Starts the PolarDB-X container unless a valid connection can already be established.
     *
     * @throws InterruptedException if interrupted during the post-start wait
     */
    @BeforeClass
    public static void startContainers() throws InterruptedException {
        // Nothing to start when something already answers on HOST_NAME:PORT.
        if (!checkConnection()) {
            LOG.info("Polardbx connection is not valid, so try to start containers...");
            Startables.deepStart(Stream.of(POLARDBX_CONTAINER)).join();
            LOG.info("Containers are started.");
            // Fixed 10s grace period after start-up; presumably the server is not ready to
            // serve queries as soon as the container reports started.
            Thread.sleep(10_000L);
        }
    }

    /** Stops the PolarDB-X container after all tests of the class have run. */
    @AfterClass
    public static void stopContainers() {
        LOG.info("Stopping Polardbx containers...");
        POLARDBX_CONTAINER.stop();
        LOG.info("Polardbx containers are stopped.");
    }

    /** Returns the JDBC URL built from {@link #HOST_NAME} and {@link #PORT}. */
    protected static String getJdbcUrl() {
        return String.format("jdbc:mysql://%s:%s", HOST_NAME, PORT);
    }

    /**
     * Opens a new JDBC connection using {@link #USER_NAME} and {@link #PASSWORD}. The caller is
     * responsible for closing it.
     *
     * @throws SQLException if the connection cannot be established
     */
    protected static Connection getJdbcConnection() throws SQLException {
        String jdbcUrl = getJdbcUrl();
        LOG.info("jdbcUrl is :{}", jdbcUrl);
        return DriverManager.getConnection(jdbcUrl, USER_NAME, PASSWORD);
    }

    /**
     * Probes whether a valid JDBC connection can be obtained.
     *
     * @return {@code true} if a connection was opened and reported valid within 3 seconds,
     *     {@code false} if it was reported invalid or any {@link SQLException} occurred
     */
    protected static Boolean checkConnection() {
        LOG.info("check polardbx connection validation...");
        // try-with-resources: the probe connection must not be leaked.
        try (Connection connection = getJdbcConnection()) {
            return connection.isValid(3);
        } catch (SQLException e) {
            LOG.warn("polardbx connection is not valid... caused by:{}", e.getMessage());
            return false;
        }
    }

    /**
     * Recreates {@code databaseName} and executes the statements of the classpath resource
     * {@code ddl/<databaseName>.sql} against it.
     *
     * @param databaseName database to drop, create and populate; also the DDL file base name
     * @param filter optional predicate selecting which statements to run; {@code null} runs all
     * @throws InterruptedException if interrupted during the 1s pause before connecting
     * @throws RuntimeException wrapping any failure while reading the file or executing SQL
     */
    protected static void initializePolardbxTables(
            String databaseName, Function<String, Boolean> filter) throws InterruptedException {
        final String ddlFile = String.format("ddl/%s.sql", databaseName);
        final URL ddlTestFile = PolardbxSourceTestBase.class.getClassLoader().getResource(ddlFile);
        Assert.assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
        Thread.sleep(1000L);
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            // Identifiers cannot be bound as statement parameters; databaseName is supplied
            // by the tests themselves.
            statement.execute("drop database if exists " + databaseName);
            statement.execute("create database if not exists " + databaseName);
            statement.execute("use " + databaseName + ";");
            List<String> statements =
                    splitStatements(Files.readAllLines(Paths.get(ddlTestFile.toURI())), filter);
            for (String stmt : statements) {
                statement.execute(stmt);
            }
        } catch (Exception e) {
            throw new RuntimeException("Failed to initialize tables from " + ddlFile, e);
        }
    }

    /**
     * Turns the raw lines of a DDL script into individual statements: trims each line, drops
     * blank lines and lines starting with {@code --}, strips trailing {@code --} comments, joins
     * the rest with newlines, splits on {@code ;} and applies the optional filter.
     */
    private static List<String> splitStatements(
            List<String> lines, Function<String, Boolean> filter) {
        String script =
                lines.stream()
                        .map(String::trim)
                        .filter(line -> !line.startsWith("--") && !line.isEmpty())
                        .map(
                                line -> {
                                    Matcher m = COMMENT_PATTERN.matcher(line);
                                    return m.matches() ? m.group(1) : line;
                                })
                        .collect(Collectors.joining("\n"));
        return Arrays.stream(script.split(";"))
                .filter(sql -> filter == null || filter.apply(sql))
                .collect(Collectors.toList());
    }

    /**
     * Takes up to {@code size} rows from the iterator and returns their string forms.
     *
     * @param iter source of rows
     * @param size maximum number of rows to consume
     * @return the {@code toString()} of each consumed row, in iteration order
     */
    protected static List<String> fetchRows(Iterator<Row> iter, int size) {
        List<String> rows = new ArrayList<>(size);
        while (size > 0 && iter.hasNext()) {
            Row row = iter.next();
            rows.add(row.toString());
            size--;
        }
        return rows;
    }

    /**
     * Builds a table-name expression from the given names: the single name itself, or
     * {@code (a|b|...)} for several.
     *
     * @param captureCustomerTables table names; must contain at least one element
     * @throws IllegalStateException if the array is empty
     */
    protected String getTableNameRegex(String[] captureCustomerTables) {
        Preconditions.checkState(captureCustomerTables.length > 0);
        if (captureCustomerTables.length == 1) {
            return captureCustomerTables[0];
        }
        return String.format("(%s)", StringUtils.join(captureCustomerTables, "|"));
    }

    /**
     * Returns a random server-id range of the form {@code "n-(n+4)"} with {@code n} in
     * {@code [5400, 5499]}.
     */
    protected String getServerId() {
        final Random random = new Random();
        int serverId = random.nextInt(100) + 5400;
        return serverId + "-" + (serverId + 4);
    }

    /**
     * Blocks until the named sink holds at least {@code expectedSize} results, polling every
     * 100 ms. There is no timeout; callers rely on the surrounding test timeout.
     *
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    protected static void waitForSinkSize(String sinkName, int expectedSize)
            throws InterruptedException {
        while (sinkSize(sinkName) < expectedSize) {
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of raw results currently recorded for the named sink, or {@code 0}
     * when the lookup throws {@link IllegalArgumentException}.
     */
    protected static int sinkSize(String sinkName) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getRawResults(sinkName).size();
            } catch (IllegalArgumentException ignored) {
                // Treated as "no results yet" - presumably the sink is not registered until
                // the job has started.
                return 0;
            }
        }
    }

    /**
     * Asserts that both lists contain the same elements, ignoring order (both are sorted before
     * comparison).
     */
    protected static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
        Assert.assertTrue(expected != null && actual != null);
        assertEqualsInOrder(
                expected.stream().sorted().collect(Collectors.toList()),
                actual.stream().sorted().collect(Collectors.toList()));
    }

    /** Asserts that both lists are non-null, equally sized and element-wise equal in order. */
    protected static void assertEqualsInOrder(List<String> expected, List<String> actual) {
        Assert.assertTrue(expected != null && actual != null);
        Assert.assertEquals(expected.size(), actual.size());
        Assert.assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
    }
}

