/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.postgres;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.function.Consumer;
import java.util.stream.Stream;
import org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions;
import org.apache.paimon.flink.action.cdc.CdcActionITCaseBase;
import org.apache.paimon.flink.action.cdc.postgres.PostgresContainer;
import org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableAction;
import org.junit.jupiter.api.AfterAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;

public class PostgresActionITCaseBase
extends CdcActionITCaseBase {
    private static final Logger LOG = LoggerFactory.getLogger(PostgresActionITCaseBase.class);
    public static final String DEFAULT_DB = "postgres";
    private static final String USER = "paimonuser";
    private static final String PASSWORD = "paimonpw";
    protected static final DockerImageName PG_IMAGE = DockerImageName.parse((String)"postgres:13").asCompatibleSubstituteFor("postgres");
    protected static final PostgresContainer POSTGRES_CONTAINER = (PostgresContainer)((PostgresContainer)((PostgresContainer)((PostgresContainer)((PostgresContainer)((PostgresContainer)new PostgresContainer(PG_IMAGE).withDatabaseName("postgres")).withUsername("paimonuser")).withPassword("paimonpw")).withEnv("TZ", "America/Los_Angeles")).withLogConsumer((Consumer)new Slf4jLogConsumer(LOG))).withCommand(new String[]{"postgres", "-c", "fsync=off", "-c", "wal_level=logical", "-c", "max_replication_slots=20", "-c", "max_wal_senders=20"});

    protected static void start() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(POSTGRES_CONTAINER)).join();
        LOG.info("Containers are started.");
    }

    @AfterAll
    public static void stopContainers() {
        LOG.info("Stopping containers...");
        POSTGRES_CONTAINER.stop();
        LOG.info("Containers are stopped.");
    }

    protected Statement getStatement(String databaseName) throws SQLException {
        String jdbcUrl = String.format("jdbc:postgresql://%s:%s/%s", POSTGRES_CONTAINER.getHost(), POSTGRES_CONTAINER.getDatabasePort(), databaseName);
        Connection conn = DriverManager.getConnection(jdbcUrl, POSTGRES_CONTAINER.getUsername(), POSTGRES_CONTAINER.getPassword());
        return conn.createStatement();
    }

    protected Map<String, String> getBasicPostgresConfig() {
        HashMap<String, String> config = new HashMap<String, String>();
        config.put(PostgresSourceOptions.HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
        config.put(PostgresSourceOptions.PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getDatabasePort()));
        config.put(PostgresSourceOptions.USERNAME.key(), USER);
        config.put(PostgresSourceOptions.PASSWORD.key(), PASSWORD);
        config.put(PostgresSourceOptions.SLOT_NAME.key(), this.getSlotName());
        config.put(PostgresSourceOptions.DECODING_PLUGIN_NAME.key(), "pgoutput");
        return config;
    }

    protected String getSlotName() {
        Random random = new Random();
        int id = random.nextInt(10000);
        return "paimon_" + id;
    }

    protected PostgresSyncTableActionBuilder syncTableActionBuilder(Map<String, String> postgresConfig) {
        return new PostgresSyncTableActionBuilder(postgresConfig);
    }

    protected class PostgresSyncTableActionBuilder
    extends CdcActionITCaseBase.SyncTableActionBuilder<PostgresSyncTableAction> {
        public PostgresSyncTableActionBuilder(Map<String, String> postgresConfig) {
            super(PostgresSyncTableAction.class, postgresConfig);
        }
    }
}

