/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.bson.Document;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

public class NewlyAddedTableITCase
extends MongoDBSourceTestBase {
    /** Default parallelism constant for this test class; its value is 4. */
    protected static final int DEFAULT_PARALLELISM = 4;

    /** JUnit rule that fails any single test which runs longer than 500 seconds. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(500L);

    /** Single-threaded scheduler on which {@link #before()} schedules its background writes. */
    private final ScheduledExecutorService mockChangelogExecutor =
            Executors.newScheduledThreadPool(1);

    /** Name of the database used by the current test; assigned in {@link #before()}. */
    private String customerDatabase;

    /**
     * Prepares the environment for one test.
     *
     * <p>Chooses a database name of the form {@code customer_<random base-36 suffix>}, clears the
     * data held by {@code TestValuesTableFactory}, runs the {@code setClusterParameter} admin
     * command that sets {@code preAndPostImages.expireAfterSeconds} to {@code 'off'}, and schedules
     * a task that inserts and then deletes one document in the {@code produce_changelog}
     * collection of that database.
     */
    @Before
    public void before() throws SQLException {
        customerDatabase = "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
        TestValuesTableFactory.clearAllData();

        CONTAINER.executeCommand("use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");

        final MongoCollection<Document> changelogCollection =
                mongodbClient.getDatabase(customerDatabase).getCollection("produce_changelog");

        Runnable produceChangelog = () -> {
            Document doc = new Document();
            doc.put("cid", 1);
            doc.put("cnt", 103L);
            changelogCollection.insertOne(doc);
            changelogCollection.deleteOne(Filters.eq("cid", 1));
        };

        // NOTE(review): schedule() runs the task exactly once, 500 microseconds from now. If a
        // recurring changelog or a 500 millisecond delay was intended, this call needs changing.
        mockChangelogExecutor.schedule(produceChangelog, 500L, TimeUnit.MICROSECONDS);
    }

    /** Shuts down the background changelog executor, then drops the database used by the test. */
    @After
    public void after() {
        mockChangelogExecutor.shutdown();
        mongodbClient.getDatabase(customerDatabase).drop();
    }

    /** Two collections added one by one: parallelism 1, no failover, no writes before capture. */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineOnce() throws Exception {
        testNewlyAddedCollectionOneByOne(
                1,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                false,
                "address_hangzhou",
                "address_beijing");
    }

    /** Two collections added one by one: parallelism 1, no failover, with a write before capture. */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineOnceWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(
                1,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                true,
                "address_hangzhou",
                "address_beijing");
    }

    /** Three collections added one by one: parallelism 4, no failover, no writes before capture. */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineTwice() throws Exception {
        testNewlyAddedCollectionOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                false,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /** Three collections added one by one: parallelism 4, no failover, with a write before capture. */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineTwiceWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                true,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /**
     * Three collections added one by one: parallelism 4, no failover, with a write before capture,
     * and the source option {@code scan.incremental.close-idle-reader.enabled} set to {@code true}.
     */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineTwiceWithAheadOplogAndAutoCloseReader() throws Exception {
        Map<String, String> sourceOptions = new HashMap<>();
        sourceOptions.put("scan.incremental.close-idle-reader.enabled", "true");
        testNewlyAddedCollectionOneByOne(
                DEFAULT_PARALLELISM,
                sourceOptions,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                true,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /** Four collections added one by one: parallelism 4, no failover, no writes before capture. */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineThrice() throws Exception {
        testNewlyAddedCollectionOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                false,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai",
                "address_shenzhen");
    }

    /** Four collections added one by one: parallelism 4, no failover, with a write before capture. */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineThriceWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                true,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai",
                "address_shenzhen");
    }

    /** Two collections added one by one with parallelism 1: no failover, no writes before capture. */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineSingleParallelism() throws Exception {
        testNewlyAddedCollectionOneByOne(
                1,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                false,
                "address_hangzhou",
                "address_beijing");
    }

    /** Two collections added one by one with parallelism 1: no failover, with a write before capture. */
    @Test
    public void testNewlyAddedCollectionForExistsPipelineSingleParallelismWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(
                1,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                true,
                "address_hangzhou",
                "address_beijing");
    }

    /** Two collections added one by one: parallelism 4, JM failover in the SNAPSHOT phase, no writes before capture. */
    @Test
    public void testJobManagerFailoverForNewlyAddedCollection() throws Exception {
        testNewlyAddedCollectionOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.JM,
                MongoDBTestUtils.FailoverPhase.SNAPSHOT,
                false,
                "address_hangzhou",
                "address_beijing");
    }

    /** Two collections added one by one: parallelism 4, JM failover in the SNAPSHOT phase, with a write before capture. */
    @Test
    public void testJobManagerFailoverForNewlyAddedCollectionWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.JM,
                MongoDBTestUtils.FailoverPhase.SNAPSHOT,
                true,
                "address_hangzhou",
                "address_beijing");
    }

    /** Two collections added one by one: parallelism 1, TM failover in the STREAM phase, no writes before capture. */
    @Test
    public void testTaskManagerFailoverForNewlyAddedCollection() throws Exception {
        testNewlyAddedCollectionOneByOne(
                1,
                MongoDBTestUtils.FailoverType.TM,
                MongoDBTestUtils.FailoverPhase.STREAM,
                false,
                "address_hangzhou",
                "address_beijing");
    }

    /**
     * Two collections added one by one: parallelism 1, TM failover in the STREAM phase, with a
     * write made to each collection before it is captured.
     *
     * <p>The "ahead oplog" flag is {@code true} here so that this test differs from
     * {@link #testTaskManagerFailoverForNewlyAddedCollection()}; previously both tests passed
     * {@code false} and therefore ran the exact same scenario.
     */
    @Test
    public void testTaskManagerFailoverForNewlyAddedCollectionWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(
                1,
                MongoDBTestUtils.FailoverType.TM,
                MongoDBTestUtils.FailoverPhase.STREAM,
                true,
                "address_hangzhou",
                "address_beijing");
    }

    /** Collections removed one by one: parallelism 1, JM failover in the SNAPSHOT phase. */
    @Test
    public void testJobManagerFailoverForRemoveCollectionSingleParallelism() throws Exception {
        testRemoveCollectionsOneByOne(
                1,
                MongoDBTestUtils.FailoverType.JM,
                MongoDBTestUtils.FailoverPhase.SNAPSHOT,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /** Collections removed one by one: parallelism 4, JM failover in the SNAPSHOT phase. */
    @Test
    public void testJobManagerFailoverForRemoveCollection() throws Exception {
        testRemoveCollectionsOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.JM,
                MongoDBTestUtils.FailoverPhase.SNAPSHOT,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /** Collections removed one by one: parallelism 1, TM failover in the SNAPSHOT phase. */
    @Test
    public void testTaskManagerFailoverForRemoveCollectionSingleParallelism() throws Exception {
        testRemoveCollectionsOneByOne(
                1,
                MongoDBTestUtils.FailoverType.TM,
                MongoDBTestUtils.FailoverPhase.SNAPSHOT,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /** Collections removed one by one: parallelism 4, TM failover in the SNAPSHOT phase. */
    @Test
    public void testTaskManagerFailoverForRemoveCollection() throws Exception {
        testRemoveCollectionsOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.TM,
                MongoDBTestUtils.FailoverPhase.SNAPSHOT,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /** Collections removed one by one: parallelism 1, no failover. */
    @Test
    public void testRemoveCollectionSingleParallelism() throws Exception {
        testRemoveCollectionsOneByOne(
                1,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /** Collections removed one by one: parallelism 4, no failover. */
    @Test
    public void testRemoveCollection() throws Exception {
        testRemoveCollectionsOneByOne(
                DEFAULT_PARALLELISM,
                MongoDBTestUtils.FailoverType.NONE,
                MongoDBTestUtils.FailoverPhase.NEVER,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai");
    }

    /** Runs the remove-and-add scenario over four collections with parallelism 1. */
    @Test
    public void testRemoveAndAddCollectionsOneByOne() throws Exception {
        testRemoveAndAddCollectionsOneByOne(
                1,
                "address_hangzhou",
                "address_beijing",
                "address_shanghai",
                "address_shenzhen");
    }

    /**
     * Captures the first collection, then restarts the job from a savepoint once per remaining
     * collection, each time capturing the first collection plus only the collection of that round.
     *
     * <p>After every job start, the expected snapshot rows and the change rows caused by
     * {@link #makeOplogForAddressTableInRound} are appended to one running list, which is compared
     * (in any order) with the raw results of the {@code sink} table.
     *
     * <p>The savepoint directory lives in a hand-created {@link TemporaryFolder}, which is deleted
     * again when the scenario ends, whether it passed or failed.
     *
     * @param parallelism parallelism of every job started by this scenario
     * @param captureAddressCollections collection names containing {@code _<city>}; index 0 is
     *     captured by every job
     * @throws Exception if a job, a savepoint or an assertion fails
     */
    private void testRemoveAndAddCollectionsOneByOne(int parallelism, String ... captureAddressCollections) throws Exception {
        final String sinkDdl = "CREATE TABLE sink ( collection_name STRING, cid BIGINT, country STRING, city STRING, detail_address STRING, primary key (collection_name,cid) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        final String insertSql = "insert into sink select collection_name, cid, country, city, detail_address from address";

        MongoDatabase database = mongodbClient.getDatabase(this.customerDatabase);
        initialAddressCollections(database, captureAddressCollections);

        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        try {
            String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
            List<String> fetchedDataList = new ArrayList<String>();

            // First job: started without a savepoint, captures only the first collection.
            String collection0 = captureAddressCollections[0];
            String cityName0 = collection0.split("_")[1];
            StreamExecutionEnvironment env = getStreamExecutionEnvironmentFromSavePoint(null, parallelism);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            tEnv.executeSql(getCreateTableStatement(new HashMap<String, String>(), collection0));
            tEnv.executeSql(sinkDdl);
            TableResult tableResult = tEnv.executeSql(insertSql);
            JobClient jobClient = tableResult.getJobClient().get();

            fetchedDataList.addAll(Arrays.asList(
                    String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", collection0, cityName0, cityName0),
                    String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", collection0, cityName0, cityName0),
                    String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", collection0, cityName0, cityName0)));
            MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
            MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

            makeOplogForAddressTableInRound(database, collection0, 0);
            fetchedDataList.addAll(Arrays.asList(
                    String.format("-U[%s, 416874195632735147, China, %s, %s West Town address 1]", collection0, cityName0, cityName0),
                    String.format("+U[%s, 416874195632735147, China_0, %s, %s West Town address 1]", collection0, cityName0, cityName0),
                    String.format("+I[%s, %d, China, %s, %s West Town address 4]", collection0, 417022095255614380L, cityName0, cityName0)));
            MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
            MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

            String finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
            jobClient.cancel().get();

            // Later jobs: restore from the last savepoint with the first collection plus this round's one.
            for (int round = 1; round < captureAddressCollections.length; ++round) {
                String captureTableThisRound = captureAddressCollections[round];
                String cityName = captureTableThisRound.split("_")[1];

                StreamExecutionEnvironment restoredEnv = getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
                StreamTableEnvironment restoredTEnv = StreamTableEnvironment.create(restoredEnv);
                restoredTEnv.executeSql(getCreateTableStatement(new HashMap<String, String>(), captureAddressCollections[0], captureTableThisRound));
                restoredTEnv.executeSql(sinkDdl);
                TableResult restoredResult = restoredTEnv.executeSql(insertSql);
                JobClient restoredJobClient = restoredResult.getJobClient().get();

                // Snapshot rows of the collection that is new in this round.
                fetchedDataList.addAll(Arrays.asList(
                        String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", captureTableThisRound, cityName, cityName),
                        String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", captureTableThisRound, cityName, cityName),
                        String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", captureTableThisRound, cityName, cityName)));
                MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
                MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

                // Changes are written to every collection up to this round, but only the two
                // captured ones (index 0 and this round's) are expected to reach the sink.
                for (int i = 0; i <= round; ++i) {
                    makeOplogForAddressTableInRound(database, captureAddressCollections[i], round);
                }
                String firstCollection = captureAddressCollections[0];
                String firstCityName = firstCollection.split("_")[1];
                fetchedDataList.addAll(Arrays.asList(
                        String.format("-U[%s, 416874195632735147, China_%s, %s, %s West Town address 1]", firstCollection, round - 1, firstCityName, firstCityName),
                        String.format("+U[%s, 416874195632735147, China_%s, %s, %s West Town address 1]", firstCollection, round, firstCityName, firstCityName),
                        String.format("+I[%s, %d, China, %s, %s West Town address 4]", firstCollection, 417022095255614380L + (long) round, firstCityName, firstCityName)));
                fetchedDataList.addAll(Arrays.asList(
                        String.format("-U[%s, 416874195632735147, China, %s, %s West Town address 1]", captureTableThisRound, cityName, cityName),
                        String.format("+U[%s, 416874195632735147, China_%s, %s, %s West Town address 1]", captureTableThisRound, round, cityName, cityName),
                        String.format("+I[%s, %d, China, %s, %s West Town address 4]", captureTableThisRound, 417022095255614380L + (long) round, cityName, cityName)));
                MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
                MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

                // No savepoint is needed after the final round.
                if (round != captureAddressCollections.length - 1) {
                    finishedSavePointPath = triggerSavepointWithRetry(restoredJobClient, savepointDirectory);
                }
                restoredJobClient.cancel().get();
            }
        } finally {
            // The folder is created by hand rather than as a @Rule, so nothing else removes it.
            temporaryFolder.delete();
        }
    }

    /**
     * Captures all given collections, then repeatedly restarts the job from a savepoint with the
     * leading collection dropped from the captured set.
     *
     * <p>The first job captures every collection and must deliver three snapshot rows per
     * collection. In round {@code r} (starting at 0) the job is restored capturing the collections
     * at indexes {@code r + 1} and above; changes are then written to <em>all</em> collections, but
     * only changes of collections with an index greater than {@code r} are expected in the sink.
     *
     * <p>The savepoint directory lives in a hand-created {@link TemporaryFolder}, which is deleted
     * again when the scenario ends, whether it passed or failed.
     *
     * @param parallelism parallelism of every job started by this scenario
     * @param failoverType kind of failover handed to {@code MongoDBTestUtils.triggerFailover}
     * @param failoverPhase {@code SNAPSHOT} triggers a failover right after the first job is
     *     submitted; {@code STREAM} triggers one in a restored round if the sink already holds
     *     more rows than were expected before that round's changes
     * @param captureAddressCollections collection names containing {@code _<city>}
     * @throws Exception if a job, a savepoint or an assertion fails
     */
    private void testRemoveCollectionsOneByOne(int parallelism, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, String ... captureAddressCollections) throws Exception {
        final String sinkDdl = "CREATE TABLE sink ( collection_name STRING, cid BIGINT, country STRING, city STRING, detail_address STRING, primary key (collection_name,cid) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        final String insertSql = "insert into sink select collection_name, cid, country, city, detail_address from address";

        initialAddressCollections(mongodbClient.getDatabase(this.customerDatabase), captureAddressCollections);

        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        try {
            String savepointDirectory = temporaryFolder.newFolder().toURI().toString();

            // Snapshot rows expected from the first job, which captures every collection.
            List<String> fetchedDataList = new ArrayList<String>();
            for (String collection : captureAddressCollections) {
                String cityName = collection.split("_")[1];
                fetchedDataList.addAll(Arrays.asList(
                        String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", collection, cityName, cityName),
                        String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", collection, cityName, cityName),
                        String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", collection, cityName, cityName)));
            }

            StreamExecutionEnvironment env = getStreamExecutionEnvironmentFromSavePoint(null, parallelism);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            tEnv.executeSql(getCreateTableStatement(new HashMap<String, String>(), captureAddressCollections));
            tEnv.executeSql(sinkDdl);
            TableResult tableResult = tEnv.executeSql(insertSql);
            JobClient jobClient = tableResult.getJobClient().get();

            if (failoverPhase == MongoDBTestUtils.FailoverPhase.SNAPSHOT) {
                MongoDBTestUtils.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
            }
            MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
            MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

            String finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
            jobClient.cancel().get();

            for (int round = 0; round < captureAddressCollections.length - 1; ++round) {
                // Drop the leading collections: only indexes round + 1 and above stay captured.
                String[] captureTablesThisRound = Arrays.asList(captureAddressCollections).subList(round + 1, captureAddressCollections.length).toArray(new String[0]);

                StreamExecutionEnvironment restoredEnv = getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
                StreamTableEnvironment restoredTEnv = StreamTableEnvironment.create(restoredEnv);
                restoredTEnv.executeSql(getCreateTableStatement(new HashMap<String, String>(), captureTablesThisRound));
                restoredTEnv.executeSql(sinkDdl);
                TableResult restoredResult = restoredTEnv.executeSql(insertSql);
                JobClient restoredJobClient = restoredResult.getJobClient().get();

                // Restoring alone must not change what the sink holds.
                MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
                MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

                List<String> expectedOplogDataThisRound = new ArrayList<String>();
                for (int i = 0; i < captureAddressCollections.length; ++i) {
                    String collectionName = captureAddressCollections[i];
                    makeOplogForAddressTableInRound(mongodbClient.getDatabase(this.customerDatabase), collectionName, round);
                    if (i <= round) {
                        // Collection is no longer captured, so its changes are not expected.
                        continue;
                    }
                    String cityName = collectionName.split("_")[1];
                    expectedOplogDataThisRound.addAll(Arrays.asList(
                            String.format("-U[%s, 416874195632735147, China%s, %s, %s West Town address 1]", collectionName, round == 0 ? "" : "_" + (round - 1), cityName, cityName),
                            String.format("+U[%s, 416874195632735147, China_%s, %s, %s West Town address 1]", collectionName, round, cityName, cityName),
                            String.format("+I[%s, %d, China, %s, %s West Town address 4]", collectionName, 417022095255614380L + (long) round, cityName, cityName)));
                }

                if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM && TestValuesTableFactory.getRawResults("sink").size() > fetchedDataList.size()) {
                    MongoDBTestUtils.triggerFailover(failoverType, restoredJobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
                }

                fetchedDataList.addAll(expectedOplogDataThisRound);
                MongoDBTestUtils.waitForSinkSize("sink", fetchedDataList.size());
                MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getRawResults("sink"));

                finishedSavePointPath = triggerSavepointWithRetry(restoredJobClient, savepointDirectory);
                restoredJobClient.cancel().get();
            }
        } finally {
            // The folder is created by hand rather than as a @Rule, so nothing else removes it.
            temporaryFolder.delete();
        }
    }

    /**
     * Convenience overload that runs the newly-added-collection scenario with no extra source
     * options (an empty option map).
     */
    private void testNewlyAddedCollectionOneByOne(int parallelism, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, boolean makeOplogBeforeCapture, String ... captureAddressCollections) throws Exception {
        Map<String, String> noExtraOptions = new HashMap<String, String>();
        testNewlyAddedCollectionOneByOne(
                parallelism,
                noExtraOptions,
                failoverType,
                failoverPhase,
                makeOplogBeforeCapture,
                captureAddressCollections);
    }

    /**
     * Adds the given collections to the captured set one per round, restoring each job from the
     * savepoint taken at the end of the previous round.
     *
     * <p>In round {@code r} the job captures the collections at indexes {@code 0..r}. The expected
     * list is extended with the snapshot rows of the newly added collection, then adjusted for one
     * update ({@code country} set to {@code CHINA}) and one insert made to that collection, and is
     * compared (in any order) with {@code TestValuesTableFactory.getResults("sink")}.
     *
     * <p>The savepoint directory lives in a hand-created {@link TemporaryFolder}, which is deleted
     * again when the scenario ends, whether it passed or failed.
     *
     * @param parallelism parallelism of every job started by this scenario
     * @param sourceOptions extra {@code WITH} options appended to the source table definition
     * @param failoverType kind of failover handed to {@code MongoDBTestUtils.triggerFailover}
     * @param failoverPhase {@code SNAPSHOT} triggers a failover right after job submission;
     *     {@code STREAM} triggers one between the update and the insert of each round
     * @param makeOplogBeforeCapture whether an extra document is inserted into the new collection
     *     before the job of that round starts
     * @param captureAddressCollections collection names containing {@code _<city>}
     * @throws Exception if a job, a savepoint or an assertion fails
     */
    private void testNewlyAddedCollectionOneByOne(int parallelism, Map<String, String> sourceOptions, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, boolean makeOplogBeforeCapture, String ... captureAddressCollections) throws Exception {
        final String sinkDdl = "CREATE TABLE sink ( collection_name STRING, cid BIGINT, country STRING, city STRING, detail_address STRING, primary key (collection_name,cid) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')";
        final String insertSql = "insert into sink select collection_name, cid, country, city, detail_address from address";

        initialAddressCollections(mongodbClient.getDatabase(this.customerDatabase), captureAddressCollections);

        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        try {
            String savepointDirectory = temporaryFolder.newFolder().toURI().toString();
            String finishedSavePointPath = null;
            List<String> fetchedDataList = new ArrayList<String>();

            for (int round = 0; round < captureAddressCollections.length; ++round) {
                String[] captureCollectionsThisRound = Arrays.asList(captureAddressCollections).subList(0, round + 1).toArray(new String[0]);
                String newlyAddedCollection = captureAddressCollections[round];
                if (makeOplogBeforeCapture) {
                    makeOplogBeforeCaptureForAddressCollection(mongodbClient.getDatabase(this.customerDatabase), newlyAddedCollection);
                }

                StreamExecutionEnvironment env = getStreamExecutionEnvironmentFromSavePoint(finishedSavePointPath, parallelism);
                StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
                tEnv.executeSql(getCreateTableStatement(sourceOptions, captureCollectionsThisRound));
                tEnv.executeSql(sinkDdl);
                TableResult tableResult = tEnv.executeSql(insertSql);
                JobClient jobClient = tableResult.getJobClient().get();

                // Snapshot rows of the new collection; a fourth row exists when a document was
                // inserted before capture.
                String cityName = newlyAddedCollection.split("_")[1];
                List<String> expectedSnapshotDataThisRound = new ArrayList<String>(Arrays.asList(
                        String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", newlyAddedCollection, cityName, cityName),
                        String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", newlyAddedCollection, cityName, cityName),
                        String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", newlyAddedCollection, cityName, cityName)));
                if (makeOplogBeforeCapture) {
                    expectedSnapshotDataThisRound.add(
                            String.format("+I[%s, 417022095255614381, China, %s, %s West Town address 5]", newlyAddedCollection, cityName, cityName));
                }

                if (failoverPhase == MongoDBTestUtils.FailoverPhase.SNAPSHOT) {
                    MongoDBTestUtils.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
                }
                fetchedDataList.addAll(expectedSnapshotDataThisRound);
                NewlyAddedTableITCase.waitForUpsertSinkSize("sink", fetchedDataList.size());
                MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));

                makeFirstPartOplogForAddressCollection(mongodbClient.getDatabase(this.customerDatabase), newlyAddedCollection);
                if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM) {
                    MongoDBTestUtils.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> this.sleepMs(100L));
                }
                makeSecondPartOplogForAddressCollections(mongodbClient.getDatabase(this.customerDatabase), newlyAddedCollection);

                // The updated document replaces its earlier row in the expected list.
                final String updatedRowKey = String.format("%s, 416874195632735147", newlyAddedCollection);
                fetchedDataList = fetchedDataList.stream().filter(row -> !row.contains(updatedRowKey)).collect(Collectors.toList());
                fetchedDataList.addAll(Arrays.asList(
                        String.format("+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]", newlyAddedCollection, cityName, cityName),
                        String.format("+I[%s, 417022095255614380, China, %s, %s West Town address 4]", newlyAddedCollection, cityName, cityName)));
                NewlyAddedTableITCase.waitForUpsertSinkSize("sink", fetchedDataList.size());
                // Extra pause kept from the original between reaching the expected size and asserting.
                Thread.sleep(1000L);
                MongoDBAssertUtils.assertEqualsInAnyOrder(fetchedDataList, TestValuesTableFactory.getResults("sink"));

                // No savepoint is needed after the final round.
                if (round != captureAddressCollections.length - 1) {
                    finishedSavePointPath = triggerSavepointWithRetry(jobClient, savepointDirectory);
                }
                jobClient.cancel().get();
            }
        } finally {
            // The folder is created by hand rather than as a @Rule, so nothing else removes it.
            temporaryFolder.delete();
        }
    }

    /**
     * Creates each named collection with change-stream pre- and post-images enabled and inserts
     * three address documents into it.
     *
     * @param mongoDatabase database in which the collections are created
     * @param captureCustomerCollections collection names; the text after the first {@code _} is
     *     used as the city name of the inserted documents
     */
    private void initialAddressCollections(MongoDatabase mongoDatabase, String[] captureCustomerCollections) {
        for (int i = 0; i < captureCustomerCollections.length; i++) {
            final String name = captureCustomerCollections[i];
            final String city = name.split("_")[1];

            String createCommand = String.format("db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })", name, name);
            CONTAINER.executeCommandInDatabase(createCommand, mongoDatabase.getName());

            MongoCollection<Document> addresses = mongoDatabase.getCollection(name);
            addresses.insertOne(addressDocOf(416874195632735147L, "China", city, city + " West Town address 1"));
            addresses.insertOne(addressDocOf(416927583791428523L, "China", city, city + " West Town address 2"));
            addresses.insertOne(addressDocOf(417022095255614379L, "China", city, city + " West Town address 3"));
        }
    }

    /**
     * Sets {@code country} to {@code "CHINA"} on the document with cid 416874195632735147 in the
     * given collection.
     */
    private void makeFirstPartOplogForAddressCollection(MongoDatabase mongoDatabase, String collectionName) {
        MongoCollection<Document> addresses = mongoDatabase.getCollection(collectionName);
        addresses.updateOne(
                Filters.eq("cid", 416874195632735147L),
                Updates.set("country", "CHINA"));
    }

    /**
     * Inserts the "address 4" document (cid 417022095255614380) into the given collection; the
     * city is taken from the text after the first {@code _} in the collection name.
     */
    private void makeSecondPartOplogForAddressCollections(MongoDatabase mongoDatabase, String collectionName) {
        final String city = collectionName.split("_")[1];
        Document fourthAddress = addressDocOf(417022095255614380L, "China", city, city + " West Town address 4");
        MongoCollection<Document> addresses = mongoDatabase.getCollection(collectionName);
        addresses.insertOne(fourthAddress);
    }

    /**
     * Inserts the "address 5" document (cid 417022095255614381) into the given collection; the
     * city is taken from the text after the first {@code _} in the collection name.
     */
    private void makeOplogBeforeCaptureForAddressCollection(MongoDatabase mongoDatabase, String collectionName) {
        final String city = collectionName.split("_")[1];
        Document fifthAddress = addressDocOf(417022095255614381L, "China", city, city + " West Town address 5");
        MongoCollection<Document> addresses = mongoDatabase.getCollection(collectionName);
        addresses.insertOne(fifthAddress);
    }

    /**
     * Produces the changes of one round on a collection: sets {@code country} to
     * {@code "China_<round>"} on the document with cid 416874195632735147, then inserts an
     * "address 4" document whose cid is 417022095255614380 plus the round number.
     */
    private void makeOplogForAddressTableInRound(MongoDatabase mongoDatabase, String collectionName, int round) {
        MongoCollection<Document> addresses = mongoDatabase.getCollection(collectionName);
        final String city = collectionName.split("_")[1];

        String countryThisRound = "China_" + round;
        addresses.updateOne(Filters.eq("cid", 416874195632735147L), Updates.set("country", countryThisRound));

        long cidThisRound = 417022095255614380L + (long) round;
        addresses.insertOne(addressDocOf(cidThisRound, "China", city, city + " West Town address 4"));
    }

    /**
     * Sleeps for the given number of milliseconds without propagating a checked exception.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early and re-asserts the
     * interrupt flag so that callers further up the stack can still observe the interruption.
     *
     * @param millis time to sleep, in milliseconds
     */
    private void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; restore it instead of losing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Triggers a savepoint for the job, retrying while the failure is a {@link CheckpointException}
     * whose message contains {@code "Checkpoint triggering task"}.
     *
     * <p>Up to 600 attempts are made with a 100 ms pause after each retryable failure. Any other
     * failure is rethrown immediately.
     *
     * @param jobClient client of the running job
     * @param savepointDirectory directory URI the savepoint is written to
     * @return the path of the completed savepoint, never {@code null}
     * @throws ExecutionException if triggering fails for a non-retryable reason
     * @throws InterruptedException if the thread is interrupted while waiting
     * @throws IllegalStateException if every attempt failed with the retryable error
     */
    private String triggerSavepointWithRetry(JobClient jobClient, String savepointDirectory) throws ExecutionException, InterruptedException {
        final int maxAttempts = 600;
        Exception lastFailure = null;
        for (int attempt = 0; attempt < maxAttempts; ++attempt) {
            try {
                return jobClient.triggerSavepoint(savepointDirectory).get();
            } catch (Exception e) {
                Optional<CheckpointException> checkpointFailure = ExceptionUtils.findThrowable(e, CheckpointException.class);
                // map() yields an empty Optional for a null message, so this cannot throw an NPE.
                boolean retryable = checkpointFailure
                        .map(Throwable::getMessage)
                        .filter(message -> message.contains("Checkpoint triggering task"))
                        .isPresent();
                if (!retryable) {
                    throw e;
                }
                lastFailure = e;
                Thread.sleep(100L);
            }
        }
        // Returning null here would make the caller start the next job without any savepoint.
        throw new IllegalStateException(
                "Could not trigger a savepoint in " + savepointDirectory + " after " + maxAttempts + " attempts",
                lastFailure);
    }

    /**
     * Builds the execution environment for one job run.
     *
     * <p>The configuration sets {@code table.exec.sink.upsert-materialize} to {@code none} and,
     * when a savepoint path is given, the savepoint path option. The environment uses the given
     * parallelism, checkpoints every 200 ms and a fixed-delay restart strategy of 3 attempts with
     * a delay of 100 ms.
     *
     * @param finishedSavePointPath savepoint to restore from, or {@code null} for a fresh start
     * @param parallelism parallelism of the environment
     * @return the configured environment
     */
    private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint(String finishedSavePointPath, int parallelism) throws Exception {
        final Configuration configuration = new Configuration();
        if (finishedSavePointPath != null) {
            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, finishedSavePointPath);
        }
        configuration.setString("table.exec.sink.upsert-materialize", "none");

        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        environment.setParallelism(parallelism);
        environment.enableCheckpointing(200L);
        environment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
        return environment;
    }

    /**
     * Builds the {@code CREATE TABLE address} statement for the {@code mongodb-cdc} source.
     *
     * @param otherOptions extra options appended as {@code ,'key'='value'} pairs; nothing is
     *     appended when the map is empty
     * @param captureTableNames collections to capture, turned into the {@code collection} option
     *     by {@link #getCollectionNameRegex}
     * @return the DDL statement
     */
    private String getCreateTableStatement(Map<String, String> otherOptions, String ... captureTableNames) {
        final String hostAndPort = CONTAINER.getHostAndPort();
        final String collectionPattern = getCollectionNameRegex(customerDatabase, captureTableNames);

        final String extraOptions;
        if (otherOptions.isEmpty()) {
            extraOptions = "";
        } else {
            extraOptions = "," + otherOptions.entrySet().stream()
                    .map(option -> String.format("'%s'='%s'", option.getKey(), option.getValue()))
                    .collect(Collectors.joining(","));
        }

        return String.format(
                "CREATE TABLE address ( _id STRING NOT NULL, collection_name STRING METADATA VIRTUAL, cid BIGINT NOT NULL, country STRING, city STRING, detail_address STRING, primary key (_id) not enforced) WITH ( 'connector' = 'mongodb-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s', 'chunk-meta.group.size' = '2', 'heartbeat.interval.ms' = '100', 'scan.full-changelog' = 'true', 'scan.newly-added-table.enabled' = 'true' %s)",
                hostAndPort,
                "flinkuser",
                "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;",
                customerDatabase,
                collectionPattern,
                extraOptions);
    }

    /**
     * Blocks until {@link #upsertSinkSize(String)} reports at least {@code expectedSize} results
     * for the sink, polling every 100 ms. The method itself has no timeout.
     *
     * @param sinkName name of the sink table
     * @param expectedSize minimum number of results to wait for
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    protected static void waitForUpsertSinkSize(String sinkName, int expectedSize) throws InterruptedException {
        for (;;) {
            if (NewlyAddedTableITCase.upsertSinkSize(sinkName) >= expectedSize) {
                return;
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Returns the number of entries in {@code TestValuesTableFactory.getResults(sinkName)}, read
     * while holding the monitor of {@code TestValuesTableFactory.class}.
     *
     * @param sinkName name of the sink table
     * @return the result count, or 0 when the lookup throws {@link IllegalArgumentException}
     */
    protected static int upsertSinkSize(String sinkName) {
        synchronized (TestValuesTableFactory.class) {
            try {
                return TestValuesTableFactory.getResults(sinkName).size();
            } catch (IllegalArgumentException ignored) {
                // Treated as "no results yet"; presumably thrown while the sink has not been
                // registered by a running job -- TODO confirm against TestValuesTableFactory.
                return 0;
            }
        }
    }

    /**
     * Builds the value of the {@code collection} source option.
     *
     * <p>A single collection is returned as its bare name. Several collections are rendered as
     * {@code ^(<database>.<collection>)$} alternatives joined with {@code |}.
     *
     * @param database database name used to qualify each collection in the multi-collection form
     * @param captureCustomerCollections collections to capture; must not be empty
     * @return the collection name or pattern
     * @throws IllegalStateException if no collection is given
     */
    private String getCollectionNameRegex(String database, String[] captureCustomerCollections) {
        Preconditions.checkState(captureCustomerCollections.length > 0);
        if (captureCustomerCollections.length > 1) {
            // NOTE(review): the '.' between database and collection is not escaped, so as a regex
            // it matches any character, not only a literal dot.
            List<String> alternatives = new ArrayList<>(captureCustomerCollections.length);
            for (String collectionName : captureCustomerCollections) {
                alternatives.add("^(" + database + "." + collectionName + ")$");
            }
            return String.join("|", alternatives);
        }
        return captureCustomerCollections[0];
    }

    /**
     * Creates an address document with the fields {@code cid}, {@code country}, {@code city} and
     * {@code detail_address}, in that order.
     */
    private Document addressDocOf(Long cid, String country, String city, String detailAddress) {
        return new Document("cid", cid)
                .append("country", country)
                .append("city", city)
                .append("detail_address", detailAddress);
    }
}

