package io.debezium.pipeline;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.junit.EqualityCheck;
import io.debezium.junit.SkipWhenConnectorUnderTest;
import io.debezium.junit.SkipWhenConnectorsUnderTest;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.signal.actions.AbstractSnapshotSignal;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
import java.lang.management.ManagementFactory;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import javax.management.AttributeNotFoundException;
import javax.management.InstanceNotFoundException;
import javax.management.MBeanException;
import javax.management.MalformedObjectNameException;
import javax.management.ReflectionException;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.TabularDataSupport;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceConnector;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Test;

/* loaded from: input_file:io/debezium/pipeline/AbstractBlockingSnapshotTest.class */
public abstract class AbstractBlockingSnapshotTest<T extends SourceConnector> extends AbstractSnapshotTest<T> {
    private int signalingRecords;
    protected static final int ROW_COUNT = 1000;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest
    public abstract Configuration.Builder mutableConfig(boolean z, boolean z2);

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest
    public abstract JdbcConnection databaseConnection();

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest
    public abstract String topicName();

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest
    public abstract String tableName();

    protected abstract String escapedTableDataCollectionId();

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest
    public abstract String connector();

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest
    public abstract String server();

    protected Configuration.Builder historizedMutableConfig(boolean z, boolean z2) {
        return mutableConfig(z, z2);
    }

