/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceBuilder;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.TestTable;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.bson.Document;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

public class MongoDBParallelSourceITCase
extends MongoDBSourceTestBase {
    private static final int USE_POST_LOWWATERMARK_HOOK = 1;
    private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
    private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds((long)300L);

    @Test
    public void testReadSingleCollectionWithSingleParallelism() throws Exception {
        this.testMongoDBParallelSource(1, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, new String[]{"customers"});
    }

    @Test
    public void testReadSingleCollectionWithMultipleParallelism() throws Exception {
        this.testMongoDBParallelSource(4, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, new String[]{"customers"});
    }

    @Test
    public void testReadMultipleCollectionWithSingleParallelism() throws Exception {
        this.testMongoDBParallelSource(1, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testReadMultipleCollectionWithMultipleParallelism() throws Exception {
        this.testMongoDBParallelSource(4, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        this.testMongoDBParallelSource(MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverInStreamPhase() throws Exception {
        this.testMongoDBParallelSource(MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.STREAM, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        this.testMongoDBParallelSource(MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testJobManagerFailoverInStreamPhase() throws Exception {
        this.testMongoDBParallelSource(MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.STREAM, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverSingleParallelism() throws Exception {
        this.testMongoDBParallelSource(1, MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers"});
    }

    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        this.testMongoDBParallelSource(1, MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers"});
    }

    @Test
    public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
        this.testMongoDBParallelSource(4, MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers"}, true);
    }

    @Test
    public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 22, 3, StartupOptions.snapshot());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]");
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 22, 2, StartupOptions.snapshot());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]");
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 21, 2, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]");
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 21, 1, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]");
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(false, 24, 3, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[0, null, null, null]");
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(true, 24, 2, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[0, null, null, null]");
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedRecords, records);
    }

    @Test
    public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
        List<String> records = this.testBackfillWhenWritingEvents(true, 24, 1, StartupOptions.initial());
        List<String> expectedRecords = Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[0, null, null, null]");
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedRecords, records);
    }

    private List<String> testBackfillWhenWritingEvents(boolean skipBackFill, int fetchSize, int hookType, StartupOptions startupOptions) throws Exception {
        String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000L);
        env.setParallelism(1);
        ResolvedSchema customersSchema = new ResolvedSchema(Arrays.asList(Column.physical((String)"cid", (DataType)((DataType)DataTypes.BIGINT().notNull())), Column.physical((String)"name", (DataType)DataTypes.STRING()), Column.physical((String)"address", (DataType)DataTypes.STRING()), Column.physical((String)"phone_number", (DataType)DataTypes.STRING())), new ArrayList(), UniqueConstraint.primaryKey((String)"pk", Collections.singletonList("cid")));
        TestTable customerTable = new TestTable(customerDatabase, "customers", customersSchema);
        MongoDBSource source = new MongoDBSourceBuilder().hosts(CONTAINER.getHostAndPort()).databaseList(new String[]{customerDatabase}).username("flinkuser").password("a1?~!@#$%^&*(){}[]<>.,+_-=/|:;").startupOptions(StartupOptions.initial()).scanFullChangelog(false).collectionList(new String[]{this.getCollectionNameRegex(customerDatabase, new String[]{"customers"})}).deserializer((DebeziumDeserializationSchema)customerTable.getDeserializer(false)).skipSnapshotBackfill(skipBackFill).startupOptions(startupOptions).build();
        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        SnapshotPhaseHook & Serializable snapshotPhaseHook = (SnapshotPhaseHook & Serializable)(sourceConfig, split) -> {
            MongoDBSourceConfig mongoDBSourceConfig = (MongoDBSourceConfig)sourceConfig;
            MongoClient mongoClient = MongoUtils.clientFor((MongoDBSourceConfig)mongoDBSourceConfig);
            MongoDatabase database = mongoClient.getDatabase((String)mongoDBSourceConfig.getDatabaseList().get(0));
            MongoCollection mongoCollection = database.getCollection("customers");
            Document document = new Document();
            document.put("cid", (Object)15213L);
            document.put("name", (Object)"user_15213");
            document.put("address", (Object)"Shanghai");
            document.put("phone_number", (Object)"123567891234");
            mongoCollection.insertOne((Object)document);
            mongoCollection.updateOne(Filters.eq((String)"cid", (Object)2000L), Updates.set((String)"address", (Object)"Pittsburgh"));
            mongoCollection.deleteOne(Filters.eq((String)"cid", (Object)1019L));
        };
        switch (hookType) {
            case 1: {
                hooks.setPostLowWatermarkAction((SnapshotPhaseHook)snapshotPhaseHook);
                break;
            }
            case 2: {
                hooks.setPreHighWatermarkAction((SnapshotPhaseHook)snapshotPhaseHook);
                break;
            }
            case 3: {
                hooks.setPostHighWatermarkAction((SnapshotPhaseHook)snapshotPhaseHook);
            }
        }
        source.setSnapshotHooks(hooks);
        ArrayList<String> records = new ArrayList();
        try (CloseableIterator iterator = env.fromSource((Source)source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source").executeAndCollect();){
            records = MongoDBTestUtils.fetchRowData((Iterator<RowData>)iterator, fetchSize, customerTable::stringify);
            env.close();
        }
        return records;
    }

    private void testMongoDBParallelSource(MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, String[] captureCustomerCollections) throws Exception {
        this.testMongoDBParallelSource(4, failoverType, failoverPhase, captureCustomerCollections);
    }

    private void testMongoDBParallelSource(int parallelism, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, String[] captureCustomerCollections) throws Exception {
        this.testMongoDBParallelSource(parallelism, failoverType, failoverPhase, captureCustomerCollections, false);
    }

    private void testMongoDBParallelSource(int parallelism, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, String[] captureCustomerCollections, boolean skipSnapshotBackfill) throws Exception {
        String customerDatabase = CONTAINER.executeCommandFileInSeparateDatabase("customer");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create((StreamExecutionEnvironment)env);
        env.setParallelism(parallelism);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart((int)1, (long)0L));
        String sourceDDL = String.format("CREATE TABLE customers ( _id STRING NOT NULL, cid BIGINT NOT NULL, name STRING, address STRING, phone_number STRING, primary key (_id) not enforced) WITH ( 'connector' = 'mongodb-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s', 'heartbeat.interval.ms' = '500', 'scan.incremental.snapshot.backfill.skip' = '%s')", CONTAINER.getHostAndPort(), "flinkuser", "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;", customerDatabase, this.getCollectionNameRegex(customerDatabase, captureCustomerCollections), skipSnapshotBackfill);
        String[] snapshotForSingleTable = new String[]{"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]"};
        tEnv.executeSql(sourceDDL);
        TableResult tableResult = tEnv.executeSql("select cid, name, address, phone_number from customers");
        CloseableIterator iterator = tableResult.collect();
        JobID jobId = ((JobClient)tableResult.getJobClient().get()).getJobID();
        ArrayList<String> expectedSnapshotData = new ArrayList<String>();
        for (int i = 0; i < captureCustomerCollections.length; ++i) {
            expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
        }
        if (failoverPhase == MongoDBTestUtils.FailoverPhase.SNAPSHOT && iterator.hasNext()) {
            MongoDBTestUtils.triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
        }
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedSnapshotData, MongoDBTestUtils.fetchRows((Iterator<Row>)iterator, expectedSnapshotData.size()));
        for (String collectionName : captureCustomerCollections) {
            this.makeFirstPartChangeStreamEvents(mongodbClient.getDatabase(customerDatabase), collectionName);
        }
        if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM) {
            MongoDBTestUtils.triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(200L));
        }
        for (String collectionName : captureCustomerCollections) {
            this.makeSecondPartChangeStreamEvents(mongodbClient.getDatabase(customerDatabase), collectionName);
        }
        String[] changeEventsForSingleTable = new String[]{"-U[101, user_1, Shanghai, 123567891234]", "+U[101, user_1, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, user_11, Hangzhou, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        ArrayList<String> expectedChangeStreamData = new ArrayList<String>();
        for (int i = 0; i < captureCustomerCollections.length; ++i) {
            expectedChangeStreamData.addAll(Arrays.asList(changeEventsForSingleTable));
        }
        List<String> actualChangeStreamData = MongoDBTestUtils.fetchRows((Iterator<Row>)iterator, expectedChangeStreamData.size());
        MongoDBAssertUtils.assertEqualsInAnyOrder(expectedChangeStreamData, actualChangeStreamData);
        ((JobClient)tableResult.getJobClient().get()).cancel().get();
    }

    private String getCollectionNameRegex(String database, String[] captureCustomerCollections) {
        Preconditions.checkState((captureCustomerCollections.length > 0 ? 1 : 0) != 0);
        if (captureCustomerCollections.length == 1) {
            return database + "." + captureCustomerCollections[0];
        }
        return Arrays.stream(captureCustomerCollections).map(coll -> "^(" + database + "." + coll + ")$").collect(Collectors.joining("|"));
    }

    private void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    private void makeFirstPartChangeStreamEvents(MongoDatabase mongoDatabase, String collection) {
        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
        mongoCollection.updateOne(Filters.eq((String)"cid", (Object)101L), Updates.set((String)"address", (Object)"Hangzhou"));
        mongoCollection.deleteOne(Filters.eq((String)"cid", (Object)102L));
        mongoCollection.insertOne((Object)this.customerDocOf(102L, "user_2", "Shanghai", "123567891234"));
        mongoCollection.updateOne(Filters.eq((String)"cid", (Object)103L), Updates.set((String)"address", (Object)"Hangzhou"));
    }

    private void makeSecondPartChangeStreamEvents(MongoDatabase mongoDatabase, String collection) {
        MongoCollection mongoCollection = mongoDatabase.getCollection(collection);
        mongoCollection.updateOne(Filters.eq((String)"cid", (Object)1010L), Updates.set((String)"address", (Object)"Hangzhou"));
        mongoCollection.insertMany(Arrays.asList(this.customerDocOf(2001L, "user_22", "Shanghai", "123567891234"), this.customerDocOf(2002L, "user_23", "Shanghai", "123567891234"), this.customerDocOf(2003L, "user_24", "Shanghai", "123567891234")));
    }

    private Document customerDocOf(Long cid, String name, String address, String phoneNumber) {
        Document document = new Document();
        document.put("cid", (Object)cid);
        document.put("name", (Object)name);
        document.put("address", (Object)address);
        document.put("phone_number", (Object)phoneNumber);
        return document;
    }
}

