/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.source.reader;

import io.debezium.relational.TableId;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
import org.apache.flink.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
import org.apache.flink.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceReaderContext;
import org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceSplitReader;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.SampleBucketSplitStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.ShardedSplitStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.SingleSplitStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.SplitContext;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.SplitStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.assigners.splitters.SplitVectorSplitStrategy;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
import org.apache.flink.cdc.connectors.mongodb.source.dialect.MongoDBDialect;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.testutils.source.reader.TestingReaderContext;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests that an {@link IncrementalSourceSplitReader} backed by {@link MongoDBDialect} reads
 * every document of the {@code shopping_cart} collection, once for each available
 * {@link SplitStrategy}.
 */
public class MongoDBSnapshotSplitReaderTest
extends MongoDBSourceTestBase {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBSnapshotSplitReaderTest.class);

    /** Upper bound on fetch iterations before the read loop gives up. */
    private static final int MAX_RETRY_TIMES = 100;

    /** Pause between two consecutive fetch iterations, in milliseconds. */
    private static final long FETCH_INTERVAL_MILLIS = 300L;

    private String database;
    private MongoDBSourceConfig sourceConfig;
    private MongoDBDialect dialect;
    private SplitContext splitContext;

    /**
     * Runs the {@code chunk_test} command file through the shared container, keeps the returned
     * database name, and builds the source config, dialect and split context used by every test.
     */
    @Before
    public void before() {
        this.database = CONTAINER.executeCommandFileInSeparateDatabase("chunk_test");

        MongoDBSourceConfigFactory configFactory =
                new MongoDBSourceConfigFactory()
                        .hosts(CONTAINER.getHostAndPort())
                        .databaseList(new String[]{this.database})
                        .collectionList(new String[]{this.database + ".shopping_cart"})
                        .username("flinkuser")
                        .password("a1?~!@#$%^&*(){}[]<>.,+_-=/|:;")
                        .splitSizeMB(1)
                        .samplesPerChunk(10)
                        .pollAwaitTimeMillis(500);

        this.sourceConfig = configFactory.create(0);
        this.dialect = new MongoDBDialect();
        this.splitContext =
                SplitContext.of(
                        this.sourceConfig, new TableId(this.database, null, "shopping_cart"));
    }

    @Test
    public void testMongoDBSnapshotSplitReaderWithShardedSplitter() throws Exception {
        testMongoDBSnapshotSplitReader(ShardedSplitStrategy.INSTANCE);
    }

    @Test
    public void testMongoDBSnapshotSplitReaderWithSplitVectorSplitter() throws Exception {
        testMongoDBSnapshotSplitReader(SplitVectorSplitStrategy.INSTANCE);
    }

    @Test
    public void testMongoDBSnapshotSplitReaderWithSamplerSplitter() throws Exception {
        testMongoDBSnapshotSplitReader(SampleBucketSplitStrategy.INSTANCE);
    }

    @Test
    public void testMongoDBSnapshotSplitReaderWithSingleSplitter() throws Exception {
        testMongoDBSnapshotSplitReader(SingleSplitStrategy.INSTANCE);
    }

    /**
     * Splits the collection with {@code splitter}, feeds the resulting splits one at a time to a
     * fresh split reader, validates every non-watermark record and finally checks that the number
     * of validated records equals the document count reported by the split context.
     *
     * <p>The loop runs at most {@link #MAX_RETRY_TIMES} iterations; if it runs out before all
     * splits are consumed, the final count assertion is what reports the failure.
     *
     * @param splitter strategy used to cut the collection into snapshot splits
     * @throws Exception if reading fails, the reader cannot be closed, or the thread is
     *     interrupted while waiting between fetches
     */
    private void testMongoDBSnapshotSplitReader(SplitStrategy splitter) throws Exception {
        LinkedList<SnapshotSplit> snapshotSplits = new LinkedList<>(splitter.split(this.splitContext));
        Assert.assertFalse(snapshotSplits.isEmpty());

        IncrementalSourceReaderContext incrementalSourceReaderContext =
                new IncrementalSourceReaderContext(new TestingReaderContext());

        long actualCount = 0L;
        try (IncrementalSourceSplitReader<MongoDBSourceConfig> snapshotSplitReader =
                new IncrementalSourceSplitReader<>(
                        0,
                        this.dialect,
                        this.sourceConfig,
                        incrementalSourceReaderContext,
                        SnapshotPhaseHooks.empty())) {
            for (int retry = 0; retry < MAX_RETRY_TIMES; ++retry) {
                // Hand over the next pending split as soon as the reader can take one.
                if (!snapshotSplits.isEmpty() && snapshotSplitReader.canAssignNextSplit()) {
                    SnapshotSplit snapshotSplit = snapshotSplits.poll();
                    LOG.info("Add snapshot split {}", snapshotSplit.splitId());
                    snapshotSplitReader.handleSplitsChanges(
                            new SplitsAddition<>(Collections.singletonList(snapshotSplit)));
                }

                ChangeEventRecords records = (ChangeEventRecords) snapshotSplitReader.fetch();
                if (records.nextSplit() != null) {
                    actualCount += verifyRecords(records);
                } else if (snapshotSplits.isEmpty() && snapshotSplitReader.canAssignNextSplit()) {
                    // Nothing fetched, nothing left to assign and the reader is free: done.
                    break;
                }
                // Otherwise keep polling after a short pause.
                Thread.sleep(FETCH_INTERVAL_MILLIS);
            }
        }

        Assert.assertEquals(this.splitContext.getDocumentCount(), actualCount);
    }

    /**
     * Drains the current split of {@code records}, validating every record that is not a
     * watermark event.
     *
     * @param records fetch result whose current split has already been selected by the caller
     * @return number of non-watermark records that were validated
     */
    private static long verifyRecords(ChangeEventRecords records) {
        long verified = 0L;
        SourceRecords sourceRecords;
        while ((sourceRecords = records.nextRecordFromSplit()) != null) {
            // Use iterator() directly so the loop does not depend on for-each support.
            Iterator<SourceRecord> iterator = sourceRecords.iterator();
            while (iterator.hasNext()) {
                SourceRecord record = iterator.next();
                if (WatermarkEvent.isWatermarkEvent(record)) {
                    continue;
                }
                assertShoppingCartRecord(record);
                ++verified;
            }
        }
        return verified;
    }

    /**
     * Parses the {@code fullDocument} JSON carried in the record value and asserts that the
     * {@code product_kind}, {@code user_id} and {@code description} fields are the expected
     * strings derived from {@code product_no}.
     *
     * @param record a non-watermark record emitted by the snapshot reader
     */
    private static void assertShoppingCartRecord(SourceRecord record) {
        Struct value = (Struct) record.value();
        BsonDocument fullDocument = BsonDocument.parse(value.getString("fullDocument"));

        long productNo = fullDocument.getInt64("product_no").longValue();
        String productKind = fullDocument.getString("product_kind").getValue();
        String userId = fullDocument.getString("user_id").getValue();
        String description = fullDocument.getString("description").getValue();

        Assert.assertEquals("KIND_" + productNo, productKind);
        Assert.assertEquals("user_" + productNo, userId);
        Assert.assertEquals("my shopping cart " + productNo, description);
    }
}