    @Test
    public void executeBlockingSnapshot() throws Exception {
        populateTable();
        startConnectorWithSnapshot(builder -> {
            return mutableConfig(false, false);
        });
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        insertRecords(ROW_COUNT, ROW_COUNT);
        assertRecordsFromSnapshotAndStreamingArePresent(2000, consumeRecordsByTopic(2000, 10));
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionId());
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        this.signalingRecords = 1;
        assertRecordsFromSnapshotAndStreamingArePresent(2000, consumeRecordsByTopic(2000 + this.signalingRecords, 10));
        insertRecords(ROW_COUNT, 2000);
        assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10));
    }

    @Test
    public void executeBlockingSnapshotWhileStreaming() throws Exception {
        populateTable();
        startConnectorWithSnapshot(builder -> {
            return mutableConfig(false, false);
        });
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        Future<?> executeAsync = executeAsync(insertTask());
        Thread.sleep(2000L);
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionId());
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
        Long totalSnapshotRecords = getTotalSnapshotRecords(tableDataCollectionId(), connector(), server(), task(), database());
        executeAsync.get(120L, TimeUnit.SECONDS);
        insertRecords(ROW_COUNT, 2000);
        this.signalingRecords = 1;
        assertRecordsWithValuesPresent((int) (3000 + totalSnapshotRecords.longValue()), getExpectedValues(totalSnapshotRecords), topicName(), consumeRecordsByTopic((int) (3000 + totalSnapshotRecords.longValue() + this.signalingRecords), 10));
    }

    @Test
    public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {
        populateTable(tableNames().get(1).toString());
        startConnectorWithSnapshot(builder -> {
            return mutableConfig(false, false);
        });
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
        sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(tableDataCollectionIds().get(1), String.format("SELECT * FROM %s WHERE aa < 500", tableNames().get(1))), "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionIds().get(1).toString());
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        this.signalingRecords = 1;
        assertRecordsWithValuesPresent(500, (List) IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString(), consumeRecordsByTopic(500 + this.signalingRecords, 10));
    }

    @Test
    @FixFor({"DBZ-8238"})
    public void streamingMetricsResumeAfterBlockingSnapshot() throws Exception {
        populateTable();
        startConnectorWithSnapshot(builder -> {
            return mutableConfig(false, false);
        });
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(tableName(), String.format("SELECT * FROM %s WHERE aa < 500", tableName())), "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionId());
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        insertRecords(ROW_COUNT, 2000);
        this.signalingRecords = 1;
        assertStreamingTotalNumberOfCreateEventsSeen(Long.valueOf(ROW_COUNT + this.signalingRecords));
    }

    @Test
    @SkipWhenConnectorsUnderTest({@SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.POSTGRES), @SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.SQL_SERVER), @SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.DB2)})
    public void readsSchemaOnlyForSignaledTables() throws Exception {
        populateTable(tableNames().get(1).toString());
        startConnectorWithSnapshot(builder -> {
            return historizedMutableConfig(false, false);
        });
        waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());
        sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(tableDataCollectionIds().get(1), String.format("SELECT * FROM %s WHERE aa < 500", tableNames().get(1))), "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionIds().get(1).toString());
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        this.signalingRecords = 1;
        AbstractConnectorTest.SourceRecords consumeRecordsByTopic = consumeRecordsByTopic(500 + this.signalingRecords + expectedDdlsCount(), 1);
        assertRecordsWithValuesPresent(500, (List) IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString(), consumeRecordsByTopic);
        assertDdl((List) consumeRecordsByTopic.recordsForTopic(server()).stream().map(sourceRecord -> {
            return ((Struct) sourceRecord.value()).getString("ddl");
        }).collect(Collectors.toList()));
    }

    @Test
    @FixFor({"DBZ-7718"})
    public void executeBlockingSnapshotWithEscapedCollectionName() throws Exception {
        populateTable();
        startConnectorWithSnapshot(builder -> {
            return mutableConfig(false, false);
        });
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        insertRecords(ROW_COUNT, ROW_COUNT);
        assertRecordsFromSnapshotAndStreamingArePresent(2000, consumeRecordsByTopic(2000, 10));
        sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", AbstractSnapshotSignal.SnapshotType.BLOCKING, escapedTableDataCollectionId());
        waitForLogMessage("Snapshot completed", AbstractSnapshotChangeEventSource.class);
        this.signalingRecords = 1;
        assertRecordsFromSnapshotAndStreamingArePresent(2000, consumeRecordsByTopic(2000 + this.signalingRecords, 10));
        insertRecords(ROW_COUNT, 2000);
        assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT, 10));
    }

    @Test
    @FixFor({"DBZ-8244"})
    public void anErrorDuringBlockingSnapshotShouldLeaveTheConnectorInAGoodState() throws Exception {
        populateTable();
        startConnectorWithSnapshot(builder -> {
            return mutableConfig(false, false).with(CommonConnectorConfig.MAX_BATCH_SIZE, 1);
        });
        waitForSnapshotToBeCompleted(connector(), server(), task(), database());
        insertRecords(ROW_COUNT, ROW_COUNT);
        assertRecordsFromSnapshotAndStreamingArePresent(2000, consumeRecordsByTopic(2000, 20));
        sendAdHocSnapshotSignalWithAdditionalConditionsWithSurrogateKey(Map.of(tableDataCollectionIds().get(1), "SELECT WITH AN ERROR"), "", AbstractSnapshotSignal.SnapshotType.BLOCKING, tableDataCollectionIds().get(1));
        waitForLogMessage("Snapshot was not completed successfully", AbstractSnapshotChangeEventSource.class);
        insertRecords(ROW_COUNT, 2000);
        this.signalingRecords = 1;
        assertStreamingRecordsArePresent(ROW_COUNT, consumeRecordsByTopic(ROW_COUNT + this.signalingRecords, 10));
    }

    protected int expectedDdlsCount() {
        return 0;
    }

    protected void assertDdl(List<String> list) {
    }

    protected int insertMaxSleep() {
        return 2;
    }

    private Runnable insertTask() {
        return () -> {
            try {
                insertRecordsWithRandomSleep(ROW_COUNT, ROW_COUNT, insertMaxSleep());
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }

    private Long getTotalStreamingCreateEventsSeen(String str, String str2, String str3, String str4) throws MalformedObjectNameException, ReflectionException, AttributeNotFoundException, InstanceNotFoundException, MBeanException {
        return (Long) ManagementFactory.getPlatformMBeanServer().getAttribute(getStreamingMetricsObjectName(str, str2, "streaming", str3, str4), "TotalNumberOfCreateEventsSeen");
    }

    private Long getTotalSnapshotRecords(String str, String str2, String str3, String str4, String str5) throws MalformedObjectNameException, ReflectionException, AttributeNotFoundException, InstanceNotFoundException, MBeanException {
        return (Long) ((Map) ((TabularDataSupport) ManagementFactory.getPlatformMBeanServer().getAttribute(getSnapshotMetricsObjectName(str2, str3, str4, str5), "RowsScanned")).values().stream().map(obj -> {
            return (CompositeDataSupport) obj;
        }).collect(Collectors.toMap(compositeDataSupport -> {
            return compositeDataSupport.get("key").toString();
        }, compositeDataSupport2 -> {
            return compositeDataSupport2.get("value");
        }))).get(str.replace("`", ""));
    }

    private static List<Integer> getExpectedValues(Long l) {
        List list = (List) IntStream.rangeClosed(0, 999).boxed().collect(Collectors.toList());
        return (List) Stream.of((Object[]) new List[]{list, (List) IntStream.rangeClosed(ROW_COUNT, 1999).boxed().collect(Collectors.toList()), (List) Stream.of((Object[]) new List[]{list, (List) IntStream.rangeClosed(ROW_COUNT, Math.toIntExact(l.longValue())).boxed().collect(Collectors.toList())}).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList()), (List) IntStream.rangeClosed(2000, 2999).boxed().collect(Collectors.toList())}).flatMap((v0) -> {
            return v0.stream();
        }).collect(Collectors.toList());
    }

    protected static void waitForLogMessage(String str, Class<?> cls) {
        LogInterceptor logInterceptor = new LogInterceptor(cls);
        Awaitility.await().alias("Snapshot not completed on time").pollInterval(100L, TimeUnit.MILLISECONDS).atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS).until(() -> {
            return Boolean.valueOf(logInterceptor.containsMessage(str));
        });
    }

    private void assertStreamingTotalNumberOfCreateEventsSeen(Long l) throws ReflectionException, MalformedObjectNameException, AttributeNotFoundException, InstanceNotFoundException, MBeanException {
        try {
            Awaitility.await().pollInterval(1000L, TimeUnit.MILLISECONDS).atMost(waitTimeForRecords() * 30, TimeUnit.SECONDS).until(() -> {
                return Boolean.valueOf(Objects.equals(getTotalStreamingCreateEventsSeen(connector(), server(), task(), database()), l));
            });
            Long totalStreamingCreateEventsSeen = getTotalStreamingCreateEventsSeen(connector(), server(), task(), database());
            Assertions.assertThat(totalStreamingCreateEventsSeen).withFailMessage("streaming TotalNumberOfCreateEventsSeen metric value expected: %d actual: %d", new Object[]{l, totalStreamingCreateEventsSeen}).isEqualTo(l);
        } catch (ConditionTimeoutException e) {
            Long totalStreamingCreateEventsSeen2 = getTotalStreamingCreateEventsSeen(connector(), server(), task(), database());
            Assertions.assertThat(totalStreamingCreateEventsSeen2).withFailMessage("streaming TotalNumberOfCreateEventsSeen metric value expected: %d actual: %d", new Object[]{l, totalStreamingCreateEventsSeen2}).isEqualTo(l);
        } catch (Throwable th) {
            Long totalStreamingCreateEventsSeen3 = getTotalStreamingCreateEventsSeen(connector(), server(), task(), database());
            Assertions.assertThat(totalStreamingCreateEventsSeen3).withFailMessage("streaming TotalNumberOfCreateEventsSeen metric value expected: %d actual: %d", new Object[]{l, totalStreamingCreateEventsSeen3}).isEqualTo(l);
            throw th;
        }
    }

    private Future<?> executeAsync(Runnable runnable) {
        return Executors.newSingleThreadExecutor().submit(runnable);
    }

    protected void assertStreamingRecordsArePresent(int i, AbstractConnectorTest.SourceRecords sourceRecords) {
        assertRecordsWithValuesPresent(i, (List) IntStream.rangeClosed(2000, 2999).boxed().collect(Collectors.toList()), topicName(), sourceRecords);
    }

    protected void assertRecordsFromSnapshotAndStreamingArePresent(int i, AbstractConnectorTest.SourceRecords sourceRecords) throws InterruptedException {
        assertRecordsWithValuesPresent(i, (List) IntStream.range(0, i - 1).boxed().collect(Collectors.toList()), topicName(), sourceRecords);
    }

    private void assertRecordsWithValuesPresent(int i, List<Integer> list, String str, AbstractConnectorTest.SourceRecords sourceRecords) {
        List list2 = (List) sourceRecords.recordsForTopic(str).stream().map(sourceRecord -> {
            return ((Struct) sourceRecord.value()).getStruct("after").getInt32(valueFieldName());
        }).collect(Collectors.toList());
        Assertions.assertThat(sourceRecords.recordsForTopic(str).size()).isEqualTo(i);
        Assertions.assertThat(list2).containsAll(list);
    }

    protected void insertRecords(int i, int i2) throws SQLException {
        JdbcConnection databaseConnection = databaseConnection();
        try {
            databaseConnection.setAutoCommit(false);
            for (int i3 = 0; i3 < i; i3++) {
                databaseConnection.executeWithoutCommitting(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", tableName(), databaseConnection.quotedColumnIdString(pkFieldName()), Integer.valueOf(i3 + i2 + 1), Integer.valueOf(i3 + i2))});
            }
            databaseConnection.commit();
            if (databaseConnection != null) {
                databaseConnection.close();
            }
        } catch (Throwable th) {
            if (databaseConnection != null) {
                try {
                    databaseConnection.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private void insertRecordsWithRandomSleep(int i, int i2, int i3, Runnable runnable) throws SQLException {
        try {
            JdbcConnection databaseConnection = databaseConnection();
            try {
                databaseConnection.setAutoCommit(true);
                for (int i4 = 0; i4 < i; i4++) {
                    databaseConnection.execute(new String[]{String.format("INSERT INTO %s (%s, aa) VALUES (%s, %s)", tableName(), databaseConnection.quotedColumnIdString(pkFieldName()), Integer.valueOf(i4 + i2 + 1), Integer.valueOf(i4 + i2))});
                    runnable.run();
                    Thread.sleep(ThreadLocalRandom.current().nextInt(1, i3));
                }
                if (databaseConnection != null) {
                    databaseConnection.close();
                }
            } finally {
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void insertRecordsWithRandomSleep(int i, int i2, int i3) throws SQLException {
        insertRecordsWithRandomSleep(i, i2, i3, () -> {
        });
    }
}
