/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.reader;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.OperationType;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.offset.Offset;
import org.apache.flink.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.StreamSplit;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
import org.apache.flink.cdc.connectors.mongodb.source.dialect.MongoDBDialect;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamDescriptor;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamOffset;
import org.apache.flink.cdc.connectors.mongodb.source.offset.ChangeStreamOffsetFactory;
import org.apache.flink.cdc.connectors.mongodb.source.utils.CollectionDiscoveryUtils;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
 * Tests reading a {@link StreamSplit} through an {@link IncrementalSourceSplitReader} that is
 * configured with the {@link MongoDBDialect}.
 */
public class MongoDBStreamSplitReaderTest extends MongoDBSourceTestBase {
    /** Fails any single test that runs longer than 300 seconds. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);

    private static final String STREAM_SPLIT_ID = "stream-split";

    /** Upper bound on the number of {@code fetch()} calls before the test gives up. */
    private static final int MAX_RETRY_TIMES = 100;

    /** Pause between two consecutive {@code fetch()} calls, in milliseconds. */
    private static final long RETRY_INTERVAL_MILLIS = 300L;

    /** Name of the collection the test inserts into and captures changes from. */
    private static final String COLLECTION_NAME = "shopping_cart";

    private String database;
    private MongoDBDialect dialect;
    private MongoDBSourceConfig sourceConfig;
    private ChangeStreamOffsetFactory changeStreamOffsetFactory;
    private ChangeStreamDescriptor changeStreamDescriptor;
    private BsonDocument startupResumeToken;

    /**
     * Runs the {@code chunk_test} command file against the shared container, builds the source
     * configuration restricted to the resulting database's {@code shopping_cart} collection and
     * records the latest resume token so that the test only observes changes made afterwards.
     */
    @Before
    public void before() {
        this.database = CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");

        MongoDBSourceConfigFactory configFactory =
                new MongoDBSourceConfigFactory()
                        .hosts(CONTAINER.getHostAndPort())
                        .databaseList(new String[] {this.database})
                        .collectionList(new String[] {this.database + "." + COLLECTION_NAME})
                        .username("flinkuser")
                        .password("a1?~!@#$%^&*(){}[]<>.,+_-=/|:;")
                        .splitSizeMB(1)
                        .pollAwaitTimeMillis(500);

        this.dialect = new MongoDBDialect();
        this.sourceConfig = configFactory.create(0);
        this.changeStreamOffsetFactory = new ChangeStreamOffsetFactory();

        List<String> discoveredDatabases =
                CollectionDiscoveryUtils.databaseNames(
                        mongodbClient,
                        CollectionDiscoveryUtils.databaseFilter(
                                this.sourceConfig.getDatabaseList()));
        List<String> discoveredCollections =
                CollectionDiscoveryUtils.collectionNames(
                        mongodbClient,
                        discoveredDatabases,
                        CollectionDiscoveryUtils.collectionsFilter(
                                this.sourceConfig.getCollectionList()));

        this.changeStreamDescriptor =
                MongoUtils.getChangeStreamDescriptor(
                        this.sourceConfig, discoveredDatabases, discoveredCollections);
        this.startupResumeToken =
                MongoUtils.getLatestResumeToken(mongodbClient, this.changeStreamDescriptor);
    }

