/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql;

import com.fasterxml.jackson.core.JsonParseException;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Predicate;
import io.debezium.DebeziumException;
import io.debezium.document.Document;
import io.debezium.document.DocumentWriter;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.Statement;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cdc.connectors.mysql.LegacyMySqlTestBase;
import org.apache.flink.cdc.connectors.mysql.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.MySqlTestUtils;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.connectors.utils.AssertUtils;
import org.apache.flink.cdc.connectors.utils.TestSourceContext;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.DebeziumSourceFunction;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.cdc.debezium.internal.Handover;
import org.apache.flink.cdc.debezium.utils.DatabaseHistoryUtil;
import org.apache.flink.core.testutils.CheckedThread;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class LegacyMySqlSourceTest
extends LegacyMySqlTestBase {
    // Database fixture used by every test in this class; it is created and initialized in
    // before() and dropped again in after(). It is built on MYSQL_CONTAINER with the name
    // "inventory"; the last two arguments look like a user name and password — TODO confirm
    // against UniqueDatabase.
    private final UniqueDatabase database = new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");
    // Value injected by the Parameterized runner from parameters() (false, then true).
    // testConsumingAllEvents expects INSERT events for the initial rows when this is true and
    // READ events otherwise.
    @Parameterized.Parameter
    public boolean useLegacyImplementation;

    /**
     * Delegates to the superclass {@code getTempFilePath} and returns its result unchanged.
     *
     * @param fileName file name handed to the superclass implementation
     * @return the path produced by the superclass implementation
     * @throws IOException if the superclass implementation throws it
     */
    public String getTempFilePath(String fileName) throws IOException {
        final String tempFilePath = super.getTempFilePath(fileName);
        return tempFilePath;
    }

    /**
     * Supplies the values for the parameterized runs of this test class.
     *
     * @return a fixed-size list holding {@code false} followed by {@code true}
     */
    @Parameterized.Parameters(name="UseLegacyImplementation: {0}")
    public static Collection<Boolean> parameters() {
        final List<Boolean> legacyFlags = Arrays.asList(Boolean.FALSE, Boolean.TRUE);
        return legacyFlags;
    }

    /** Creates and initializes the test database before each test method. */
    @Before
    public void before() {
        database.createAndInitialize();
    }

    /** Drops the test database after each test method. */
    @After
    public void after() {
        database.dropDatabase();
    }

    /**
     * Runs the source against the initialized database and checks the emitted change events:
     * first the nine records for ids 101..109, then the events caused by a sequence of INSERT,
     * UPDATE and ALTER TABLE statements on the {@code products} table.
     *
     * @throws Exception if the source, the JDBC statements or any assertion fails
     */
    @Test
    public void testConsumingAllEvents() throws Exception {
        final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
        final TestSourceContext sourceContext = new TestSourceContext();
        MySqlTestUtils.setupSource(source);
        try (Connection jdbc = this.database.getJdbcConnection();
             Statement sql = jdbc.createStatement()) {
            // The source runs on its own thread; failures surface through sync() at the end.
            final CheckedThread sourceThread = new CheckedThread() {

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext) sourceContext);
                }
            };
            sourceThread.start();

            // Initial rows: reported as INSERT in legacy mode, as READ otherwise.
            List<?> events = MySqlTestUtils.drain(sourceContext, 9);
            Assert.assertEquals(9, events.size());
            int expectedId = 101;
            for (Object event : events) {
                if (this.useLegacyImplementation) {
                    AssertUtils.assertInsert((SourceRecord) event, "id", expectedId);
                } else {
                    AssertUtils.assertRead((SourceRecord) event, "id", expectedId);
                }
                expectedId++;
            }

            // Insert with a generated key.
            sql.execute("INSERT INTO products VALUES (default,'robot','Toy robot',1.304)");
            events = MySqlTestUtils.drain(sourceContext, 1);
            AssertUtils.assertInsert((SourceRecord) events.get(0), "id", 110);

            // Insert with an explicit key.
            sql.execute("INSERT INTO products VALUES (1001,'roy','old robot',1234.56)");
            events = MySqlTestUtils.drain(sourceContext, 1);
            AssertUtils.assertInsert((SourceRecord) events.get(0), "id", 1001);

            // Changing the primary key is expected as a DELETE followed by an INSERT.
            sql.execute("UPDATE products SET id=2001, description='really old robot' WHERE id=1001");
            events = MySqlTestUtils.drain(sourceContext, 2);
            AssertUtils.assertDelete((SourceRecord) events.get(0), "id", 1001);
            AssertUtils.assertInsert((SourceRecord) events.get(1), "id", 2001);

            // Plain update of a non-key column.
            sql.execute("UPDATE products SET weight=1345.67 WHERE id=2001");
            events = MySqlTestUtils.drain(sourceContext, 1);
            AssertUtils.assertUpdate((SourceRecord) events.get(0), "id", 2001);

            // Update after a schema change that adds two columns.
            final String addColumns = String.format("ALTER TABLE %s.products ADD COLUMN volume FLOAT, ADD COLUMN alias VARCHAR(30) NULL AFTER description", this.database.getDatabaseName());
            sql.execute(addColumns);
            sql.execute("UPDATE products SET volume=13.5 WHERE id=2001");
            events = MySqlTestUtils.drain(sourceContext, 1);
            AssertUtils.assertUpdate((SourceRecord) events.get(0), "id", 2001);

            source.close();
            sourceThread.sync();
        }
    }

    /**
     * Exercises checkpointing and restoring of the source with six consecutive source instances
     * that all share the same offset and history state.
     *
     * <ol>
     *   <li>Start from empty state, read the nine initial records and take checkpoint 101.
     *   <li>Restore, read one new insert, take checkpoint 138, then execute two more statements.
     *   <li>Restore, read the two statements executed after checkpoint 138, read a delete and
     *       take checkpoint 233.
     *   <li>Restore with no new events and take checkpoint 254.
     *   <li>Restore, insert a partial row, alter the table and take checkpoint 300.
     *   <li>Restore once more and read a further partial insert.
     * </ol>
     *
     * @throws Exception if a source, a JDBC statement or any assertion fails
     */
    @Test
    public void testCheckpointAndRestore() throws Exception {
        final MySqlTestUtils.TestingListState<byte[]> offsetState = new MySqlTestUtils.TestingListState<byte[]>();
        final MySqlTestUtils.TestingListState<String> historyState = new MySqlTestUtils.TestingListState<String>();
        // Binlog position of the most recent checkpoint that updated it (checkpoints 101 and 138).
        int prevPos = 0;

        // ---- Source 1: start from empty state -------------------------------------------------
        {
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final BlockingSourceContext sourceContext = new BlockingSourceContext(8);
            MySqlTestUtils.setupSource(source, false, offsetState, historyState, true, 0, 1);
            final CheckedThread runThread = new CheckedThread() {

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext) sourceContext);
                }
            };
            runThread.start();

            final int received = MySqlTestUtils.drain(sourceContext, 2).size();
            Assert.assertEquals(2, received);
            // At this point the checkpoint lock is expected not to be obtainable within 3 seconds.
            Assert.assertFalse(this.waitForCheckpointLock(sourceContext.getCheckpointLock(), Duration.ofSeconds(3L)));
            // Release the blocker and collect the rest of the nine initial records.
            sourceContext.blocker.release();
            final List<?> remaining = MySqlTestUtils.drain(sourceContext, 9 - received);
            Assert.assertEquals(9, remaining.size() + received);
            // No checkpoint has been taken yet, so both states are still empty.
            Assert.assertEquals(0, offsetState.list.size());
            Assert.assertEquals(0, historyState.list.size());

            // Checkpoint 101.
            synchronized (sourceContext.getCheckpointLock()) {
                source.snapshotState(new StateSnapshotContextSynchronousImpl(101L, 101L));
            }
            this.assertHistoryState(historyState);
            Assert.assertEquals(1, offsetState.list.size());
            final String state = new String((byte[]) offsetState.list.get(0), StandardCharsets.UTF_8);
            Assert.assertEquals("mysql_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
            Assert.assertEquals("mysql-bin.000003", JsonPath.read(state, "$.sourceOffset.file"));
            // Unlike the later checkpoints, this offset carries no row/server_id/event entries.
            Assert.assertFalse(state.contains("row"));
            Assert.assertFalse(state.contains("server_id"));
            Assert.assertFalse(state.contains("event"));
            final int pos = (Integer) JsonPath.read(state, "$.sourceOffset.pos");
            Assert.assertTrue(pos > prevPos);
            prevPos = pos;

            source.close();
            runThread.sync();
        }

        // ---- Source 2: restore from checkpoint 101 --------------------------------------------
        {
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final TestSourceContext sourceContext = new TestSourceContext();
            MySqlTestUtils.setupSource(source, true, offsetState, historyState, true, 0, 1);
            final CheckedThread runThread = new CheckedThread() {

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext) sourceContext);
                }
            };
            runThread.start();

            // Nothing new happened since the checkpoint, so no record is expected within 5 seconds.
            Assert.assertFalse(this.waitForAvailableRecords(Duration.ofSeconds(5L), sourceContext));

            try (Connection connection = this.database.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
                statement.execute("INSERT INTO products VALUES (default,'robot','Toy robot',1.304)");
                final List<?> records = MySqlTestUtils.drain(sourceContext, 1);
                Assert.assertEquals(1, records.size());
                AssertUtils.assertInsert((SourceRecord) records.get(0), "id", 110);

                // Checkpoint 138.
                synchronized (sourceContext.getCheckpointLock()) {
                    source.snapshotState(new StateSnapshotContextSynchronousImpl(138L, 138L));
                }
                this.assertHistoryState(historyState);
                prevPos = LegacyMySqlSourceTest.assertBinlogOffsetState(offsetState, prevPos);

                // Two more statements after the checkpoint; the next source instance reads them.
                statement.execute("INSERT INTO products VALUES (1001,'roy','old robot',1234.56)");
                statement.execute("UPDATE products SET weight=1345.67 WHERE id=1001");
            }

            source.close();
            runThread.sync();
        }

        // ---- Source 3: restore from checkpoint 138 --------------------------------------------
        {
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final TestSourceContext sourceContext = new TestSourceContext();
            MySqlTestUtils.setupSource(source, true, offsetState, historyState, true, 0, 1);
            final CheckedThread runThread = new CheckedThread() {

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext) sourceContext);
                }
            };
            runThread.start();

            // The two statements executed after checkpoint 138 are read first.
            List<?> records = MySqlTestUtils.drain(sourceContext, 2);
            AssertUtils.assertInsert((SourceRecord) records.get(0), "id", 1001);
            AssertUtils.assertUpdate((SourceRecord) records.get(1), "id", 1001);
            // After that no further record is expected within 3 seconds.
            Assert.assertFalse(this.waitForAvailableRecords(Duration.ofSeconds(3L), sourceContext));

            try (Connection connection = this.database.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
                statement.execute("DELETE FROM products WHERE id=1001");
            }
            records = MySqlTestUtils.drain(sourceContext, 1);
            AssertUtils.assertDelete((SourceRecord) records.get(0), "id", 1001);

            // Checkpoint 233. prevPos is intentionally left at the checkpoint-138 position.
            synchronized (sourceContext.getCheckpointLock()) {
                source.snapshotState(new StateSnapshotContextSynchronousImpl(233L, 233L));
            }
            this.assertHistoryState(historyState);
            LegacyMySqlSourceTest.assertBinlogOffsetState(offsetState, prevPos);

            source.close();
            runThread.sync();
        }

        // ---- Source 4: restore from checkpoint 233 --------------------------------------------
        {
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final TestSourceContext sourceContext = new TestSourceContext();
            MySqlTestUtils.setupSource(source, true, offsetState, historyState, true, 0, 1);
            final CheckedThread runThread = new CheckedThread() {

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext) sourceContext);
                }
            };
            runThread.start();

            // No new events: no record is expected within 5 seconds.
            Assert.assertFalse(this.waitForAvailableRecords(Duration.ofSeconds(5L), sourceContext));

            // Checkpoint 254.
            synchronized (sourceContext.getCheckpointLock()) {
                source.snapshotState(new StateSnapshotContextSynchronousImpl(254L, 254L));
            }
            this.assertHistoryState(historyState);
            LegacyMySqlSourceTest.assertBinlogOffsetState(offsetState, prevPos);

            source.close();
            runThread.sync();
        }

        // ---- Source 5: restore from checkpoint 254, partial insert and ALTER TABLE ------------
        {
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final TestSourceContext sourceContext = new TestSourceContext();
            MySqlTestUtils.setupSource(source, true, offsetState, historyState, true, 0, 1);
            final CheckedThread runThread = new CheckedThread() {

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext) sourceContext);
                }
            };
            runThread.start();

            try (Connection connection = this.database.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
                statement.execute("INSERT INTO products(id, description, weight) VALUES (default, 'Go go go', 111.1)");
                statement.execute("ALTER TABLE products ADD comment_col VARCHAR(100) DEFAULT 'cdc'");
                final List<?> records = MySqlTestUtils.drain(sourceContext, 1);
                AssertUtils.assertInsert((SourceRecord) records.get(0), "id", 1002);
            }

            // Checkpoint 300.
            synchronized (sourceContext.getCheckpointLock()) {
                source.snapshotState(new StateSnapshotContextSynchronousImpl(300L, 300L));
            }
            this.assertHistoryState(historyState);
            LegacyMySqlSourceTest.assertBinlogOffsetState(offsetState, prevPos);

            source.close();
            runThread.sync();
        }

        // ---- Source 6: restore from checkpoint 300 --------------------------------------------
        {
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final TestSourceContext sourceContext = new TestSourceContext();
            MySqlTestUtils.setupSource(source, true, offsetState, historyState, true, 0, 1);
            final CheckedThread runThread = new CheckedThread() {

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext) sourceContext);
                }
            };
            runThread.start();

            try (Connection connection = this.database.getJdbcConnection();
                 Statement statement = connection.createStatement()) {
                statement.execute("INSERT INTO products(id, description, weight) VALUES (default, 'Run!', 22.2)");
                final List<?> records = MySqlTestUtils.drain(sourceContext, 1);
                AssertUtils.assertInsert((SourceRecord) records.get(0), "id", 1003);
            }

            source.close();
            runThread.sync();
        }
    }

    /**
     * Checks the single checkpointed offset entry written by the checkpoints after the first one:
     * server name, binlog file, row, server id, event counter, and a binlog position greater
     * than {@code prevPos}.
     *
     * @param offsetState offset state that must hold exactly one UTF-8 encoded JSON entry
     * @param prevPos binlog position the checkpointed position must exceed
     * @return the binlog position read from the checkpointed offset
     */
    private static int assertBinlogOffsetState(MySqlTestUtils.TestingListState<byte[]> offsetState, int prevPos) {
        Assert.assertEquals(1, offsetState.list.size());
        final String state = new String((byte[]) offsetState.list.get(0), StandardCharsets.UTF_8);
        Assert.assertEquals("mysql_binlog_source", JsonPath.read(state, "$.sourcePartition.server"));
        Assert.assertEquals("mysql-bin.000003", JsonPath.read(state, "$.sourceOffset.file"));
        Assert.assertEquals("1", JsonPath.read(state, "$.sourceOffset.row").toString());
        Assert.assertEquals("223344", JsonPath.read(state, "$.sourceOffset.server_id").toString());
        Assert.assertEquals("2", JsonPath.read(state, "$.sourceOffset.event").toString());
        final int pos = (Integer) JsonPath.read(state, "$.sourceOffset.pos");
        Assert.assertTrue(pos > prevPos);
        return pos;
    }

    /**
     * Checks that a source restored from a checkpoint still emits events after the
     * {@code products} table was swapped for a fresh copy through {@code RENAME TABLE}.
     *
     * @throws Exception if a source, a JDBC statement or any assertion fails
     */
    @Test
    public void testRecoverFromRenameOperation() throws Exception {
        final MySqlTestUtils.TestingListState<byte[]> offsetState = new MySqlTestUtils.TestingListState<byte[]>();
        final MySqlTestUtils.TestingListState<String> historyState = new MySqlTestUtils.TestingListState<String>();

        // Phase 1: read the initial records, swap the table, insert three rows, checkpoint.
        try (Connection jdbc = this.database.getJdbcConnection();
             Statement sql = jdbc.createStatement()) {
            final DebeziumSourceFunction<SourceRecord> initialSource = this.createMySqlBinlogSource();
            final TestSourceContext initialContext = new TestSourceContext();
            MySqlTestUtils.setupSource(initialSource, false, offsetState, historyState, true, 0, 1);
            final CheckedThread initialThread = new CheckedThread() {

                public void go() throws Exception {
                    initialSource.run((SourceFunction.SourceContext) initialContext);
                }
            };
            initialThread.start();

            final List<?> initialRecords = MySqlTestUtils.drain(initialContext, 9);
            Assert.assertEquals(9, initialRecords.size());
            // Nothing has been checkpointed yet.
            Assert.assertEquals(0, offsetState.list.size());
            Assert.assertEquals(0, historyState.list.size());

            // Replace `products` with an empty table of the same structure.
            sql.execute("CREATE TABLE `tp_001_ogt_products` LIKE `products`;");
            sql.execute("RENAME TABLE `products` TO `tp_001_del_products`, `tp_001_ogt_products` TO `products`;");
            sql.execute("INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)");
            sql.execute("INSERT INTO `products` VALUES (111,'stream train','Town stream train',1.304)");
            sql.execute("INSERT INTO `products` VALUES (112,'cargo train','City cargo train',1.304)");
            final int insertedCount = MySqlTestUtils.drain(initialContext, 3).size();
            Assert.assertEquals(3, insertedCount);

            // Checkpoint 101: both states must now hold at least one entry.
            synchronized (initialContext.getCheckpointLock()) {
                initialSource.snapshotState(new StateSnapshotContextSynchronousImpl(101L, 101L));
            }
            Assert.assertTrue(historyState.list.size() > 0);
            Assert.assertTrue(offsetState.list.size() > 0);

            initialSource.close();
            initialThread.sync();
        }

        // Phase 2: restore from the checkpoint and confirm new inserts are still delivered.
        final DebeziumSourceFunction<SourceRecord> restoredSource = this.createMySqlBinlogSource();
        final TestSourceContext restoredContext = new TestSourceContext();
        MySqlTestUtils.setupSource(restoredSource, true, offsetState, historyState, true, 0, 1);
        final CheckedThread restoredThread = new CheckedThread() {

            public void go() throws Exception {
                restoredSource.run((SourceFunction.SourceContext) restoredContext);
            }
        };
        restoredThread.start();

        // No record is expected within 5 seconds before a new statement is executed.
        Assert.assertFalse(this.waitForAvailableRecords(Duration.ofSeconds(5L), restoredContext));

        try (Connection jdbc = this.database.getJdbcConnection();
             Statement sql = jdbc.createStatement()) {
            sql.execute("INSERT INTO `products` VALUES (113,'Airplane','Toy airplane',1.304)");
            final List<?> newRecords = MySqlTestUtils.drain(restoredContext, 1);
            Assert.assertEquals(1, newRecords.size());
            AssertUtils.assertInsert((SourceRecord) newRecords.get(0), "id", 113);

            restoredSource.close();
            restoredThread.sync();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStartupFromSpecificOffset() throws Exception {
        List records;
        Throwable throwable;
        Statement statement;
        try (Connection connection = this.database.getJdbcConnection();
             Statement statement2 = connection.createStatement();){
            statement2.execute("INSERT INTO products VALUES (default,'robot','Toy robot',1.304)");
        }
        Tuple2<String, Integer> offset = LegacyMySqlSourceTest.currentMySqlLatestOffset(MYSQL_CONTAINER, this.database, "products", 10, this.useLegacyImplementation);
        String offsetFile = (String)offset.f0;
        int offsetPos = (Integer)offset.f1;
        MySqlTestUtils.TestingListState offsetState = new MySqlTestUtils.TestingListState();
        MySqlTestUtils.TestingListState historyState = new MySqlTestUtils.TestingListState();
        try (Connection connection = this.database.getJdbcConnection();){
            statement = connection.createStatement();
            throwable = null;
            try {
                statement.execute("INSERT INTO products VALUES (1001,'roy','old robot',1234.56)");
                statement.execute("UPDATE products SET id=2001, description='really old robot' WHERE id=1001");
                statement.execute("UPDATE products SET weight=1345.67 WHERE id=2001");
                final DebeziumSourceFunction<SourceRecord> source2 = this.createMySqlBinlogSource(offsetFile, offsetPos);
                final TestSourceContext sourceContext2 = new TestSourceContext();
                MySqlTestUtils.setupSource(source2, false, offsetState, historyState, true, 0, 1);
                CheckedThread runThread2 = new CheckedThread(){

                    public void go() throws Exception {
                        source2.run((SourceFunction.SourceContext)sourceContext2);
                    }
                };
                runThread2.start();
                records = MySqlTestUtils.drain(sourceContext2, 4);
                Assert.assertEquals((long)4L, (long)records.size());
                AssertUtils.assertInsert((SourceRecord)((SourceRecord)records.get(0)), (String)"id", (int)1001);
                AssertUtils.assertDelete((SourceRecord)((SourceRecord)records.get(1)), (String)"id", (int)1001);
                AssertUtils.assertInsert((SourceRecord)((SourceRecord)records.get(2)), (String)"id", (int)2001);
                AssertUtils.assertUpdate((SourceRecord)((SourceRecord)records.get(3)), (String)"id", (int)2001);
                Object object = sourceContext2.getCheckpointLock();
                synchronized (object) {
                    source2.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(201L, 201L));
                }
                source2.close();
                runThread2.sync();
            }
            catch (Throwable source2) {
                throwable = source2;
                throw source2;
            }
            finally {
                if (statement != null) {
                    if (throwable != null) {
                        try {
                            statement.close();
                        }
                        catch (Throwable source2) {
                            throwable.addSuppressed(source2);
                        }
                    } else {
                        statement.close();
                    }
                }
            }
        }
        connection = this.database.getJdbcConnection();
        var7_14 = null;
        try {
            statement = connection.createStatement();
            throwable = null;
            try {
                final DebeziumSourceFunction<SourceRecord> source3 = this.createMySqlBinlogSource(offsetFile, offsetPos);
                final TestSourceContext sourceContext3 = new TestSourceContext();
                MySqlTestUtils.setupSource(source3, true, offsetState, historyState, true, 0, 1);
                CheckedThread runThread3 = new CheckedThread(){

                    public void go() throws Exception {
                        source3.run((SourceFunction.SourceContext)sourceContext3);
                    }
                };
                runThread3.start();
                statement.execute("DELETE FROM products WHERE id=2001");
                records = MySqlTestUtils.drain(sourceContext3, 1);
                Assert.assertEquals((long)1L, (long)records.size());
                AssertUtils.assertDelete((SourceRecord)((SourceRecord)records.get(0)), (String)"id", (int)2001);
                source3.close();
                runThread3.sync();
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (statement != null) {
                    if (throwable != null) {
                        try {
                            statement.close();
                        }
                        catch (Throwable throwable3) {
                            throwable.addSuppressed(throwable3);
                        }
                    } else {
                        statement.close();
                    }
                }
            }
        }
        catch (Throwable throwable4) {
            var7_14 = throwable4;
            throw throwable4;
        }
        finally {
            if (connection != null) {
                if (var7_14 != null) {
                    try {
                        connection.close();
                    }
                    catch (Throwable throwable5) {
                        var7_14.addSuppressed(throwable5);
                    }
                } else {
                    connection.close();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that a source restricted to the {@code category} table emits no record and stores
     * no offset before any row is written, and that it afterwards streams the row changes made by
     * this test and checkpoints their binlog offset.
     *
     * <p>Flow: start the source, wait (bounded) for Debezium to start, take checkpoint 101 and
     * assert that no offset was stored and that no record arrives within five seconds; then insert
     * two rows and update one, assert the three change events in order, take checkpoint 138 and
     * validate the stored schema history and offset.
     *
     * <p>The source is closed and the runner thread joined in a {@code finally} block so that a
     * failing assertion does not leave a running source thread behind.
     *
     * @throws Exception if the source, a SQL statement or an assertion fails
     */
    @Test
    public void testConsumingEmptyTable() throws Exception {
        MySqlTestUtils.TestingListState offsetState = new MySqlTestUtils.TestingListState();
        MySqlTestUtils.TestingListState<String> historyState = new MySqlTestUtils.TestingListState<String>();
        int prevPos = 0;
        final DebeziumSourceFunction source = MySqlTestUtils.basicSourceBuilder(this.database, "UTC", this.useLegacyImplementation).tableList(new String[]{this.database.getDatabaseName() + ".category"}).build();
        // The context only blocks the emitting thread once 8 records were collected; this test
        // produces 3, so the blocking threshold is not reached here.
        final BlockingSourceContext sourceContext = new BlockingSourceContext(8);
        MySqlTestUtils.setupSource(source, false, offsetState, historyState, true, 0, 1);
        CheckedThread runThread = new CheckedThread(){

            public void go() throws Exception {
                source.run((SourceFunction.SourceContext)sourceContext);
            }
        };
        runThread.start();
        try {
            // Bounded wait: without a deadline the test would spin forever if the engine never
            // reports that it started (for example because the runner thread died).
            long startDeadline = System.currentTimeMillis() + 60000L;
            while (!source.getDebeziumStarted()) {
                if (System.currentTimeMillis() > startDeadline) {
                    Assert.fail((String)"Debezium engine did not start within 60000 ms.");
                }
                Thread.sleep(100L);
            }
            Object object = sourceContext.getCheckpointLock();
            synchronized (object) {
                source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(101L, 101L));
            }
            // Nothing has been emitted yet, so the checkpoint must not contain an offset.
            Assert.assertEquals((long)0L, (long)offsetState.list.size());
            Assert.assertFalse((boolean)this.waitForAvailableRecords(Duration.ofSeconds(5L), sourceContext));
            try (Connection connection = this.database.getJdbcConnection();
                 Statement statement = connection.createStatement();){
                statement.execute("INSERT INTO category VALUES (1, 'book')");
                statement.execute("INSERT INTO category VALUES (2, 'shoes')");
                statement.execute("UPDATE category SET category_name='books' WHERE id=1");
                List records = MySqlTestUtils.drain(sourceContext, 3);
                Assert.assertEquals((long)3L, (long)records.size());
                AssertUtils.assertInsert((SourceRecord)((SourceRecord)records.get(0)), (String)"id", (int)1);
                AssertUtils.assertInsert((SourceRecord)((SourceRecord)records.get(1)), (String)"id", (int)2);
                AssertUtils.assertUpdate((SourceRecord)((SourceRecord)records.get(2)), (String)"id", (int)1);
                Object object2 = sourceContext.getCheckpointLock();
                synchronized (object2) {
                    source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(138L, 138L));
                }
                this.assertHistoryState(historyState);
                Assert.assertEquals((long)1L, (long)offsetState.list.size());
                // The offset state is stored as UTF-8 encoded JSON.
                String state = new String((byte[])offsetState.list.get(0), StandardCharsets.UTF_8);
                Assert.assertEquals((Object)"mysql_binlog_source", (Object)JsonPath.read((String)state, (String)"$.sourcePartition.server", (Predicate[])new Predicate[0]));
                Assert.assertEquals((Object)"mysql-bin.000003", (Object)JsonPath.read((String)state, (String)"$.sourceOffset.file", (Predicate[])new Predicate[0]));
                Assert.assertEquals((Object)"1", (Object)JsonPath.read((String)state, (String)"$.sourceOffset.row", (Predicate[])new Predicate[0]).toString());
                Assert.assertEquals((Object)"223344", (Object)JsonPath.read((String)state, (String)"$.sourceOffset.server_id", (Predicate[])new Predicate[0]).toString());
                Assert.assertEquals((Object)"2", (Object)JsonPath.read((String)state, (String)"$.sourceOffset.event", (Predicate[])new Predicate[0]).toString());
                int pos = (Integer)JsonPath.read((String)state, (String)"$.sourceOffset.pos", (Predicate[])new Predicate[0]);
                Assert.assertTrue(pos > prevPos);
            }
        }
        finally {
            // Always stop the source and join the runner, even when an assertion above failed.
            source.close();
            runThread.sync();
        }
    }

    /**
     * Restores a source from a history state whose second entry depends on the implementation
     * under test, then checks the outcome of running it.
     *
     * <p>With the legacy implementation the state holds a serialized table change and closing /
     * joining the source is expected to surface an {@link IllegalStateException} with a fixed
     * message. Otherwise the state holds a {@link HistoryRecord} document and the source is
     * expected to start within five seconds and shut down cleanly.
     *
     * @throws Exception if the source fails in a way the test does not expect
     */
    @Test
    public void testChooseDatabase() throws Exception {
        MySqlTestUtils.TestingListState offsets = new MySqlTestUtils.TestingListState();
        MySqlTestUtils.TestingListState<String> history = new MySqlTestUtils.TestingListState<String>();
        // The first history entry is the engine name; the schema entry follows it.
        history.add("engine-name");
        DocumentWriter documentWriter = DocumentWriter.defaultWriter();
        if (!this.useLegacyImplementation) {
            HistoryRecord ddlRecord = new HistoryRecord(Collections.emptyMap(), Collections.emptyMap(), "test", "test", "CREATE TABLE test(a int)", null);
            history.add(documentWriter.write(ddlRecord.document()));
        } else {
            FlinkJsonTableChangeSerializer serializer = new FlinkJsonTableChangeSerializer();
            TableChanges.TableChange createTable = new TableChanges.TableChange(TableChanges.TableChangeType.CREATE, MockedTable.INSTANCE);
            history.add(documentWriter.write(serializer.toDocument(createTable)));
        }

        final DebeziumSourceFunction<SourceRecord> restoredSource = this.createMySqlBinlogSource();
        MySqlTestUtils.setupSource(restoredSource, true, offsets, history, true, 0, 1);
        final TestSourceContext context = new TestSourceContext();
        CheckedThread runner = new CheckedThread(){

            public void go() throws Exception {
                restoredSource.run((SourceFunction.SourceContext)context);
            }
        };
        runner.start();

        if (!this.useLegacyImplementation) {
            // Compatible state: the engine must come up, then shut down without error.
            this.waitDebeziumStartWithTimeout(restoredSource, 5000L);
            restoredSource.close();
            runner.sync();
            return;
        }

        // Legacy implementation with an incompatible state: shutting down must report the error.
        try {
            restoredSource.close();
            runner.sync();
            Assert.fail("Should fail.");
        }
        catch (Exception failure) {
            Assert.assertTrue(failure instanceof IllegalStateException);
            String actualMessage = failure.getMessage();
            Assert.assertEquals("The configured option 'debezium.internal.implementation' is 'legacy', but the state of source is incompatible with this implementation, you should remove the the option.", actualMessage);
        }
    }

    /**
     * Verifies that restoring a source from a history state containing the non-JSON entry
     * {@code "IllegalState"} fails during setup with a {@link JsonParseException}.
     *
     * @throws Exception if creating the source fails unexpectedly
     */
    @Test
    public void testLoadIllegalState() throws Exception {
        MySqlTestUtils.TestingListState offsets = new MySqlTestUtils.TestingListState();
        MySqlTestUtils.TestingListState<String> history = new MySqlTestUtils.TestingListState<String>();
        history.add("engine-name");
        // This entry is not valid JSON and is what the restore is expected to choke on.
        history.add("IllegalState");
        DebeziumSourceFunction<SourceRecord> brokenSource = this.createMySqlBinlogSource();
        try {
            MySqlTestUtils.setupSource(brokenSource, true, offsets, history, true, 0, 1);
            Assert.fail("Should fail.");
        }
        catch (Exception failure) {
            Assert.assertTrue(failure instanceof JsonParseException);
            String message = failure.getMessage();
            Assert.assertTrue(message.contains("Unrecognized token 'IllegalState'"));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that a checkpoint fails with an {@link IllegalStateException} after the schema
     * history registered under the source's engine instance name has been removed.
     *
     * <p>Flow: start a fresh source, drain 9 records and assert that no state was written yet;
     * insert one row and drain its record; take checkpoint 101 and assert that both history and
     * offset state are now populated; remove the history via {@link DatabaseHistoryUtil} and
     * assert that checkpoint 102 throws with the expected message.
     *
     * @throws Exception if the source, the SQL statement or an assertion fails
     */
    @Test
    public void testSchemaRemovedBeforeCheckpoint() throws Exception {
        MySqlTestUtils.TestingListState offsetState = new MySqlTestUtils.TestingListState();
        MySqlTestUtils.TestingListState historyState = new MySqlTestUtils.TestingListState();
        try (Connection connection = this.database.getJdbcConnection();
             Statement statement = connection.createStatement();){
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final TestSourceContext sourceContext = new TestSourceContext();
            MySqlTestUtils.setupSource(source, false, offsetState, historyState, true, 0, 1);
            // Run the source on its own thread; the test thread drives checkpoints and assertions.
            CheckedThread runThread = new CheckedThread(){

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext)sourceContext);
                }
            };
            runThread.start();
            // NOTE(review): the 9 records are presumably the initial snapshot of the test
            // database - confirm against the test fixture.
            List records = MySqlTestUtils.drain(sourceContext, 9);
            Assert.assertEquals((long)9L, (long)records.size());
            // No checkpoint has been taken yet, so neither state list has content.
            Assert.assertEquals((long)0L, (long)offsetState.list.size());
            Assert.assertEquals((long)0L, (long)historyState.list.size());
            statement.execute("INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)");
            int received = MySqlTestUtils.drain(sourceContext, 1).size();
            Assert.assertEquals((long)1L, (long)received);
            // Checkpoints are taken while holding the source context's checkpoint lock.
            Object object = sourceContext.getCheckpointLock();
            synchronized (object) {
                source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(101L, 101L));
            }
            Assert.assertTrue((historyState.list.size() > 0 ? 1 : 0) != 0);
            Assert.assertTrue((offsetState.list.size() > 0 ? 1 : 0) != 0);
            // Remove the history registered for this engine instance before the next checkpoint.
            String engineInstanceName = source.getEngineInstanceName();
            DatabaseHistoryUtil.removeHistory((String)engineInstanceName);
            try {
                Object object2 = sourceContext.getCheckpointLock();
                synchronized (object2) {
                    source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(102L, 102L));
                }
                Assert.fail((String)"Should fail.");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e instanceof IllegalStateException));
                Assert.assertTrue((boolean)e.getMessage().contains(String.format("Retrieve schema history failed, the schema records for engine %s has been removed, this might because the debezium engine has been shutdown due to other errors.", engineInstanceName)));
            }
            finally {
                // Stop the source and join the runner regardless of the checkpoint outcome.
                source.close();
                runThread.sync();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that taking a checkpoint after the source's {@link Handover} has been closed
     * (without an error being reported) still succeeds and leaves history and offset state
     * populated.
     *
     * <p>Flow: start a fresh source, drain 9 records and assert that no state was written yet;
     * insert one row and drain its record; take checkpoint 101 and assert both state lists are
     * populated; close the handover, take checkpoint 102 and assert the same again.
     *
     * <p>The source is closed and the runner thread joined in a {@code finally} block, matching
     * the sibling snapshot tests, so a failing assertion does not leave the source running.
     *
     * @throws Exception if the source, the SQL statement or an assertion fails
     */
    @Test
    public void testSnapshotOnClosedSource() throws Exception {
        MySqlTestUtils.TestingListState offsetState = new MySqlTestUtils.TestingListState();
        MySqlTestUtils.TestingListState historyState = new MySqlTestUtils.TestingListState();
        try (Connection connection = this.database.getJdbcConnection();
             Statement statement = connection.createStatement();){
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final TestSourceContext sourceContext = new TestSourceContext();
            MySqlTestUtils.setupSource(source, false, offsetState, historyState, true, 0, 1);
            CheckedThread runThread = new CheckedThread(){

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext)sourceContext);
                }
            };
            runThread.start();
            try {
                List records = MySqlTestUtils.drain(sourceContext, 9);
                Assert.assertEquals((long)9L, (long)records.size());
                // No checkpoint has been taken yet, so neither state list has content.
                Assert.assertEquals((long)0L, (long)offsetState.list.size());
                Assert.assertEquals((long)0L, (long)historyState.list.size());
                statement.execute("INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)");
                int received = MySqlTestUtils.drain(sourceContext, 1).size();
                Assert.assertEquals((long)1L, (long)received);
                Object object = sourceContext.getCheckpointLock();
                synchronized (object) {
                    source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(101L, 101L));
                }
                Assert.assertTrue((historyState.list.size() > 0 ? 1 : 0) != 0);
                Assert.assertTrue((offsetState.list.size() > 0 ? 1 : 0) != 0);
                // Close the handover without reporting an error, then checkpoint again.
                Handover handover = source.getHandover();
                handover.close();
                Object object2 = sourceContext.getCheckpointLock();
                synchronized (object2) {
                    source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(102L, 102L));
                }
                Assert.assertTrue((historyState.list.size() > 0 ? 1 : 0) != 0);
                Assert.assertTrue((offsetState.list.size() > 0 ? 1 : 0) != 0);
            }
            finally {
                // Always stop the source and join the runner, even when an assertion above failed.
                source.close();
                runThread.sync();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Verifies that taking a checkpoint after an error was reported to the source's
     * {@link Handover} fails with a {@code FlinkRuntimeException} caused by a
     * {@link Handover.ClosedException}.
     *
     * <p>Flow: start a fresh source, drain 9 records and assert that no state was written yet;
     * insert one row and drain its record; take checkpoint 101 and assert both state lists are
     * populated; while holding the handover lock, report a mocked {@link DebeziumException},
     * close the handover and attempt checkpoint 102, which is expected to throw.
     *
     * @throws Exception if the source, the SQL statement or an assertion fails
     */
    @Test
    public void testSnapshotOnFailedSource() throws Exception {
        MySqlTestUtils.TestingListState offsetState = new MySqlTestUtils.TestingListState();
        MySqlTestUtils.TestingListState historyState = new MySqlTestUtils.TestingListState();
        try (Connection connection = this.database.getJdbcConnection();
             Statement statement = connection.createStatement();){
            final DebeziumSourceFunction<SourceRecord> source = this.createMySqlBinlogSource();
            final TestSourceContext sourceContext = new TestSourceContext();
            MySqlTestUtils.setupSource(source, false, offsetState, historyState, true, 0, 1);
            // Run the source on its own thread; the test thread drives checkpoints and assertions.
            CheckedThread runThread = new CheckedThread(){

                public void go() throws Exception {
                    source.run((SourceFunction.SourceContext)sourceContext);
                }
            };
            runThread.start();
            List records = MySqlTestUtils.drain(sourceContext, 9);
            Assert.assertEquals((long)9L, (long)records.size());
            // No checkpoint has been taken yet, so neither state list has content.
            Assert.assertEquals((long)0L, (long)offsetState.list.size());
            Assert.assertEquals((long)0L, (long)historyState.list.size());
            statement.execute("INSERT INTO `products` VALUES (110,'robot','Toy robot',1.304)");
            int received = MySqlTestUtils.drain(sourceContext, 1).size();
            Assert.assertEquals((long)1L, (long)received);
            Object object = sourceContext.getCheckpointLock();
            synchronized (object) {
                source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(101L, 101L));
            }
            Assert.assertTrue((historyState.list.size() > 0 ? 1 : 0) != 0);
            Assert.assertTrue((offsetState.list.size() > 0 ? 1 : 0) != 0);
            Handover handover = source.getHandover();
            try {
                // Lock order: the handover lock is taken first, the checkpoint lock inside it.
                Object object2 = handover.getLock();
                synchronized (object2) {
                    // Put the handover into a failed state, then close it.
                    handover.reportError((Throwable)new DebeziumException("Mocked debezium exception"));
                    handover.close();
                    Object object3 = sourceContext.getCheckpointLock();
                    synchronized (object3) {
                        source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(102L, 102L));
                    }
                    // Only reached if the checkpoint above did not throw.
                    handover.getLock().notifyAll();
                }
                Assert.fail((String)"Should fail.");
            }
            catch (Exception e) {
                Assert.assertTrue((boolean)(e instanceof FlinkRuntimeException));
                Assert.assertTrue((boolean)e.getMessage().contains("Call snapshotState() on failed source, checkpoint failed."));
                Assert.assertTrue((boolean)(e.getCause() instanceof Handover.ClosedException));
                Assert.assertTrue((boolean)e.getCause().getMessage().contains("Close handover with error."));
            }
            finally {
                // Stop the source and join the runner regardless of the checkpoint outcome.
                source.close();
                runThread.sync();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs a temporary source against one table, waits for {@code expectedRecordCount} records,
     * takes a checkpoint and returns the binlog file name and position stored in that checkpoint.
     *
     * <p>The temporary source is closed and its runner thread joined in a {@code finally} block,
     * so it is also shut down when draining or the offset assertion fails.
     *
     * @param container MySQL container providing host, port and credentials
     * @param database database whose name is used for the database list and table list
     * @param table unqualified table name; it is qualified with the database name
     * @param expectedRecordCount number of records to drain before taking the checkpoint
     * @param useLegacyImplementation whether to configure the legacy Debezium implementation
     * @return the {@code $.sourceOffset.file} and {@code $.sourceOffset.pos} values of the
     *     checkpointed offset
     * @throws Exception if the source fails, the records do not arrive, or the checkpoint does not
     *     contain exactly one offset entry
     */
    public static Tuple2<String, Integer> currentMySqlLatestOffset(MySqlContainer container, UniqueDatabase database, String table, int expectedRecordCount, boolean useLegacyImplementation) throws Exception {
        final DebeziumSourceFunction source = MySqlSource.builder().hostname(container.getHost()).port(container.getDatabasePort()).databaseList(new String[]{database.getDatabaseName()}).tableList(new String[]{database.getDatabaseName() + "." + table}).username(container.getUsername()).password(container.getPassword()).deserializer((DebeziumDeserializationSchema)new MySqlTestUtils.ForwardDeserializeSchema()).debeziumProperties(LegacyMySqlSourceTest.createDebeziumProperties(useLegacyImplementation)).build();
        MySqlTestUtils.TestingListState offsetState = new MySqlTestUtils.TestingListState();
        MySqlTestUtils.TestingListState historyState = new MySqlTestUtils.TestingListState();
        final TestSourceContext sourceContext = new TestSourceContext();
        MySqlTestUtils.setupSource(source, false, offsetState, historyState, true, 0, 1);
        CheckedThread runThread = new CheckedThread(){

            public void go() throws Exception {
                source.run((SourceFunction.SourceContext)sourceContext);
            }
        };
        runThread.start();
        try {
            MySqlTestUtils.drain(sourceContext, expectedRecordCount);
            // Checkpoints are taken while holding the source context's checkpoint lock.
            Object object = sourceContext.getCheckpointLock();
            synchronized (object) {
                source.snapshotState((FunctionSnapshotContext)new StateSnapshotContextSynchronousImpl(101L, 101L));
            }
            Assert.assertEquals((long)1L, (long)offsetState.list.size());
            // The offset state is stored as UTF-8 encoded JSON.
            String state = new String((byte[])offsetState.list.get(0), StandardCharsets.UTF_8);
            String offsetFile = (String)JsonPath.read((String)state, (String)"$.sourceOffset.file", (Predicate[])new Predicate[0]);
            int offsetPos = (Integer)JsonPath.read((String)state, (String)"$.sourceOffset.pos", (Predicate[])new Predicate[0]);
            // Typed arguments so the result matches the declared Tuple2<String, Integer>.
            return Tuple2.of(offsetFile, offsetPos);
        }
        finally {
            // Runs before the value is returned and also on failure.
            source.close();
            runThread.sync();
        }
    }

    /**
     * Builds the Debezium properties for the requested implementation.
     *
     * @param useLegacyImplementation whether the legacy implementation is requested
     * @return an empty {@link Properties} object when {@code useLegacyImplementation} is
     *     {@code false}; otherwise properties selecting the legacy implementation and
     *     registering the {@code snapshotasinsert} transform
     */
    private static Properties createDebeziumProperties(boolean useLegacyImplementation) {
        final Properties props = new Properties();
        if (!useLegacyImplementation) {
            // Nothing to configure for the non-legacy implementation.
            return props;
        }
        props.put("internal.implementation", "legacy");
        props.put("transforms", "snapshotasinsert");
        props.put("transforms.snapshotasinsert.type", "io.debezium.connector.mysql.transforms.ReadToInsertEvent");
        return props;
    }

    /**
     * Polls the source every 100 ms until it reports that Debezium has started, failing the test
     * if that does not happen within the given timeout.
     *
     * <p>The started flag is re-checked after every sleep before the deadline is evaluated, so a
     * start that happens during the last sleep is not reported as a timeout. Elapsed time is
     * measured with {@link System#nanoTime()} so wall-clock adjustments do not affect the wait.
     *
     * @param source the source to poll
     * @param timeout maximum time to wait, in milliseconds; must not be {@code null}
     * @throws Exception if the thread is interrupted while sleeping
     */
    private void waitDebeziumStartWithTimeout(DebeziumSourceFunction<SourceRecord> source, Long timeout) throws Exception {
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
        while (!source.getDebeziumStarted()) {
            // Difference-based comparison, as required for values returned by nanoTime().
            if (System.nanoTime() - deadlineNanos > 0L) {
                Assert.fail("Debezium engine did not start within " + timeout + " ms.");
            }
            Thread.sleep(100L);
        }
    }

    /**
     * Asserts that the checkpointed history state is non-empty and that, ignoring its first
     * entry, it contains at least one entry of the shape expected for the implementation in use.
     *
     * <p>Legacy implementation: an entry whose {@code $.source.server} is
     * {@code mysql_binlog_source}, whose {@code $.position.snapshot} is {@code true} and whose
     * {@code $.ddl} starts with the {@code products} CREATE TABLE statement. Otherwise: an entry
     * with a non-empty {@code $.table} and a {@code $.type} of {@code CREATE} or {@code ALTER}.
     *
     * @param historyState the history state written by a checkpoint
     */
    private void assertHistoryState(MySqlTestUtils.TestingListState<String> historyState) {
        Assert.assertTrue(historyState.list.size() > 0);

        if (this.useLegacyImplementation) {
            boolean snapshotDdlFound = historyState.list.stream().skip(1L).anyMatch(entry -> {
                Object server = JsonPath.read((String)entry, (String)"$.source.server", (Predicate[])new Predicate[0]);
                if (!server.equals("mysql_binlog_source")) {
                    return false;
                }
                String snapshotFlag = JsonPath.read((String)entry, (String)"$.position.snapshot", (Predicate[])new Predicate[0]).toString();
                if (!snapshotFlag.equals("true")) {
                    return false;
                }
                String ddl = JsonPath.read((String)entry, (String)"$.ddl", (Predicate[])new Predicate[0]).toString();
                return ddl.startsWith("CREATE TABLE `products`");
            });
            Assert.assertTrue(snapshotDdlFound);
            return;
        }

        boolean tableChangeFound = historyState.list.stream().skip(1L).anyMatch(entry -> {
            Map table = (Map)JsonPath.read((String)entry, (String)"$.table", (Predicate[])new Predicate[0]);
            if (table.isEmpty()) {
                return false;
            }
            // The change type is only read for entries that actually carry a table.
            String changeType = JsonPath.read((String)entry, (String)"$.type", (Predicate[])new Predicate[0]).toString();
            return changeType.equals("CREATE") || changeType.equals("ALTER");
        });
        Assert.assertTrue(tableChangeFound);
    }

    /**
     * Creates a source for the test database that starts reading at a specific binlog offset.
     *
     * @param offsetFile binlog file name to start from
     * @param offsetPos position inside {@code offsetFile} to start from
     * @return the configured source
     */
    private DebeziumSourceFunction<SourceRecord> createMySqlBinlogSource(String offsetFile, int offsetPos) {
        StartupOptions startAtOffset = StartupOptions.specificOffset(offsetFile, (long)offsetPos);
        return MySqlTestUtils.basicSourceBuilder(database, "UTC", useLegacyImplementation)
                .startupOptions(startAtOffset)
                .build();
    }

    /**
     * Creates a source for the test database using the builder's default startup options.
     *
     * @return the configured source
     */
    private DebeziumSourceFunction<SourceRecord> createMySqlBinlogSource() {
        return MySqlTestUtils
                .basicSourceBuilder(database, "UTC", useLegacyImplementation)
                .build();
    }

    /**
     * Checks whether the given lock's monitor can be acquired by another thread within a timeout.
     *
     * <p>A helper thread enters a {@code synchronized} block on {@code checkpointLock} and signals
     * success through a semaphore. The executor is shut down in a {@code finally} block so it is
     * also released when the waiting thread is interrupted. A thread that is blocked entering a
     * {@code synchronized} block does not respond to interruption, so when the lock stays held the
     * helper thread ends only after the lock is eventually released.
     *
     * @param checkpointLock the object whose monitor is probed
     * @param timeout how long to wait for the helper thread to acquire the monitor
     * @return {@code true} if the monitor was acquired within {@code timeout}, {@code false}
     *     otherwise
     * @throws Exception if the calling thread is interrupted while waiting
     */
    private boolean waitForCheckpointLock(Object checkpointLock, Duration timeout) throws Exception {
        final Semaphore acquired = new Semaphore(0);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                synchronized (checkpointLock) {
                    acquired.release();
                }
            });
            return acquired.tryAcquire(timeout.toMillis(), TimeUnit.MILLISECONDS);
        }
        finally {
            executor.shutdownNow();
        }
    }

    /**
     * Polls the source context every 10 ms until it has collected at least one output or the
     * timeout elapses.
     *
     * @param timeout maximum time to keep polling
     * @param sourceContext the context whose collected outputs are inspected
     * @return {@code true} if the context holds at least one collected output when polling stops
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private boolean waitForAvailableRecords(Duration timeout, TestSourceContext<?> sourceContext) throws InterruptedException {
        final long deadline = System.currentTimeMillis() + timeout.toMillis();
        while (true) {
            if (System.currentTimeMillis() >= deadline) {
                break;
            }
            if (!sourceContext.getCollectedOutputs().isEmpty()) {
                break;
            }
            Thread.sleep(10L);
        }
        // Re-evaluated after the loop so a record arriving right at the deadline still counts.
        return !sourceContext.getCollectedOutputs().isEmpty();
    }

    /**
     * Minimal {@link Table} stub with the id {@code Test}, no columns and no primary key.
     * Column lookup and editing are not supported and throw
     * {@link UnsupportedOperationException}.
     */
    private static class MockedTable
    implements Table {
        private static final Table INSTANCE = new MockedTable();
        private static final String NOT_IMPLEMENTED = "Not implemented.";

        private MockedTable() {
        }

        public TableId id() {
            return TableId.parse("Test");
        }

        public String comment() {
            return "";
        }

        public String defaultCharsetName() {
            return "UTF-8";
        }

        public List<Column> columns() {
            return Collections.emptyList();
        }

        public List<String> retrieveColumnNames() {
            return Collections.emptyList();
        }

        public List<String> primaryKeyColumnNames() {
            return Collections.emptyList();
        }

        public Column columnWithName(String name) {
            throw notImplemented();
        }

        public TableEditor edit() {
            throw notImplemented();
        }

        /** Creates the exception thrown by every unsupported operation of this stub. */
        private static UnsupportedOperationException notImplemented() {
            return new UnsupportedOperationException(NOT_IMPLEMENTED);
        }
    }

    /**
     * A {@link TestSourceContext} that blocks the collecting thread once a given number of
     * records has been collected.
     *
     * <p>The thread blocks on a semaphore created with zero permits; nothing in this class
     * releases it, so the call only returns when the blocked thread is interrupted.
     *
     * @param <T> type of the collected records
     */
    private static class BlockingSourceContext<T>
    extends TestSourceContext<T> {
        private final Semaphore blocker = new Semaphore(0);
        private final int expectedCount;
        private int currentCount = 0;

        /**
         * @param expectedCount number of collected records at which {@code collect} blocks
         */
        private BlockingSourceContext(int expectedCount) {
            this.expectedCount = expectedCount;
        }

        /**
         * Collects the record, then blocks if it was exactly the {@code expectedCount}-th one.
         *
         * @param t the record to collect
         */
        public void collect(T t) {
            super.collect(t);
            currentCount += 1;
            if (currentCount != expectedCount) {
                return;
            }
            try {
                blocker.acquire();
            }
            catch (InterruptedException ignored) {
                // Swallowed as in the original code: an interrupt simply ends the block.
            }
        }
    }
}

