/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.db2.table;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.connectors.db2.Db2TestBase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class Db2ConnectorITCase
extends Db2TestBase {
    private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectorITCase.class);
    protected static final int DEFAULT_PARALLELISM = 2;
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setNumberSlotsPerTaskManager(2).setRpcServiceSharing(RpcServiceSharing.DEDICATED).withHaLeadershipControl().build());
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env, (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().build());
    @ClassRule
    public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
    private final boolean incrementalSnapshot;

    public Db2ConnectorITCase(boolean incrementalSnapshot) {
        this.incrementalSnapshot = incrementalSnapshot;
    }

    @Parameterized.Parameters(name="incrementalSnapshot: {0}")
    public static Object[] parameters() {
        return new Object[][]{{true}, {false}};
    }

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.env.setRestartStrategy(RestartStrategies.noRestart());
        if (this.incrementalSnapshot) {
            this.env.setParallelism(2);
            this.env.enableCheckpointing(1000L);
        } else {
            this.env.setParallelism(1);
        }
    }

    private void cancelJobIfRunning(TableResult result) throws InterruptedException, ExecutionException {
        try {
            ((JobClient)result.getJobClient().get()).cancel().get();
        }
        catch (IllegalStateException illegalStateException) {
            // empty catch block
        }
    }

    @Test
    public void testConsumingAllEvents() throws SQLException, InterruptedException, ExecutionException {
        this.initializeDb2Table("inventory", "PRODUCTS");
        String sourceDDL = String.format("CREATE TABLE debezium_source ( ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10,3)) WITH ( 'connector' = 'db2-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s')", DB2_CONTAINER.getHost(), DB2_CONTAINER.getMappedPort(50000), DB2_CONTAINER.getUsername(), DB2_CONTAINER.getPassword(), DB2_CONTAINER.getDatabaseName(), "DB2INST1.PRODUCTS", this.incrementalSnapshot);
        String sinkDDL = "CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT NAME, SUM(WEIGHT) FROM debezium_source GROUP BY NAME");
        Db2ConnectorITCase.waitForSnapshotStarted("sink");
        try (Connection connection = this.getJdbcConnection();
             Statement statement = connection.createStatement();){
            statement.execute("UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.1' WHERE ID=107;");
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (110,'jacket','water resistent white wind breaker',0.2);");
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (111,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111;");
            statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=111;");
        }
        Db2ConnectorITCase.waitForSinkSize("sink", 20);
        Object[] expected = new String[]{"scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800", "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"};
        List actual = TestValuesTableFactory.getResults((String)"sink");
        Assert.assertThat((Object)actual, (Matcher)Matchers.containsInAnyOrder((Object[])expected));
        this.cancelJobIfRunning(result);
    }

    @Test
    public void testAllTypes() throws Exception {
        this.initializeDb2Table("column_type_test", "FULL_TYPES");
        String sourceDDL = String.format("CREATE TABLE full_types (\n    ID INTEGER NOT NULL,\n    SMALL_C SMALLINT,\n    INT_C INTEGER,\n    BIG_C BIGINT,\n    REAL_C FLOAT,\n    DOUBLE_C DOUBLE,\n    NUMERIC_C DECIMAL(10, 5),\n    DECIMAL_C DECIMAL(10, 1),\n    VARCHAR_C STRING,\n    CHAR_C STRING,\n    CHARACTER_C STRING,\n    TIMESTAMP_C TIMESTAMP(3),\n    DATE_C DATE,\n    TIME_C TIME(0),\n    DEFAULT_NUMERIC_C DECIMAL,\n    TIMESTAMP_PRECISION_C TIMESTAMP(9),\n    PRIMARY KEY (ID) NOT ENFORCED) WITH ( 'connector' = 'db2-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s')", DB2_CONTAINER.getHost(), DB2_CONTAINER.getMappedPort(50000), DB2_CONTAINER.getUsername(), DB2_CONTAINER.getPassword(), DB2_CONTAINER.getDatabaseName(), "DB2INST1.FULL_TYPES", this.incrementalSnapshot);
        String sinkDDL = "CREATE TABLE sink (\n    id INTEGER NOT NULL,\n    small_c SMALLINT,\n    int_c INTEGER,\n    big_c BIGINT,\n    real_c FLOAT,\n    double_c DOUBLE,\n    numeric_c DECIMAL(10, 5),\n    decimal_c DECIMAL(10, 1),\n    varchar_c STRING,\n    char_c STRING,\n    character_c STRING,\n    timestamp_c TIMESTAMP(3),\n    date_c DATE,\n    time_c TIME(0),\n    default_numeric_c DECIMAL,\n    timestamp_precision_c TIMESTAMP(9),\n    PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM full_types");
        Db2ConnectorITCase.waitForSnapshotStarted("sink");
        try (Connection connection = this.getJdbcConnection();
             Statement statement = connection.createStatement();){
            statement.execute("UPDATE DB2INST1.FULL_TYPES SET SMALL_C=0 WHERE ID=1;");
        }
        Db2ConnectorITCase.waitForSinkSize("sink", 2);
        List<String> expected = Arrays.asList("+I(1,32767,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)", "+U(1,0,65535,2147483647,5.5,6.6,123.12345,404.4,Hello World,a,abc,2020-07-17T18:00:22.123,2020-07-17,18:00:22,500,2020-07-17T18:00:22.123456789)");
        List actual = TestValuesTableFactory.getRawResults((String)"sink");
        Collections.sort(expected);
        Collections.sort(actual);
        Assert.assertEquals(expected, (Object)actual);
        this.cancelJobIfRunning(result);
    }

    @Test
    public void testStartupFromLatestOffset() throws Exception {
        this.initializeDb2Table("inventory", "PRODUCTS");
        String sourceDDL = String.format("CREATE TABLE debezium_source ( ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10,3), PRIMARY KEY (ID) NOT ENFORCED) WITH ( 'connector' = 'db2-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s' , 'scan.startup.mode' = 'latest-offset', 'scan.incremental.snapshot.enabled' = '%s')", DB2_CONTAINER.getHost(), DB2_CONTAINER.getMappedPort(50000), DB2_CONTAINER.getUsername(), DB2_CONTAINER.getPassword(), DB2_CONTAINER.getDatabaseName(), "DB2INST1.PRODUCTS", this.incrementalSnapshot);
        String sinkDDL = "CREATE TABLE sink  WITH ( 'connector' = 'values', 'sink-insert-only' = 'false') LIKE debezium_source (EXCLUDING OPTIONS)";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        do {
            Thread.sleep(5000L);
        } while (((JobClient)result.getJobClient().get()).getJobStatus().get() != JobStatus.RUNNING);
        Thread.sleep(30000L);
        LOG.info("Snapshot should end and start to read binlog.");
        try (Connection connection = this.getJdbcConnection();
             Statement statement = connection.createStatement();){
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (default,'jacket','water resistent white wind breaker',0.2)");
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (default,'scooter','Big 2-wheel scooter ',5.18)");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111");
            statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=111");
        }
        Db2ConnectorITCase.waitForSinkSize("sink", 5);
        Object[] expected = new String[]{"110,jacket,new water resistent white wind breaker,0.500"};
        List actual = TestValuesTableFactory.getResults((String)"sink");
        Assert.assertThat((Object)actual, (Matcher)Matchers.containsInAnyOrder((Object[])expected));
        this.cancelJobIfRunning(result);
    }

    @Test
    public void testMetadataColumns() throws Throwable {
        this.initializeDb2Table("inventory", "PRODUCTS");
        String sourceDDL = String.format("CREATE TABLE debezium_source ( DB_NAME STRING METADATA FROM 'database_name' VIRTUAL, SCHEMA_NAME STRING METADATA FROM 'schema_name' VIRTUAL, TABLE_NAME STRING METADATA  FROM 'table_name' VIRTUAL, ID INT NOT NULL, NAME STRING, DESCRIPTION STRING, WEIGHT DECIMAL(10,3), PRIMARY KEY (ID) NOT ENFORCED) WITH ( 'connector' = 'db2-cdc', 'hostname' = '%s', 'port' = '%s', 'username' = '%s', 'password' = '%s', 'database-name' = '%s', 'table-name' = '%s', 'scan.incremental.snapshot.enabled' = '%s')", DB2_CONTAINER.getHost(), DB2_CONTAINER.getMappedPort(50000), DB2_CONTAINER.getUsername(), DB2_CONTAINER.getPassword(), DB2_CONTAINER.getDatabaseName(), "DB2INST1.PRODUCTS", this.incrementalSnapshot);
        String sinkDDL = "CREATE TABLE sink ( database_name STRING, schema_name STRING, table_name STRING, id int, name STRING, description STRING, weight DECIMAL(10,3), PRIMARY KEY (id) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false', 'sink-expected-messages-num' = '20')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT * FROM debezium_source");
        Db2ConnectorITCase.waitForSnapshotStarted("sink");
        try (Connection connection = this.getJdbcConnection();
             Statement statement = connection.createStatement();){
            statement.execute("UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='18oz carpenter hammer' WHERE ID=106;");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.1' WHERE ID=107;");
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (110,'jacket','water resistent white wind breaker',0.2);");
            statement.execute("INSERT INTO DB2INST1.PRODUCTS VALUES (111,'scooter','Big 2-wheel scooter ',5.18);");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET DESCRIPTION='new water resistent white wind breaker', WEIGHT='0.5' WHERE ID=110;");
            statement.execute("UPDATE DB2INST1.PRODUCTS SET WEIGHT='5.17' WHERE ID=111;");
            statement.execute("DELETE FROM DB2INST1.PRODUCTS WHERE ID=111;");
        }
        Db2ConnectorITCase.waitForSinkSize("sink", 16);
        List<String> expected = Arrays.asList("+I(testdb,DB2INST1,PRODUCTS,101,scooter,Small 2-wheel scooter,3.140)", "+I(testdb,DB2INST1,PRODUCTS,102,car battery,12V car battery,8.100)", "+I(testdb,DB2INST1,PRODUCTS,103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800)", "+I(testdb,DB2INST1,PRODUCTS,104,hammer,12oz carpenter's hammer,0.750)", "+I(testdb,DB2INST1,PRODUCTS,105,hammer,14oz carpenter's hammer,0.875)", "+I(testdb,DB2INST1,PRODUCTS,106,hammer,16oz carpenter's hammer,1.000)", "+I(testdb,DB2INST1,PRODUCTS,107,rocks,box of assorted rocks,5.300)", "+I(testdb,DB2INST1,PRODUCTS,108,jacket,water resistent black wind breaker,0.100)", "+I(testdb,DB2INST1,PRODUCTS,109,spare tire,24 inch spare tire,22.200)", "+U(testdb,DB2INST1,PRODUCTS,106,hammer,18oz carpenter hammer,1.000)", "+U(testdb,DB2INST1,PRODUCTS,107,rocks,box of assorted rocks,5.100)", "+I(testdb,DB2INST1,PRODUCTS,110,jacket,water resistent white wind breaker,0.200)", "+I(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.180)", "+U(testdb,DB2INST1,PRODUCTS,110,jacket,new water resistent white wind breaker,0.500)", "+U(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.170)", "-D(testdb,DB2INST1,PRODUCTS,111,scooter,Big 2-wheel scooter ,5.170)");
        List actual = TestValuesTableFactory.getRawResults((String)"sink");
        Collections.sort(expected);
        Collections.sort(actual);
        Assert.assertEquals(expected, (Object)actual);
        this.cancelJobIfRunning(result);
    }

    private static void waitForSnapshotStarted(String sinkName) throws InterruptedException {
        while (Db2ConnectorITCase.sinkSize(sinkName) == 0) {
            Thread.sleep(1000L);
        }
    }

    private static void waitForSinkSize(String sinkName, int expectedSize) throws InterruptedException {
        while (Db2ConnectorITCase.sinkSize(sinkName) < expectedSize) {
            Thread.sleep(1000L);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static int sinkSize(String sinkName) {
        Class<TestValuesTableFactory> clazz = TestValuesTableFactory.class;
        synchronized (TestValuesTableFactory.class) {
            try {
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return TestValuesTableFactory.getRawResults((String)sinkName).size();
            }
            catch (IllegalArgumentException e) {
                // ** MonitorExit[var1_1] (shouldn't be in output)
                return 0;
            }
        }
    }
}