    /**
     * Assigns a stream split that starts at the resume token captured in {@link #before()},
     * inserts four documents and polls the reader until one insert event per document has been
     * received and verified.
     *
     * <p>Polling stops after {@link #MAX_RETRY_TIMES} fetches, or as soon as a fetch reports no
     * split; in both cases the final assertion fails unless every insert was observed.
     *
     * @throws Exception if fetching, sleeping between fetches, or closing the reader fails
     */
    @Test
    public void testStreamSplitReader() throws Exception {
        IncrementalSourceReaderContext incrementalSourceReaderContext =
                new IncrementalSourceReaderContext(new TestingReaderContext());

        try (IncrementalSourceSplitReader<MongoDBSourceConfig> streamSplitReader =
                new IncrementalSourceSplitReader<>(
                        0,
                        this.dialect,
                        this.sourceConfig,
                        incrementalSourceReaderContext,
                        SnapshotPhaseHooks.empty())) {
            ChangeStreamOffset startOffset = new ChangeStreamOffset(this.startupResumeToken);
            StreamSplit streamSplit =
                    new StreamSplit(
                            STREAM_SPLIT_ID,
                            startOffset,
                            this.changeStreamOffsetFactory.createNoStoppingOffset(),
                            new ArrayList<>(),
                            new HashMap<>(),
                            0);

            Assert.assertTrue(streamSplitReader.canAssignNextSplit());
            streamSplitReader.handleSplitsChanges(
                    new SplitsAddition<>(Collections.singletonList(streamSplit)));

            MongoCollection<Document> collection =
                    mongodbClient.getDatabase(this.database).getCollection(COLLECTION_NAME);

            // Use the current time as product number so the documents are distinct per run.
            long now = System.currentTimeMillis();
            List<Document> inserts =
                    Arrays.asList(
                            this.shoppingCartDoc(now),
                            this.shoppingCartDoc(now + 1L),
                            this.shoppingCartDoc(now + 2L),
                            this.shoppingCartDoc(now + 3L));
            collection.insertMany(inserts);

            int count = 0;
            for (int retry = 0; retry < MAX_RETRY_TIMES; retry++) {
                ChangeEventRecords records = (ChangeEventRecords) streamSplitReader.fetch();
                if (records.nextSplit() == null) {
                    // No split in this batch: stop polling and let the final assertion decide.
                    break;
                }

                SourceRecords sourceRecords;
                while ((sourceRecords = records.nextRecordFromSplit()) != null) {
                    Iterator<SourceRecord> iterator = sourceRecords.iterator();
                    while (iterator.hasNext()) {
                        this.assertInsertEvent(iterator.next());
                        if (++count >= inserts.size()) {
                            // Every inserted document has been seen; nothing left to wait for.
                            return;
                        }
                    }
                }
                Thread.sleep(RETRY_INTERVAL_MILLIS);
            }

            Assert.assertEquals(inserts.size(), count);
        }
    }

    /**
     * Verifies that the given change record is an insert event whose full document is consistent
     * with what {@link #shoppingCartDoc(long)} generates for its {@code product_no}.
     *
     * @param record a record emitted by the stream split reader
     */
    private void assertInsertEvent(SourceRecord record) {
        Struct value = (Struct) record.value();
        OperationType operationType = OperationType.fromString(value.getString("operationType"));
        Assert.assertEquals(OperationType.INSERT, operationType);

        BsonDocument fullDocument = BsonDocument.parse(value.getString("fullDocument"));
        long productNo = fullDocument.getInt64("product_no").longValue();
        String productKind = fullDocument.getString("product_kind").getValue();
        String userId = fullDocument.getString("user_id").getValue();
        String description = fullDocument.getString("description").getValue();

        Assert.assertEquals("KIND_" + productNo, productKind);
        Assert.assertEquals("user_" + productNo, userId);
        Assert.assertEquals("my shopping cart " + productNo, description);
    }

    /**
     * Builds a shopping cart document whose other fields are all derived from the product number.
     *
     * @param productNo value stored in {@code product_no} and embedded in the other fields
     * @return a new document with {@code product_no}, {@code product_kind}, {@code user_id} and
     *     {@code description} set
     */
    private Document shoppingCartDoc(long productNo) {
        Document document = new Document();
        document.put("product_no", productNo);
        document.put("product_kind", "KIND_" + productNo);
        document.put("user_id", "user_" + productNo);
        document.put("description", "my shopping cart " + productNo);
        return document;
    }
}

