/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.junit.Ignore;
import org.junit.Test;

/**
 * Manual example that wires a {@link MongoDBSource} into a Flink streaming job and prints every
 * record it emits.
 *
 * <p>The single test is {@link Ignore}d: the job never terminates on its own, so it is meant to
 * be started by hand rather than as part of the automated suite.
 */
public class MongoDBParallelSourceExampleTest extends MongoDBSourceTestBase {

    /**
     * Builds a source over the {@code products} collection of a freshly prepared
     * {@code inventory} database, reads it with parallelism 2 and prints the JSON records
     * through a single-parallelism sink.
     *
     * @throws Exception if preparing the database or executing the job fails
     */
    @Test
    @Ignore("Test ignored because it won't stop and is used for manual test")
    public void testMongoDBExampleSource() throws Exception {
        // The helper returns the name of the database it populated; the name is only known here.
        String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");

        // Typed as String because JsonDebeziumDeserializationSchema emits JSON strings.
        MongoDBSource<String> mongoSource =
                MongoDBSource.<String>builder()
                        .hosts(CONTAINER.getHostAndPort())
                        .databaseList(database)
                        // Collections are addressed as "<database>.<collection>".
                        .collectionList(database + ".products")
                        .username("flinkuser")
                        .password("a1?~!@#$%^&*(){}[]<>.,+_-=/|:;")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .closeIdleReaders(true)
                        .build();

        // NOTE(review): closeIdleReaders(true) presumably lets some readers finish early, which
        // is why checkpointing after finished tasks is switched on here - confirm against the
        // connector documentation.
        Configuration config = new Configuration();
        config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
        // Checkpoint interval, in milliseconds.
        env.enableCheckpointing(3000L);

        env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "MongoDBParallelSource")
                .setParallelism(2)
                .print()
                .setParallelism(1);

        env.execute("Print MongoDB Snapshot + Change Stream");
    }
}

