/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.table;

import com.mongodb.client.MongoDatabase;
import java.util.List;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.bson.Document;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for the {@code database} / {@code collection} regular-expression options of
 * the {@code mongodb-cdc} table source.
 *
 * <p>Every test creates one or more databases through
 * {@code CONTAINER.executeCommandFileInSeparateDatabase(...)} (presumably seeded from the named
 * fixture script -- confirm against the test base), starts an
 * {@code INSERT INTO mongodb_sink SELECT ... FROM mongodb_source} job, and compares the rows
 * collected by the {@code values} sink with the expected change log.
 *
 * <p>The suite runs twice: with {@code scan.incremental.snapshot.enabled} set to {@code false}
 * (parallelism 1) and to {@code true} (parallelism 4, checkpointing every 200 ms).
 */
@RunWith(Parameterized.class)
public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {

    /** Name of the sink table that collects the rows emitted by the job under test. */
    private static final String SINK_TABLE = "mongodb_sink";

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    env, EnvironmentSettings.newInstance().inStreamingMode().build());

    /** Value written to the {@code scan.incremental.snapshot.enabled} source option. */
    private final boolean parallelismSnapshot;

    public MongoDBRegexFilterITCase(boolean parallelismSnapshot) {
        this.parallelismSnapshot = parallelismSnapshot;
    }

    /** Supplies the two values of {@code parallelismSnapshot} the suite is run with. */
    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
    public static Object[] parameters() {
        return new Object[][] {new Object[] {false}, new Object[] {true}};
    }

    /** Clears the shared sink data and configures parallelism/checkpointing for the run. */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        if (parallelismSnapshot) {
            env.setParallelism(4);
            env.enableCheckpointing(200L);
        } else {
            env.setParallelism(1);
        }
    }

    /** A fully-qualified collection regex spanning two databases captures coll_a* of both. */
    @Test
    public void testMatchMultipleDatabasesAndCollections() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String collectionRegex = String.format("^(%s|%s)\\.coll_a\\d?$", db0, db1);

        TableResult result = submitTestCase(null, collectionRegex);
        try {
            // Snapshot phase: 2 databases x 2 matching collections.
            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 4);

            insertRecordsInDatabase(db0);
            insertRecordsInDatabase(db1);

            // Streaming phase: only the coll_a* inserts are expected to arrive.
            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 8);

            assertSinkContainsInAnyOrder(
                    String.format("+I[%s, coll_a1, A101]", db0),
                    String.format("+I[%s, coll_a2, A201]", db0),
                    String.format("+I[%s, coll_a1, A101]", db1),
                    String.format("+I[%s, coll_a2, A201]", db1),
                    String.format("+I[%s, coll_a1, A102]", db0),
                    String.format("+I[%s, coll_a2, A202]", db0),
                    String.format("+I[%s, coll_a1, A102]", db1),
                    String.format("+I[%s, coll_a2, A202]", db1));
        } finally {
            cancelJob(result);
        }
    }

    /** A database regex alone captures every collection of the matching databases only. */
    @Test
    public void testMatchMultipleDatabases() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        // db2 is deliberately left out of the regex; its changes must not reach the sink.
        String db2 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String databaseRegex = String.format("%s|%s", db0, db1);

        TableResult result = submitTestCase(databaseRegex, null);
        try {
            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 8);

            insertRecordsInDatabase(db0);
            insertRecordsInDatabase(db1);
            insertRecordsInDatabase(db2);

            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 16);

            assertSinkContainsInAnyOrder(
                    String.format("+I[%s, coll_a1, A101]", db0),
                    String.format("+I[%s, coll_a2, A201]", db0),
                    String.format("+I[%s, coll_b1, B101]", db0),
                    String.format("+I[%s, coll_b2, B201]", db0),
                    String.format("+I[%s, coll_a1, A101]", db1),
                    String.format("+I[%s, coll_a2, A201]", db1),
                    String.format("+I[%s, coll_b1, B101]", db1),
                    String.format("+I[%s, coll_b2, B201]", db1),
                    String.format("+I[%s, coll_a1, A102]", db0),
                    String.format("+I[%s, coll_a2, A202]", db0),
                    String.format("+I[%s, coll_b1, B102]", db0),
                    String.format("+I[%s, coll_b2, B202]", db0),
                    String.format("+I[%s, coll_a1, A102]", db1),
                    String.format("+I[%s, coll_a2, A202]", db1),
                    String.format("+I[%s, coll_b1, B102]", db1),
                    String.format("+I[%s, coll_b2, B202]", db1));
        } finally {
            cancelJob(result);
        }
    }

    /** A fully-qualified collection regex pinned to one database ignores the other database. */
    @Test
    public void testMatchSingleQualifiedCollectionPattern() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String collectionRegex = String.format("^%s\\.coll_b\\d?$", db0);

        TableResult result = submitTestCase(null, collectionRegex);
        try {
            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 2);

            insertRecordsInDatabase(db0);
            insertRecordsInDatabase(db1);

            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 4);

            assertSinkContainsInAnyOrder(
                    String.format("+I[%s, coll_b1, B101]", db0),
                    String.format("+I[%s, coll_b2, B201]", db0),
                    String.format("+I[%s, coll_b1, B102]", db0),
                    String.format("+I[%s, coll_b2, B202]", db0));
        } finally {
            cancelJob(result);
        }
    }

    /** An explicit database combined with a collection regex captures coll_b* of that database. */
    @Test
    public void testMatchSingleDatabaseWithCollectionPattern() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String collectionRegex = ".*coll_b\\d?";

        TableResult result = submitTestCase(db0, collectionRegex);
        try {
            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 2);

            insertRecordsInDatabase(db0);
            insertRecordsInDatabase(db1);

            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 4);

            assertSinkContainsInAnyOrder(
                    String.format("+I[%s, coll_b1, B101]", db0),
                    String.format("+I[%s, coll_b2, B201]", db0),
                    String.format("+I[%s, coll_b1, B102]", db0),
                    String.format("+I[%s, coll_b2, B202]", db0));
        } finally {
            cancelJob(result);
        }
    }

    /** Database and collection names that contain a dash are matched. */
    @Test
    public void testMatchDatabaseAndCollectionContainsDash() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns-regex");

        TableResult result = submitTestCase(db0, "coll-a1");
        try {
            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 1);

            assertSinkContainsInAnyOrder(String.format("+I[%s, coll-a1, A101]", db0));
        } finally {
            cancelJob(result);
        }
    }

    /** A collection whose name itself contains a dot can be selected by escaping the dots. */
    @Test
    public void testMatchCollectionWithDots() throws Exception {
        String db = CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted");

        TableResult result = submitTestCase(db, db + "[.]coll[.]name");
        try {
            MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 3);

            assertSinkContainsInAnyOrder(
                    String.format("+I[%s, coll.name, A101]", db),
                    String.format("+I[%s, coll.name, A102]", db),
                    String.format("+I[%s, coll.name, A103]", db));
        } finally {
            cancelJob(result);
        }
    }

    /**
     * Creates the source and sink tables, submits the copy job and blocks on
     * {@link MongoDBTestUtils#waitForSnapshotStarted(String)} for the sink.
     *
     * @param database value of the {@code database} option, or {@code null} to omit the option
     * @param collection value of the {@code collection} option, or {@code null} to omit the option
     * @return the result handle of the submitted {@code INSERT} job
     */
    private TableResult submitTestCase(String database, String collection) throws Exception {
        String sourceDDL =
                "CREATE TABLE mongodb_source ("
                        + " _id STRING NOT NULL,"
                        + " seq STRING,"
                        + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                        + " coll_name STRING METADATA FROM 'collection_name' VIRTUAL,"
                        + " PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + ignoreIfNull("hosts", CONTAINER.getHostAndPort())
                        + ignoreIfNull("username", "flinkuser")
                        + ignoreIfNull("password", "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;")
                        + ignoreIfNull("database", database)
                        + ignoreIfNull("collection", collection)
                        + " 'scan.incremental.snapshot.enabled' = '"
                        + parallelismSnapshot
                        + "',"
                        + " 'connector' = 'mongodb-cdc'"
                        + ")";

        String sinkDDL =
                "CREATE TABLE mongodb_sink ("
                        + " db_name STRING,"
                        + " coll_name STRING,"
                        + " seq STRING,"
                        + " PRIMARY KEY (db_name, coll_name, seq) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ")";

        tEnv.executeSql(sourceDDL);
        tEnv.executeSql(sinkDDL);

        TableResult result =
                tEnv.executeSql(
                        "INSERT INTO mongodb_sink SELECT db_name, coll_name, seq FROM mongodb_source");

        MongoDBTestUtils.waitForSnapshotStarted(SINK_TABLE);
        return result;
    }

    /**
     * Renders one {@code 'name' = 'value',} fragment of a {@code WITH (...)} clause.
     *
     * @return the fragment (with leading space and trailing comma), or the empty string when
     *     {@code configValue} is {@code null}
     */
    private String ignoreIfNull(String configName, String configValue) {
        return configValue != null ? String.format(" '%s' = '%s',", configName, configValue) : "";
    }

    /** Inserts one additional document into each of coll_a1, coll_a2, coll_b1 and coll_b2. */
    private void insertRecordsInDatabase(String database) {
        MongoDatabase db = mongodbClient.getDatabase(database);
        db.getCollection("coll_a1").insertOne(new Document("seq", "A102"));
        db.getCollection("coll_a2").insertOne(new Document("seq", "A202"));
        db.getCollection("coll_b1").insertOne(new Document("seq", "B102"));
        db.getCollection("coll_b2").insertOne(new Document("seq", "B202"));
    }

    /** Asserts that the sink holds exactly the given rows, in any order. */
    private static void assertSinkContainsInAnyOrder(String... expected) {
        List<String> actual = TestValuesTableFactory.getResults(SINK_TABLE);
        Assert.assertThat(actual, Matchers.containsInAnyOrder(expected));
    }

    /**
     * Cancels the job behind {@code result} and waits for the cancellation to complete.
     *
     * <p>Called from {@code finally} blocks so that a failed wait or assertion does not leave a
     * streaming job running while later tests execute.
     */
    private static void cancelJob(TableResult result) throws Exception {
        if (result.getJobClient().isPresent()) {
            JobClient jobClient = result.getJobClient().get();
            jobClient.cancel().get();
        }
    }
}

