/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb;

import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import com.mongodb.ConnectionString;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.connectors.mongodb.LegacyMongoDBTestBase;
import org.apache.flink.cdc.connectors.mongodb.MongoDBSource;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils;
import org.apache.flink.cdc.connectors.utils.TestSourceContext;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.Assert;
import org.junit.Test;

public class LegacyMongoDBSourceTest
extends LegacyMongoDBTestBase {
    /**
     * Runs the source against a fresh copy of the "inventory" database and checks that
     * the initial records plus one insert, update, replace and delete on the
     * {@code products} collection are each emitted as the expected kind of record.
     */
    @Test
    public void testConsumingAllEvents() throws Exception {
        // Fixed id reused by the insert / update / replace / delete steps below.
        final String robotId = "000000000000000000001001";

        String database = MONGODB_CONTAINER.executeCommandFileInSeparateDatabase("inventory");
        final DebeziumSourceFunction<SourceRecord> source = this.createMongoDBSource(database);
        final TestSourceContext sourceContext = new TestSourceContext();
        MongoCollection products = LegacyMongoDBSourceTest.getMongoDatabase(database).getCollection("products");
        LegacyMongoDBSourceTest.setupSource(source);

        // The source's run loop blocks, so it is driven from a separate checked thread.
        CheckedThread sourceThread = new CheckedThread(){

            public void go() throws Exception {
                source.run((SourceFunction.SourceContext)sourceContext);
            }
        };
        sourceThread.start();

        // Initial batch: nine records are expected before any change is made.
        List initialRecords = this.drain(sourceContext, 9);
        Assert.assertEquals(9L, (long)initialRecords.size());
        Object checkpointLock = sourceContext.getCheckpointLock();
        Assert.assertTrue(this.waitForCheckpointLock(checkpointLock, Duration.ofSeconds(15L)));

        // Insert without an explicit _id.
        products.insertOne((Object)this.productDocOf(null, "description", "Toy robot", 1.304));
        SourceRecord generatedIdInsert = (SourceRecord)this.drain(sourceContext, 1).get(0);
        MongoDBAssertUtils.assertInsert(generatedIdInsert, true);

        // Insert with a known _id.
        products.insertOne((Object)this.productDocOf(robotId, "roy", "old robot", 1234.56));
        SourceRecord fixedIdInsert = (SourceRecord)this.drain(sourceContext, 1).get(0);
        MongoDBAssertUtils.assertInsert(fixedIdInsert, true);
        MongoDBAssertUtils.assertObjectIdEquals(robotId, fixedIdInsert);

        // Update a single field of that document.
        products.updateOne(
                Filters.eq("_id", (Object)new ObjectId(robotId)),
                Updates.set("description", (Object)"really old robot"));
        SourceRecord updateRecord = (SourceRecord)this.drain(sourceContext, 1).get(0);
        MongoDBAssertUtils.assertUpdate(updateRecord, robotId);

        // Replace the whole document.
        products.replaceOne(
                Filters.eq("_id", (Object)new ObjectId(robotId)),
                (Object)this.productDocOf(robotId, "roy_replace", "old robot replace", 1234.57));
        SourceRecord replaceRecord = (SourceRecord)this.drain(sourceContext, 1).get(0);
        MongoDBAssertUtils.assertReplace(replaceRecord, robotId);

        // Delete it again.
        products.deleteOne(Filters.eq("_id", (Object)new ObjectId(robotId)));
        SourceRecord deleteRecord = (SourceRecord)this.drain(sourceContext, 1).get(0);
        MongoDBAssertUtils.assertDelete(deleteRecord, robotId);

        source.close();
        sourceThread.sync();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Exercises snapshotting and restoring of the source's offset state across three
     * consecutive source instances that share the same {@code offsetState} /
     * {@code historyState} objects.
     *
     * <p>Phase 1 starts from empty state and takes two snapshots; phases 2 and 3 are
     * set up with {@code isRestored = true} and are expected to emit only the changes
     * made after the last snapshot.
     */
    @Test
    public void testCheckpointAndRestore() throws Exception {
        String database = MONGODB_CONTAINER.executeCommandFileInSeparateDatabase("inventory");
        TestingListState offsetState = new TestingListState();
        TestingListState historyState = new TestingListState();

        // ---- Phase 1: fresh start (isRestored = false) ----
        final DebeziumSourceFunction<SourceRecord> source = this.createMongoDBSource(database);
        // This context blocks inside collect() once the 8th record has been collected.
        final BlockingSourceContext sourceContext = new BlockingSourceContext(8);
        LegacyMongoDBSourceTest.setupSource(source, false, offsetState, historyState, true, 0, 1);
        CheckedThread runThread = new CheckedThread(){

            public void go() throws Exception {
                source.run((SourceFunction.SourceContext)sourceContext);
            }
        };
        runThread.start();
        List records = this.drain(sourceContext, 2);
        int received = records.size();
        Assert.assertEquals((long)2L, (long)received);
        // The checkpoint lock is asserted to be unobtainable within 3s here
        // (presumably because the emitting thread holds it while blocked in collect()).
        Assert.assertFalse((boolean)this.waitForCheckpointLock(sourceContext.getCheckpointLock(), Duration.ofSeconds(3L)));
        // Unblock the emitter and drain the rest, for nine records in total.
        sourceContext.blocker.release();
        records = this.drain(sourceContext, 9 - received);
        Assert.assertEquals((long)9L, (long)(records.size() + received));
        // Nothing has been written to state before the first snapshot.
        Assert.assertEquals((long)0L, (long)offsetState.list.size());
        Assert.assertEquals((long)0L, (long)historyState.list.size());
        // Snapshot #101, taken while holding the checkpoint lock.
        Object object = sourceContext.getCheckpointLock();
        synchronized (object) {
            source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(101L, 101L));
        }
        // Exactly one offset entry is expected; it is decoded as UTF-8 text.
        // NOTE: `state` is reused below both for decoded strings and for lock objects
        // (decompiler artifact), hence the Object type and the casts.
        Assert.assertEquals((long)1L, (long)offsetState.list.size());
        Object state = new String((byte[])offsetState.list.get(0), StandardCharsets.UTF_8);
        Assert.assertTrue((boolean)((String)state).contains("sourcePartition"));
        Assert.assertTrue((boolean)((String)state).contains("sourceOffset"));
        // One update after the first snapshot, then snapshot #102.
        MongoCollection products = LegacyMongoDBSourceTest.getMongoDatabase(database).getCollection("products");
        products.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("100000000000000000000102")), Updates.set((String)"description", (Object)"really old robot 1002"));
        records = this.drain(sourceContext, 1);
        MongoDBAssertUtils.assertUpdate((SourceRecord)records.get(0), "100000000000000000000102");
        Object object2 = sourceContext.getCheckpointLock();
        synchronized (object2) {
            source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(102L, 102L));
        }
        // Still a single offset entry; its sourceOffset._id must now contain "_data".
        Assert.assertEquals((long)1L, (long)offsetState.list.size());
        state = new String((byte[])offsetState.list.get(0), StandardCharsets.UTF_8);
        Assert.assertTrue((boolean)((String)state).contains("sourcePartition"));
        Assert.assertTrue((boolean)((String)state).contains("sourceOffset"));
        String resumeToken = (String)JsonPath.read((String)state, (String)"$.sourceOffset._id", (Predicate[])new Predicate[0]);
        Assert.assertTrue((boolean)resumeToken.contains("_data"));
        source.close();
        runThread.sync();

        // ---- Phase 2: new source set up with isRestored = true and the same state ----
        final DebeziumSourceFunction<SourceRecord> source2 = this.createMongoDBSource(database);
        final TestSourceContext sourceContext2 = new TestSourceContext();
        LegacyMongoDBSourceTest.setupSource(source2, true, offsetState, historyState, true, 0, 1);
        CheckedThread runThread2 = new CheckedThread(){

            public void go() throws Exception {
                source2.run((SourceFunction.SourceContext)sourceContext2);
            }
        };
        runThread2.start();
        // No record may show up within 5s: nothing changed since snapshot #102.
        Assert.assertFalse((boolean)this.waitForAvailableRecords(Duration.ofSeconds(5L), sourceContext2));
        MongoCollection products2 = LegacyMongoDBSourceTest.getMongoDatabase(database).getCollection("products");
        products2.insertOne((Object)this.productDocOf("000000000000000000001002", "robot", "Toy robot", 1.304));
        List records2 = this.drain(sourceContext2, 1);
        Assert.assertEquals((long)1L, (long)records2.size());
        MongoDBAssertUtils.assertInsert((SourceRecord)records2.get(0), true);
        MongoDBAssertUtils.assertObjectIdEquals("000000000000000000001002", (SourceRecord)records2.get(0));
        // Snapshot #138 after the insert has been consumed.
        state = sourceContext2.getCheckpointLock();
        synchronized (state) {
            source2.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(138L, 138L));
        }
        Assert.assertEquals((long)1L, (long)offsetState.list.size());
        state = new String((byte[])offsetState.list.get(0), StandardCharsets.UTF_8);
        Assert.assertTrue((boolean)((String)state).contains("sourcePartition"));
        Assert.assertTrue((boolean)((String)state).contains("sourceOffset"));
        String resumeToken2 = (String)JsonPath.read((String)state, (String)"$.sourceOffset._id", (Predicate[])new Predicate[0]);
        Assert.assertTrue((boolean)resumeToken2.contains("_data"));
        // These two changes are made after snapshot #138 and are deliberately NOT
        // drained from source2; phase 3 expects to receive them after restoring.
        products2.insertOne((Object)this.productDocOf("000000000000000000001003", "roy", "old robot", 1234.56));
        products2.updateOne(Filters.eq((String)"_id", (Object)new ObjectId("000000000000000000001002")), Updates.set((String)"weight", (Object)1345.67));
        source2.close();
        runThread2.sync();

        // ---- Phase 3: restore again from the state written by snapshot #138 ----
        final DebeziumSourceFunction<SourceRecord> source3 = this.createMongoDBSource(database);
        final TestSourceContext sourceContext3 = new TestSourceContext();
        LegacyMongoDBSourceTest.setupSource(source3, true, offsetState, historyState, true, 0, 1);
        CheckedThread runThread3 = new CheckedThread(){

            public void go() throws Exception {
                source3.run((SourceFunction.SourceContext)sourceContext3);
            }
        };
        runThread3.start();
        // Expect exactly the insert of ...1003 followed by the update of ...1002.
        records = this.drain(sourceContext3, 2);
        MongoDBAssertUtils.assertInsert((SourceRecord)records.get(0), true);
        MongoDBAssertUtils.assertObjectIdEquals("000000000000000000001003", (SourceRecord)records.get(0));
        MongoDBAssertUtils.assertUpdate((SourceRecord)records.get(1), "000000000000000000001002");
        // ...and nothing else for the next 3s.
        Assert.assertFalse((boolean)this.waitForAvailableRecords(Duration.ofSeconds(3L), sourceContext3));
        // Delete both test documents and check each delete is emitted.
        MongoCollection products3 = LegacyMongoDBSourceTest.getMongoDatabase(database).getCollection("products");
        products3.deleteOne(Filters.eq((String)"_id", (Object)new ObjectId("000000000000000000001002")));
        records = this.drain(sourceContext3, 1);
        MongoDBAssertUtils.assertDelete((SourceRecord)records.get(0), "000000000000000000001002");
        products3.deleteOne(Filters.eq((String)"_id", (Object)new ObjectId("000000000000000000001003")));
        records = this.drain(sourceContext3, 1);
        MongoDBAssertUtils.assertDelete((SourceRecord)records.get(0), "000000000000000000001003");
        // Final snapshot #233 must again leave one offset entry carrying a resume token.
        state = sourceContext3.getCheckpointLock();
        synchronized (state) {
            source3.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(233L, 233L));
        }
        Assert.assertEquals((long)1L, (long)offsetState.list.size());
        state = new String((byte[])offsetState.list.get(0), StandardCharsets.UTF_8);
        Assert.assertTrue((boolean)((String)state).contains("sourcePartition"));
        Assert.assertTrue((boolean)((String)state).contains("sourceOffset"));
        resumeToken2 = (String)JsonPath.read((String)state, (String)"$.sourceOffset._id", (Predicate[])new Predicate[0]);
        Assert.assertTrue((boolean)resumeToken2.contains("_data"));
        source3.close();
        runThread3.sync();
    }

    /**
     * Checks the connection strings produced by {@code MongoUtils.buildConnectionString}:
     * missing or empty credentials yield a bare {@code scheme://hosts} URI, and a
     * password full of reserved characters survives a round trip through
     * {@link ConnectionString}.
     */
    @Test
    public void testConnectionUri() {
        final String user = "flinkuser";
        final String trickyPassword = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
        final String hostAndPort = MONGODB_CONTAINER.getHostAndPort();

        // Both user and password absent.
        String withoutCredentials = MongoUtils.buildConnectionString(null, null, "mongodb", hostAndPort, null);
        Assert.assertEquals((Object)String.format("mongodb://%s", hostAndPort), (Object)withoutCredentials);

        // Empty user name, no password.
        String withEmptyUser = MongoUtils.buildConnectionString("", null, "mongodb", hostAndPort, null);
        Assert.assertEquals((Object)String.format("mongodb://%s", hostAndPort), (Object)withEmptyUser);

        // No user, empty password, SRV scheme.
        String srvWithEmptyPassword = MongoUtils.buildConnectionString(null, "", "mongodb+srv", "localhost", null);
        Assert.assertEquals((Object)"mongodb+srv://localhost", (Object)srvWithEmptyPassword);

        // Credentials containing reserved characters must parse back unchanged.
        String uriWithCredentials = MongoUtils.buildConnectionString(user, trickyPassword, "mongodb", hostAndPort, null);
        ConnectionString parsed = new ConnectionString(uriWithCredentials);
        Assert.assertEquals((Object)"flinkuser", (Object)parsed.getUsername());
        Assert.assertEquals((Object)"a1?~!@#$%^&*(){}[]<>.,+_-=/|:;", (Object)new String(parsed.getPassword()));
    }

    /**
     * Builds a source for the {@code products} collection of the given database that
     * forwards raw {@link SourceRecord}s via {@link ForwardDeserializeSchema}.
     *
     * <p>The builder is parameterised explicitly instead of passing the deserializer
     * through a raw-type cast, so the result is a checked
     * {@code DebeziumSourceFunction<SourceRecord>} rather than an unchecked conversion.
     *
     * @param database name of the database to capture
     * @return a configured, not yet opened source function
     */
    private DebeziumSourceFunction<SourceRecord> createMongoDBSource(String database) {
        return MongoDBSource.<SourceRecord>builder()
                .hosts(MONGODB_CONTAINER.getHostAndPort())
                .username("flinkuser")
                .password("a1?~!@#$%^&*(){}[]<>.,+_-=/|:;")
                .databaseList(new String[]{database})
                .collectionList(new String[]{database + ".products"})
                .deserializer(new ForwardDeserializeSchema())
                .build();
    }

    /**
     * Takes exactly {@code expectedRecordCount} values from the context's output queue,
     * in arrival order.
     *
     * <p>Each poll waits up to 100 seconds; the element type is carried through
     * generically so the returned list really is a {@code List<T>}.
     *
     * @param sourceContext context whose collected outputs are consumed
     * @param expectedRecordCount number of values to wait for; values {@code <= 0} return
     *     an empty list without polling
     * @return the drained values
     * @throws RuntimeException if a single poll times out before enough records arrived
     * @throws Exception if the waiting thread is interrupted
     */
    private <T> List<T> drain(TestSourceContext<T> sourceContext, int expectedRecordCount) throws Exception {
        List<T> allRecords = new ArrayList<>();
        LinkedBlockingQueue<StreamRecord<T>> queue = sourceContext.getCollectedOutputs();
        while (allRecords.size() < expectedRecordCount) {
            StreamRecord<T> record = queue.poll(100L, TimeUnit.SECONDS);
            if (record == null) {
                // poll() returns null only on timeout.
                throw new RuntimeException("Can't receive " + expectedRecordCount + " elements before timeout.");
            }
            allRecords.add(record.getValue());
        }
        return allRecords;
    }

    /**
     * Probes whether {@code checkpointLock} can be acquired within {@code timeout}.
     *
     * <p>A helper thread tries to enter the lock's monitor and signals success through a
     * semaphore. The executor is shut down in a {@code finally} block so it is not
     * leaked when the wait itself is interrupted.
     *
     * <p>Note: {@code shutdownNow()} only interrupts the helper thread; a thread blocked
     * on monitor entry does not react to interruption, so after a {@code false} result
     * the helper finishes once the lock is eventually released.
     *
     * @param checkpointLock the monitor object to probe
     * @param timeout maximum time to wait for the helper to obtain the lock
     * @return {@code true} if the lock was obtained in time, {@code false} otherwise
     * @throws Exception if the calling thread is interrupted while waiting
     */
    private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout) throws Exception {
        final Semaphore lockObtained = new Semaphore(0);
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                synchronized (checkpointLock) {
                    lockObtained.release();
                }
            });
            return lockObtained.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } finally {
            executor.shutdownNow();
        }
    }

    /**
     * Polls the context's output queue every 10 ms until it is non-empty or the
     * timeout has elapsed.
     *
     * @param timeout maximum time to keep polling
     * @param sourceContext context whose collected outputs are inspected
     * @return {@code true} if the queue holds at least one record when polling stops
     * @throws InterruptedException if interrupted while sleeping between polls
     */
    private boolean waitForAvailableRecords(Duration timeout, TestSourceContext<?> sourceContext) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (true) {
            // Check the clock first, then the queue, and stop on either condition.
            if (System.currentTimeMillis() >= deadline || !sourceContext.getCollectedOutputs().isEmpty()) {
                break;
            }
            Thread.sleep(10L);
        }
        // Re-read the queue so the answer reflects its state at return time.
        boolean stillEmpty = sourceContext.getCollectedOutputs().isEmpty();
        return !stillEmpty;
    }

    /**
     * Prepares {@code source} with the defaults used by the simple test: not restored,
     * no pre-existing state, checkpointing enabled, subtask 0 of 1.
     */
    private static <T> void setupSource(DebeziumSourceFunction<T> source) throws Exception {
        final boolean restored = false;
        final boolean checkpointingEnabled = true;
        final int subtask = 0;
        final int parallelism = 1;
        setupSource(source, restored, null, null, checkpointingEnabled, subtask, parallelism);
    }

    /**
     * Wires a mock runtime context and mock state into {@code source}, then opens it.
     *
     * <p>Order matters and is: set runtime context, initialize state, open.
     *
     * @param source the source function under test
     * @param isRestored value the initialization context reports from {@code isRestored()}
     * @param restoredOffsetState list state handed out for the offset state name
     * @param restoredHistoryState list state handed out for the history state name
     * @param isCheckpointingEnabled forwarded to the mock runtime context
     * @param subtaskIndex index of this subtask
     * @param totalNumSubtasks total parallelism
     */
    private static <T, S1, S2> void setupSource(DebeziumSourceFunction<T> source, boolean isRestored, ListState<S1> restoredOffsetState, ListState<S2> restoredHistoryState, boolean isCheckpointingEnabled, int subtaskIndex, int totalNumSubtasks) throws Exception {
        RuntimeContext runtimeContext = new MockStreamingRuntimeContext(isCheckpointingEnabled, totalNumSubtasks, subtaskIndex);
        source.setRuntimeContext(runtimeContext);

        MockOperatorStateStore stateStore = new MockOperatorStateStore(restoredOffsetState, restoredHistoryState);
        FunctionInitializationContext initContext = new MockFunctionInitializationContext(isRestored, stateStore);
        source.initializeState(initContext);

        source.open(new Configuration());
    }

    /**
     * Creates a product document with {@code name}, {@code description} and
     * {@code weight} fields.
     *
     * @param id hex string for the {@code _id} field, or {@code null} to leave
     *     {@code _id} unset
     * @param name value of the {@code name} field
     * @param description value of the {@code description} field
     * @param weight value of the {@code weight} field
     * @return the new document
     */
    private Document productDocOf(String id, String name, String description, Double weight) {
        final Document product = new Document();
        final boolean hasExplicitId = id != null;
        if (hasExplicitId) {
            ObjectId objectId = new ObjectId(id);
            product.put("_id", (Object)objectId);
        }
        product.put("name", (Object)name);
        product.put("description", (Object)description);
        product.put("weight", (Object)weight);
        return product;
    }

    /**
     * In-memory {@link ListState} backed by an {@link ArrayList}; it also remembers
     * whether {@link #clear()} has ever been called. Null elements are rejected.
     */
    private static final class TestingListState<T>
    implements ListState<T> {
        /** Backing storage; read directly by the enclosing test. */
        private final List<T> list = new ArrayList<T>();
        /** Set to {@code true} by the first {@link #clear()} and never reset. */
        private boolean clearCalled = false;

        private TestingListState() {
        }

        /** Returns the live backing list as an iterable. */
        public Iterable<T> get() throws Exception {
            return list;
        }

        /** Returns the live backing list. */
        public List<T> getList() {
            return list;
        }

        /** Tells whether {@link #clear()} has been invoked at least once. */
        boolean isClearCalled() {
            return clearCalled;
        }

        /** Appends one non-null value. */
        public void add(T value) throws Exception {
            Preconditions.checkNotNull(value, "You cannot add null to a ListState.");
            list.add(value);
        }

        /**
         * Appends all values; a {@code null} list is ignored. Every element is
         * null-checked before anything is added.
         */
        public void addAll(List<T> values) throws Exception {
            if (values == null) {
                return;
            }
            for (T element : values) {
                Preconditions.checkNotNull(element, "You cannot add null to a ListState.");
            }
            list.addAll(values);
        }

        /** Replaces the content with {@code values}; this counts as a clear. */
        public void update(List<T> values) throws Exception {
            clear();
            addAll(values);
        }

        /** Empties the list and records that a clear happened. */
        public void clear() {
            list.clear();
            clearCalled = true;
        }
    }

    /**
     * A {@link TestSourceContext} that parks the collecting thread once a given number
     * of records has been collected, until the test releases {@link #blocker}.
     */
    private static class BlockingSourceContext<T>
    extends TestSourceContext<T> {
        /** Starts with zero permits; the test calls {@code release()} to unblock. */
        private final Semaphore blocker = new Semaphore(0);
        /** Number of collected records at which {@link #collect} blocks. */
        private final int expectedCount;
        /** Records collected so far through this context. */
        private int currentCount = 0;

        private BlockingSourceContext(int expectedCount) {
            this.expectedCount = expectedCount;
        }

        /**
         * Forwards the record to the parent, then blocks on {@link #blocker} if this was
         * the {@code expectedCount}-th record.
         *
         * <p>If the wait is interrupted, the thread's interrupt flag is restored
         * instead of being silently dropped, so the caller can still observe it.
         */
        public void collect(T t) {
            super.collect(t);
            ++this.currentCount;
            if (this.currentCount == this.expectedCount) {
                try {
                    this.blocker.acquire();
                }
                catch (InterruptedException interrupted) {
                    // Stop waiting but keep the interruption visible to the caller.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Minimal {@link FunctionInitializationContext} that exposes a fixed restored flag
     * and a given operator state store; the remaining accessors are unsupported.
     */
    private static class MockFunctionInitializationContext
    implements FunctionInitializationContext {
        private final OperatorStateStore operatorStateStore;
        private final boolean isRestored;

        private MockFunctionInitializationContext(boolean isRestored, OperatorStateStore operatorStateStore) {
            this.operatorStateStore = operatorStateStore;
            this.isRestored = isRestored;
        }

        /** Returns the flag passed to the constructor. */
        public boolean isRestored() {
            return isRestored;
        }

        /** Returns the store passed to the constructor. */
        public OperatorStateStore getOperatorStateStore() {
            return operatorStateStore;
        }

        /** Not supported by this mock. */
        public KeyedStateStore getKeyedStateStore() {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this mock. */
        public OptionalLong getRestoredCheckpointId() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * {@link OperatorStateStore} stub that serves two pre-built union list states,
     * selected by descriptor name; every other operation is unsupported.
     */
    private static class MockOperatorStateStore
    implements OperatorStateStore {
        /** Returned for the descriptor named {@code "offset-states"}. */
        private final ListState<?> restoredOffsetListState;
        /** Returned for the descriptor named {@code "history-records-states"}. */
        private final ListState<?> restoredHistoryListState;

        private MockOperatorStateStore(ListState<?> restoredOffsetListState, ListState<?> restoredHistoryListState) {
            this.restoredOffsetListState = restoredOffsetListState;
            this.restoredHistoryListState = restoredHistoryListState;
        }

        /**
         * Looks up one of the two configured states by descriptor name.
         *
         * <p>The wildcard-typed fields are cast to the requested element type; this is
         * unchecked by nature, and the caller is trusted to ask for the type it stored.
         *
         * @param stateDescriptor descriptor whose name selects the state
         * @return the matching list state (may be {@code null} if none was supplied)
         * @throws IllegalStateException if the descriptor name is not one of the two
         *     known names
         */
        @SuppressWarnings("unchecked")
        public <S> ListState<S> getUnionListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
            final String stateName = stateDescriptor.getName();
            // Constant-first equals keeps the lookup safe if the name is null.
            if ("offset-states".equals(stateName)) {
                return (ListState<S>) this.restoredOffsetListState;
            }
            if ("history-records-states".equals(stateName)) {
                return (ListState<S>) this.restoredHistoryListState;
            }
            throw new IllegalStateException("Unknown state.");
        }

        /** Not supported by this stub. */
        public <K, V> BroadcastState<K, V> getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public <S> ListState<S> getListState(ListStateDescriptor<S> stateDescriptor) throws Exception {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public Set<String> getRegisteredStateNames() {
            throw new UnsupportedOperationException();
        }

        /** Not supported by this stub. */
        public Set<String> getRegisteredBroadcastStateNames() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * Pass-through deserialization schema: every incoming {@link SourceRecord} is
     * emitted unchanged.
     */
    public static class ForwardDeserializeSchema
    implements DebeziumDeserializationSchema<SourceRecord> {
        private static final long serialVersionUID = 2975058057832211228L;

        /**
         * Emits {@code record} as-is. It is passed with its real static type; an
         * {@code Object} cast here would not type-check against
         * {@code Collector<SourceRecord>}.
         *
         * @param record the change record to forward
         * @param out collector receiving the record
         */
        public void deserialize(SourceRecord record, Collector<SourceRecord> out) throws Exception {
            out.collect(record);
        }

        /** Returns type information for {@link SourceRecord}. */
        public TypeInformation<SourceRecord> getProducedType() {
            return TypeInformation.of(SourceRecord.class);
        }
    }
}

