/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.table;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.bson.BsonDateTime;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class MongoDBConnectorITCase
extends MongoDBSourceTestBase {
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)this.env, (EnvironmentSettings)EnvironmentSettings.newInstance().inStreamingMode().build());
    @ClassRule
    public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;
    private final boolean parallelismSnapshot;

    public MongoDBConnectorITCase(boolean parallelismSnapshot) {
        this.parallelismSnapshot = parallelismSnapshot;
    }

    @Parameterized.Parameters(name="parallelismSnapshot: {0}")
    public static Object[] parameters() {
        return new Object[][]{{false}, {true}};
    }

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
        if (this.parallelismSnapshot) {
            this.env.setParallelism(4);
            this.env.enableCheckpointing(200L);
        } else {
            this.env.setParallelism(1);
        }
    }

    @Test
    public void testConsumingAllEvents() throws ExecutionException, InterruptedException {
        String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
        String sourceDDL = String.format("CREATE TABLE mongodb_source ( _id STRING NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), PRIMARY KEY (_id) NOT ENFORCED) WITH ( 'connector' = 'mongodb-cdc', 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'heartbeat.interval.ms' = '1000')", CONTAINER.getHostAndPort(), "flinkuser", "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;", database, "products", this.parallelismSnapshot);
        String sinkDDL = "CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT name, SUM(weight) FROM mongodb_source GROUP BY name");
        MongoDBTestUtils.waitForSnapshotStarted("sink");
        MongoCollection products = mongodbClient.getDatabase(database).getCollection("products");
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000106")), Updates.set((String)"description", (Object)"18oz carpenter hammer"));
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000107")), Updates.set((String)"weight", (Object)5.1));
        products.insertOne((Object)this.productDocOf("100000000000000000000110", "jacket", "water resistent white wind breaker", 0.2));
        products.insertOne((Object)this.productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18));
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000110")), Updates.combine((Bson[])new Bson[]{Updates.set((String)"description", (Object)"new water resistent white wind breaker"), Updates.set((String)"weight", (Object)0.5)}));
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000111")), Updates.set((String)"weight", (Object)5.17));
        MongoDBTestUtils.waitForSinkSize("sink", 19);
        products.deleteOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000111")));
        MongoDBTestUtils.waitForSinkSize("sink", 20);
        Object[] expected = new String[]{"scooter,3.140", "car battery,8.100", "12-pack drill bits,0.800", "hammer,2.625", "rocks,5.100", "jacket,0.600", "spare tire,22.200"};
        List actual = TestValuesTableFactory.getResults((String)"sink");
        Assert.assertThat((Object)actual, (Matcher)Matchers.containsInAnyOrder((Object[])expected));
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    @Test
    public void testStartupFromTimestamp() throws Exception {
        String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
        Thread.sleep(5000L);
        String sourceDDL = String.format("CREATE TABLE mongodb_source ( _id STRING NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), PRIMARY KEY (_id) NOT ENFORCED) WITH ( 'connector' = 'mongodb-cdc', 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s', 'scan.incremental.snapshot.enabled' = '%s', 'scan.startup.mode' = 'timestamp', 'scan.startup.timestamp-millis' = '" + System.currentTimeMillis() + "', 'heartbeat.interval.ms' = '1000')", CONTAINER.getHostAndPort(), "flinkuser", "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;", database, "products", this.parallelismSnapshot);
        String sinkDDL = "CREATE TABLE sink ( name STRING, weightSum DECIMAL(10,3), PRIMARY KEY (name) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT name, SUM(weight) FROM mongodb_source GROUP BY name");
        MongoCollection products = mongodbClient.getDatabase(database).getCollection("products");
        products.insertOne((Object)this.productDocOf("100000000000000000000110", "jacket", "water resistent white wind breaker", 0.2));
        products.insertOne((Object)this.productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18));
        MongoDBTestUtils.waitForSinkSize("sink", 2);
        Object[] expected = new String[]{"jacket,0.200", "scooter,5.180"};
        List actual = TestValuesTableFactory.getResults((String)"sink");
        Assert.assertThat((Object)actual, (Matcher)Matchers.containsInAnyOrder((Object[])expected));
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    @Test
    public void testAllTypes() throws Throwable {
        String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
        String sourceDDL = String.format("CREATE TABLE full_types (\n    _id STRING,\n    stringField STRING,\n    uuidField STRING,\n    md5Field STRING,\n    timeField TIME,\n    dateField DATE,\n    dateBefore1970 DATE,\n    dateToTimestampField TIMESTAMP(3),\n    dateToLocalTimestampField TIMESTAMP_LTZ(3),\n    timestampField TIMESTAMP(0),\n    timestampToLocalTimestampField TIMESTAMP_LTZ(0),\n    booleanField BOOLEAN,\n    decimal128Field DECIMAL ,\n    doubleField DOUBLE,\n    int32field INT,\n    int64Field BIGINT,\n    documentField ROW<a STRING,b BIGINT>,\n    mapField MAP<STRING,MAP<STRING,INT>>,\n    arrayField ARRAY<STRING>,\n    doubleArrayField ARRAY<DOUBLE>,\n    documentArrayField ARRAY<ROW<a STRING,b BIGINT>>,\n    minKeyField STRING,\n    maxKeyField STRING,\n    regexField STRING,\n    undefinedField STRING,\n    nullField STRING,\n    binaryField BINARY,\n    javascriptField STRING,\n    dbReferenceField ROW<$ref STRING,$id STRING>,\n    PRIMARY KEY (_id) NOT ENFORCED) WITH ( 'connector' = 'mongodb-cdc', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s')", CONTAINER.getHostAndPort(), "flinkuser", "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;", database, "full_types");
        String sinkDDL = "CREATE TABLE sink (\n    _id STRING,\n    stringField STRING,\n    uuidField STRING,\n    md5Field STRING,\n    timeField TIME,\n    dateField DATE,\n    dateBefore1970 DATE,\n    dateToTimestampField TIMESTAMP(3),\n    dateToLocalTimestampField TIMESTAMP_LTZ(3),\n    timestampField TIMESTAMP(0),\n    timestampToLocalTimestampField TIMESTAMP_LTZ(0),\n    booleanField BOOLEAN,\n    decimal128Field DECIMAL ,\n    doubleField DOUBLE,\n    int32field INT,\n    int64Field BIGINT,\n    documentField ROW<a STRING,b BIGINT>,\n    mapField MAP<STRING,MAP<STRING,INT>>,\n    arrayField ARRAY<STRING>,\n    doubleArrayField ARRAY<DOUBLE>,\n    documentArrayField ARRAY<ROW<a STRING,b BIGINT>>,\n    minKeyField STRING,\n    maxKeyField STRING,\n    regexField STRING,\n    undefinedField STRING,\n    nullField STRING,\n    binaryField BINARY,\n    javascriptField STRING,\n    dbReferenceField ROW<$ref STRING,$id STRING>\n) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO sink SELECT _id,\nstringField,\nuuidField,\nmd5Field,\ntimeField,\ndateField,\ndateBefore1970,\ndateToTimestampField,\ndateToLocalTimestampField,\ntimestampField,\ntimestampToLocalTimestampField,\nbooleanField,\ndecimal128Field,\ndoubleField,\nint32field,\nint64Field,\ndocumentField,\nmapField,\narrayField,\ndoubleArrayField,\ndocumentArrayField,\nminKeyField,\nmaxKeyField,\nregexField,\nundefinedField,\nnullField,\nbinaryField,\njavascriptField,\ndbReferenceField\nFROM full_types");
        MongoDBTestUtils.waitForSnapshotStarted("sink");
        MongoCollection fullTypes = mongodbClient.getDatabase(database).getCollection("full_types");
        fullTypes.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("5d505646cf6d4fe581014ab2")), Updates.set((String)"int64Field", (Object)510L));
        MongoDBTestUtils.waitForSinkSize("sink", 3);
        BsonDateTime updatedDateTime = new BsonDateTime(1630694164123L);
        BsonTimestamp updatedTimestamp = new BsonTimestamp(1630694164, 0);
        fullTypes.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("5d505646cf6d4fe581014ab2")), Updates.combine((Bson[])new Bson[]{Updates.set((String)"timeField", (Object)updatedDateTime), Updates.set((String)"dateField", (Object)updatedDateTime), Updates.set((String)"dateToTimestampField", (Object)updatedDateTime), Updates.set((String)"dateToLocalTimestampField", (Object)updatedDateTime), Updates.set((String)"timestampField", (Object)updatedTimestamp), Updates.set((String)"timestampToLocalTimestampField", (Object)updatedTimestamp)}));
        MongoDBTestUtils.waitForSinkSize("sink", 5);
        List<String> expected = Arrays.asList("+I(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,50,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)", "-U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,50,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)", "+U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)", "-U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,2019-08-11T17:47:44,2019-08-11T17:47:44Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)", "+U(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,2078693f4c61ce3073b01be69ab76428,18:36:04,2021-09-03,1960-08-11,2021-09-03T18:36:04.123,2021-09-03T18:36:04.123Z,2021-09-03T18:36:04,2021-09-03T18:36:04Z,true,11,10.5,10,510,hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)");
        List actual = TestValuesTableFactory.getRawResults((String)"sink");
        Assert.assertEquals(expected, (Object)actual);
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    @Test
    public void testMetadataColumns() throws Exception {
        String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
        String sourceDDL = String.format("CREATE TABLE mongodb_source ( _id STRING NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), db_name STRING METADATA FROM 'database_name' VIRTUAL, collection_name STRING METADATA VIRTUAL, PRIMARY KEY (_id) NOT ENFORCED) WITH ( 'connector' = 'mongodb-cdc', 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s', 'scan.incremental.snapshot.enabled' = '%s')", CONTAINER.getHostAndPort(), "flinkuser", "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;", database, "products", this.parallelismSnapshot);
        String sinkDDL = "CREATE TABLE meta_sink ( _id STRING NOT NULL, name STRING, description STRING, weight DECIMAL(10,3), database_name STRING, collection_name STRING, PRIMARY KEY (_id) NOT ENFORCED) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        this.tEnv.executeSql(sourceDDL);
        this.tEnv.executeSql(sinkDDL);
        TableResult result = this.tEnv.executeSql("INSERT INTO meta_sink SELECT * FROM mongodb_source");
        MongoDBTestUtils.waitForSinkSize("meta_sink", 9);
        MongoCollection products = mongodbClient.getDatabase(database).getCollection("products");
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000106")), Updates.set((String)"description", (Object)"18oz carpenter hammer"));
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000107")), Updates.set((String)"weight", (Object)5.1));
        products.insertOne((Object)this.productDocOf("100000000000000000000110", "jacket", "water resistent white wind breaker", 0.2));
        products.insertOne((Object)this.productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18));
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000110")), Updates.combine((Bson[])new Bson[]{Updates.set((String)"description", (Object)"new water resistent white wind breaker"), Updates.set((String)"weight", (Object)0.5)}));
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000111")), Updates.set((String)"weight", (Object)5.17));
        MongoDBTestUtils.waitForSinkSize("meta_sink", 15);
        products.deleteOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000111")));
        MongoDBTestUtils.waitForSinkSize("meta_sink", 16);
        List expected = Stream.of("+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products)", "+I(100000000000000000000102,car battery,12V car battery,8.100,%s,products)", "+I(100000000000000000000103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800,%s,products)", "+I(100000000000000000000104,hammer,12oz carpenter''s hammer,0.750,%s,products)", "+I(100000000000000000000105,hammer,12oz carpenter''s hammer,0.875,%s,products)", "+I(100000000000000000000106,hammer,12oz carpenter''s hammer,1.000,%s,products)", "+I(100000000000000000000107,rocks,box of assorted rocks,5.300,%s,products)", "+I(100000000000000000000108,jacket,water resistent black wind breaker,0.100,%s,products)", "+I(100000000000000000000109,spare tire,24 inch spare tire,22.200,%s,products)", "+I(100000000000000000000110,jacket,water resistent white wind breaker,0.200,%s,products)", "+I(100000000000000000000111,scooter,Big 2-wheel scooter,5.180,%s,products)", "+U(100000000000000000000106,hammer,18oz carpenter hammer,1.000,%s,products)", "+U(100000000000000000000107,rocks,box of assorted rocks,5.100,%s,products)", "+U(100000000000000000000110,jacket,new water resistent white wind breaker,0.500,%s,products)", "+U(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)", "-D(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)").map(s -> String.format(s, database)).sorted().collect(Collectors.toList());
        List actual = TestValuesTableFactory.getRawResults((String)"meta_sink");
        Collections.sort(actual);
        Assert.assertEquals(expected, (Object)actual);
        ((JobClient)result.getJobClient().get()).cancel().get();
    }

    private Document productDocOf(String id, String name, String description, Double weight) {
        Document document = new Document();
        if (id != null) {
            document.put("_id", (Object)new ObjectId(id));
        }
        document.put("name", (Object)name);
        document.put("description", (Object)description);
        document.put("weight", (Object)weight);
        return document;
    }
}

