/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source;

import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobID;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

/**
 * Base class for MySQL source tests.
 *
 * <p>It starts one shared {@link MySqlContainer} for the whole test class, provides a per-test
 * Flink mini cluster with HA leadership control enabled, and offers assertion and failover
 * helpers to subclasses.
 */
public abstract class MySqlSourceTestBase
extends TestLogger {
    protected static final Logger LOG = LoggerFactory.getLogger(MySqlSourceTestBase.class);

    /** Number of slots offered by the single task manager of the mini cluster. */
    protected static final int DEFAULT_PARALLELISM = 4;

    /** MySQL 5.7 container shared by all tests of the class; started in {@link #startContainers()}. */
    protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);

    /**
     * Mini cluster with one task manager, {@link #DEFAULT_PARALLELISM} slots, dedicated RPC
     * services and HA leadership control (required by {@link #triggerJobManagerFailover}).
     */
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                            .withHaLeadershipControl()
                            .build());

    /** Starts the shared MySQL container and blocks until startup has completed. */
    @BeforeClass
    public static void startContainers() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
        LOG.info("Containers are started.");
    }

    /** Stops the shared MySQL container. */
    @AfterClass
    public static void stopContainers() {
        LOG.info("Stopping containers...");
        if (MYSQL_CONTAINER != null) {
            MYSQL_CONTAINER.stop();
        }
        LOG.info("Containers are stopped.");
    }

    /**
     * Creates a MySQL container for the given version using the configuration file
     * {@code docker/server-gtids/my.cnf}.
     *
     * @param version MySQL version of the container
     * @return the configured, not yet started container
     */
    protected static MySqlContainer createMySqlContainer(MySqlVersion version) {
        return createMySqlContainer(version, "docker/server-gtids/my.cnf");
    }

    /**
     * Creates a MySQL container for the given version and configuration override.
     *
     * <p>The container runs {@code docker/setup.sql} as setup SQL, uses the database
     * {@code flink-test} with the credentials {@code flinkuser}/{@code flinkpw}, and forwards
     * its output to {@link #LOG}.
     *
     * @param version MySQL version of the container
     * @param configPath path of the MySQL configuration file passed as configuration override
     * @return the configured, not yet started container
     */
    protected static MySqlContainer createMySqlContainer(MySqlVersion version, String configPath) {
        return (MySqlContainer)
                new MySqlContainer(version)
                        .withConfigurationOverride(configPath)
                        .withSetupSQL("docker/setup.sql")
                        .withDatabaseName("flink-test")
                        .withUsername("flinkuser")
                        .withPassword("flinkpw")
                        .withLogConsumer(new Slf4jLogConsumer(LOG));
    }

    /**
     * Asserts that both lists contain the same elements, ignoring their order.
     *
     * <p>Both lists are sorted into copies (the arguments are not modified) and then compared
     * with {@link #assertEqualsInOrder(List, List)}.
     *
     * @param expected expected elements, must not be {@code null}
     * @param actual actual elements, must not be {@code null}
     */
    public static void assertEqualsInAnyOrder(List<String> expected, List<String> actual) {
        Assert.assertTrue(expected != null && actual != null);
        assertEqualsInOrder(
                expected.stream().sorted().collect(Collectors.toList()),
                actual.stream().sorted().collect(Collectors.toList()));
    }

    /**
     * Asserts that both lists have the same size and the same elements in the same order.
     *
     * @param expected expected elements, must not be {@code null}
     * @param actual actual elements, must not be {@code null}
     */
    public static void assertEqualsInOrder(List<String> expected, List<String> actual) {
        Assert.assertTrue(expected != null && actual != null);
        Assert.assertEquals(expected.size(), actual.size());
        Assert.assertArrayEquals(expected.toArray(new String[0]), actual.toArray(new String[0]));
    }

    /**
     * Asserts that both maps have the same size, the same keys and equal values per key.
     *
     * @param expected expected entries, must not be {@code null}
     * @param actual actual entries, must not be {@code null}
     */
    public static void assertMapEquals(Map<String, ?> expected, Map<String, ?> actual) {
        Assert.assertTrue(expected != null && actual != null);
        Assert.assertEquals(expected.size(), actual.size());
        for (Map.Entry<String, ?> entry : expected.entrySet()) {
            String key = entry.getKey();
            // Without this check an expected null value would "match" a key that is
            // absent from the actual map, because get() returns null in both cases.
            Assert.assertTrue("Missing key: " + key, actual.containsKey(key));
            Assert.assertEquals(entry.getValue(), actual.get(key));
        }
    }

    /**
     * Triggers the requested kind of failover on the mini cluster.
     *
     * @param type kind of failover; {@link FailoverType#NONE} does nothing
     * @param jobId job whose job master leadership is revoked for {@link FailoverType#JM}
     * @param miniCluster cluster to act on
     * @param afterFailAction action run after the failure was injected and before recovery
     * @throws Exception if the failover operation fails
     */
    protected static void triggerFailover(FailoverType type, JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
        switch (type) {
            case TM:
                restartTaskManager(miniCluster, afterFailAction);
                break;
            case JM:
                triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
                break;
            case NONE:
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + type);
        }
    }

    /**
     * Revokes the job master leadership of the given job, runs {@code afterFailAction} and then
     * grants the leadership again.
     *
     * @param jobId job whose job master leadership is revoked and re-granted
     * @param miniCluster cluster that must have HA leadership control enabled
     * @param afterFailAction action run between revoking and granting leadership
     * @throws Exception if revoking or granting leadership fails
     */
    protected static void triggerJobManagerFailover(JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
        // get() throws NoSuchElementException if the cluster has no HA leadership control.
        HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
        afterFailAction.run();
        haLeadershipControl.grantJobMasterLeadership(jobId).get();
    }

    /**
     * Terminates the task manager with index 0, runs {@code afterFailAction} and then starts a
     * new task manager.
     *
     * @param miniCluster cluster to act on
     * @param afterFailAction action run between termination and restart
     * @throws Exception if terminating or starting the task manager fails
     */
    protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
        miniCluster.terminateTaskManager(0).get();
        afterFailAction.run();
        miniCluster.startTaskManager();
    }

    /**
     * Blocks until the named sink holds at least {@code expectedSize} results, polling every
     * 100 ms.
     *
     * <p>There is no timeout: the method only returns once the size is reached or the thread is
     * interrupted.
     *
     * @param sinkName name of the sink table to poll
     * @param expectedSize minimum number of results to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected static void waitForUpsertSinkSize(String sinkName, int expectedSize) throws InterruptedException {
        while (upsertSinkSize(sinkName) < expectedSize) {
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of results currently held by the named sink.
     *
     * @param sinkName name of the sink table to query
     * @return the result count, or 0 if the lookup throws {@link IllegalArgumentException}
     */
    protected static int upsertSinkSize(String sinkName) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getResults(sinkName).size();
            } catch (IllegalArgumentException e) {
                // Presumably raised while the sink has no results registered yet (to be
                // confirmed against TestValuesTableFactory); treated as an empty sink.
                return 0;
            }
        }
    }

    /** Phase of the source during which a test may inject a failover. */
    protected enum FailoverPhase {
        SNAPSHOT,
        BINLOG,
        NEVER
    }

    /** Kind of failover handled by {@link #triggerFailover}. */
    protected enum FailoverType {
        TM,
        JM,
        NONE
    }
}

