/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import java.io.File;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceBuilder;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.data.RowData;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.rules.TemporaryFolder;
import org.locationtech.jts.util.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;

public class SpecificStartingOffsetITCase {
    // Logger for this test class; also handed to the container's log consumer below.
    private static final Logger LOG = LoggerFactory.getLogger(SpecificStartingOffsetITCase.class);
    // Flink mini cluster registered as a JUnit 5 extension; static, so it is not tied to a single test instance.
    @RegisterExtension
    static MiniClusterExtension miniCluster = new MiniClusterExtension();
    // MySQL test container. It is an instance field and is started in prepare() / stopped in tearDown().
    // The configuration override is the my.cnf generated by buildMySqlConfigWithTimezone(...) using the
    // resource folder and the JVM's default time zone.
    private final MySqlContainer mysql = (MySqlContainer)new MySqlContainer().withConfigurationOverride(SpecificStartingOffsetITCase.buildMySqlConfigWithTimezone(SpecificStartingOffsetITCase.getResourceFolder(), SpecificStartingOffsetITCase.getSystemTimeZone())).withSetupSQL("docker/setup.sql").withDatabaseName("flink-test").withUsername("flinkuser").withPassword("flinkpw").withLogConsumer((Consumer)new Slf4jLogConsumer(LOG));
    // Database handle built on the container above (must be declared after `mysql`); created in prepare(),
    // dropped in tearDown().
    private final UniqueDatabase customDatabase = new UniqueDatabase(this.mysql, "customer", "mysqluser", "mysqlpw");
    // The "customers" table of customDatabase; supplies the table id, deserializer and row stringifier used by the tests.
    private final TestTable customers = new TestTable(this.customDatabase, "customers", TestTableSchemas.CUSTOMERS);
    // JDBC connection shared by the helper methods; assigned in prepare() and closed in tearDown().
    private MySqlConnection connection;

    /**
     * Per-test setup: starts the MySQL container, opens the shared connection, creates and
     * initializes the test database, and finally issues {@code FLUSH LOGS}.
     */
    @BeforeEach
    void prepare() throws Exception {
        mysql.start();
        connection = getConnection();
        customDatabase.createAndInitialize();
        flushLogs();
    }

    /**
     * Per-test cleanup: drops the test database, closes the shared connection and stops the
     * MySQL container.
     *
     * <p>Each step runs even if the previous one throws, so a failing {@code dropDatabase()}
     * can no longer leave the connection open or the container running. If several steps
     * fail, the exception from the last failing step is the one that propagates.
     */
    @AfterEach
    void tearDown() throws Exception {
        try {
            customDatabase.dropDatabase();
        } finally {
            try {
                // connection is still null if prepare() failed before assigning it.
                if (connection != null) {
                    connection.close();
                }
            } finally {
                mysql.stop();
            }
        }
    }

    /**
     * Starts the source with {@code StartupOptions.earliest()}, expects the three rows inserted
     * beforehand, then stops with a savepoint, applies an update and expects the restored job
     * to emit the corresponding before/after update records.
     */
    @Test
    void testStartingFromEarliestOffset() throws Exception {
        // Drop binlog files older than the current one, then write the rows under test.
        purgeBinaryLogs();
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');",
                        customers.getTableId()));
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (15513, 'Bob', 'Milan', '123456987');",
                        customers.getTableId()));
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');",
                        customers.getTableId()));

        // Build the topology once and mirror it into a second environment that is later
        // used to run the same transformations from the savepoint.
        StreamExecutionEnvironment env = getExecutionEnvironment();
        MySqlSource<RowData> source =
                getSourceBuilder().startupOptions(StartupOptions.earliest()).build();
        DataStreamSource<RowData> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "earliest-offset-test");
        CollectResultIterator<RowData> iterator = addCollector(env, stream);
        StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
        duplicateTransformations(env, restoredEnv);

        // First run: the three inserts are expected in insertion order.
        JobClient jobClient = env.executeAsync();
        iterator.setJobClient(jobClient);
        List<String> initialRows = fetchRowData(iterator, 3, customers::stringify);
        Assertions.assertThat(initialRows)
                .containsExactly(
                        "+I[15213, Alice, Rome, 123456987]",
                        "+I[15513, Bob, Milan, 123456987]",
                        "+I[18213, Charlie, Paris, 123456987]");

        // Stop with a savepoint, change a row while no job is running, then restore.
        Path savepointDir = Files.createTempDirectory("earliest-offset-test");
        String savepointPath =
                jobClient
                        .stopWithSavepoint(
                                false,
                                savepointDir.toAbsolutePath().toString(),
                                SavepointFormatType.DEFAULT)
                        .get();
        executeStatements(
                String.format(
                        "UPDATE %s SET name = 'Alicia' WHERE id = 15213",
                        customers.getTableId()));
        setupSavepoint(restoredEnv, savepointPath);
        JobClient restoredJobClient = restoredEnv.executeAsync();
        iterator.setJobClient(restoredJobClient);

        // Second run: only the update made after the savepoint is expected.
        List<String> restoredRows = fetchRowData(iterator, 2, customers::stringify);
        Assertions.assertThat(restoredRows)
                .containsExactly(
                        "-U[15213, Alice, Rome, 123456987]",
                        "+U[15213, Alicia, Rome, 123456987]");
        restoredJobClient.cancel().get();
    }

    /**
     * Captures the current binlog file/position, inserts three rows, and starts the source
     * from that captured offset; the three inserts are expected. The job is then stopped with
     * a savepoint, a row is updated, and the restored job is expected to emit the update.
     */
    @Test
    void testStartingFromSpecificOffset() throws Exception {
        purgeBinaryLogs();
        // The offset is recorded before any of the rows below are written.
        BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(connection);
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');",
                        customers.getTableId()));
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (15513, 'Bob', 'Milan', '123456987');",
                        customers.getTableId()));
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');",
                        customers.getTableId()));

        // Build the topology once and mirror it into the environment used for the restore.
        StreamExecutionEnvironment env = getExecutionEnvironment();
        StartupOptions fromRecordedOffset =
                StartupOptions.specificOffset(
                        startingOffset.getFilename(), startingOffset.getPosition());
        MySqlSource<RowData> source = getSourceBuilder().startupOptions(fromRecordedOffset).build();
        DataStreamSource<RowData> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "specific-offset-test");
        CollectResultIterator<RowData> iterator = addCollector(env, stream);
        StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
        duplicateTransformations(env, restoredEnv);

        // First run: the three inserts written after the recorded offset.
        JobClient jobClient = env.executeAsync();
        iterator.setJobClient(jobClient);
        List<String> initialRows = fetchRowData(iterator, 3, customers::stringify);
        Assertions.assertThat(initialRows)
                .containsExactly(
                        "+I[15213, Alice, Rome, 123456987]",
                        "+I[15513, Bob, Milan, 123456987]",
                        "+I[18213, Charlie, Paris, 123456987]");

        // Stop with a savepoint, update a row, then restore into the mirrored environment.
        Path savepointDir = Files.createTempDirectory("specific-offset-test");
        String savepointPath =
                jobClient
                        .stopWithSavepoint(
                                false,
                                savepointDir.toAbsolutePath().toString(),
                                SavepointFormatType.DEFAULT)
                        .get();
        executeStatements(
                String.format(
                        "UPDATE %s SET name = 'Alicia' WHERE id = 15213",
                        customers.getTableId()));
        setupSavepoint(restoredEnv, savepointPath);
        JobClient restoredJobClient = restoredEnv.executeAsync("snapshotSplitTest");
        iterator.setJobClient(restoredJobClient);

        // Second run: only the update made after the savepoint is expected.
        List<String> restoredRows = fetchRowData(iterator, 2, customers::stringify);
        Assertions.assertThat(restoredRows)
                .containsExactly(
                        "-U[15213, Alice, Rome, 123456987]",
                        "+U[15213, Alicia, Rome, 123456987]");
        restoredJobClient.cancel().get();
    }

    /**
     * Checks {@code DebeziumUtils.findBinlogOffset} against a sequence of binlog files.
     *
     * <p>The test writes one row, waits a second, records a timestamp and issues
     * {@code FLUSH LOGS}, five times over. It then asserts which binlog file (at position 0)
     * each recorded timestamp resolves to, and finally that an older timestamp resolves to the
     * current file once the earlier files have been purged.
     *
     * <p>NOTE(review): the expected file names are hard-coded and assume the server is writing
     * {@code mysql-bin.000004} when the test body begins - presumably a result of the container
     * setup plus the flush in {@code prepare()}; confirm if that setup changes.
     */
    @Test
    void testBinlogSplitFromTimestampOffset() throws Exception {
        purgeBinaryLogs();
        assertTimestampResolvesTo("mysql-bin.000004", System.currentTimeMillis());

        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');",
                        customers.getTableId()));
        // NOTE(review): the one-second pauses presumably keep the recorded timestamps
        // distinguishable from the neighbouring binlog events - confirm before shortening.
        Thread.sleep(1000L);
        long t1 = System.currentTimeMillis();
        flushLogs();

        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (15513, 'Bob', 'Milan', '123456987');",
                        customers.getTableId()));
        Thread.sleep(1000L);
        long t2 = System.currentTimeMillis();
        flushLogs();

        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');",
                        customers.getTableId()));
        Thread.sleep(1000L);
        long t3 = System.currentTimeMillis();
        flushLogs();

        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (19613, 'Tom', 'NewYork', '123456987');",
                        customers.getTableId()));
        Thread.sleep(1000L);
        long t4 = System.currentTimeMillis();
        flushLogs();

        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (20913, 'Cat', 'Washington', '123456987');",
                        customers.getTableId()));
        Thread.sleep(1000L);
        long t5 = System.currentTimeMillis();
        flushLogs();

        assertTimestampResolvesTo("mysql-bin.000005", t1);
        assertTimestampResolvesTo("mysql-bin.000006", t2);
        assertTimestampResolvesTo("mysql-bin.000007", t3);
        assertTimestampResolvesTo("mysql-bin.000008", t4);
        assertTimestampResolvesTo("mysql-bin.000009", t5);

        // After purging up to the current file, t3 is expected to resolve to that file
        // instead of the one it resolved to before the purge.
        purgeBinaryLogs();
        assertTimestampResolvesTo("mysql-bin.000009", t3);
    }

    /**
     * Asserts that {@code DebeziumUtils.findBinlogOffset} maps the given timestamp to position
     * 0 of the given binlog file, using the shared connection.
     *
     * @param expectedBinlogFile name of the binlog file the timestamp should resolve to
     * @param timestampMillis timestamp passed to {@code findBinlogOffset}
     */
    private void assertTimestampResolvesTo(String expectedBinlogFile, long timestampMillis)
            throws Exception {
        // AssertJ is used here (as in the other tests of this class) rather than the JTS
        // geometry library's internal Assert helper.
        Assertions.assertThat(DebeziumUtils.findBinlogOffset(timestampMillis, connection))
                .isEqualTo(BinlogOffset.ofBinlogFilePosition(expectedBinlogFile, 0L));
    }

    /**
     * Inserts three rows, flushes the logs, records a timestamp, inserts three more rows and
     * starts the source from that timestamp; only the later three rows are expected. The job
     * is then stopped with a savepoint, a row is updated, and the restored job is expected to
     * emit that update.
     */
    @Test
    void testStartingFromTimestampOffset() throws Exception {
        purgeBinaryLogs();
        // Rows written before the startup timestamp; the assertions below expect none of them.
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (15213, 'Alice', 'Rome', '123456987');",
                        customers.getTableId()));
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (15513, 'Bob', 'Milan', '123456987');",
                        customers.getTableId()));
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (18213, 'Charlie', 'Paris', '123456987');",
                        customers.getTableId()));
        flushLogs();
        Thread.sleep(1000L);
        StartupOptions startupOptions = StartupOptions.timestamp(System.currentTimeMillis());

        // Rows written after the startup timestamp; these are the expected output.
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (19613, 'Tom', 'NewYork', '123456987');",
                        customers.getTableId()));
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (20913, 'Cat', 'Washington', '123456987');",
                        customers.getTableId()));
        executeStatements(
                String.format(
                        "INSERT INTO %s VALUES (23313, 'Mouse', 'Seattle', '123456987');",
                        customers.getTableId()));

        // Build the topology once and mirror it into the environment used for the restore.
        StreamExecutionEnvironment env = getExecutionEnvironment();
        MySqlSource<RowData> source = getSourceBuilder().startupOptions(startupOptions).build();
        DataStreamSource<RowData> stream =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "timestamp-offset-test");
        CollectResultIterator<RowData> iterator = addCollector(env, stream);
        StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
        duplicateTransformations(env, restoredEnv);

        JobClient jobClient = env.executeAsync();
        iterator.setJobClient(jobClient);
        List<String> initialRows = fetchRowData(iterator, 3, customers::stringify);
        Assertions.assertThat(initialRows)
                .containsExactly(
                        "+I[19613, Tom, NewYork, 123456987]",
                        "+I[20913, Cat, Washington, 123456987]",
                        "+I[23313, Mouse, Seattle, 123456987]");

        // Stop with a savepoint, update a row, then restore into the mirrored environment.
        Path savepointDir = Files.createTempDirectory("timestamp-offset-test");
        String savepointPath =
                jobClient
                        .stopWithSavepoint(
                                false,
                                savepointDir.toAbsolutePath().toString(),
                                SavepointFormatType.DEFAULT)
                        .get();
        executeStatements(
                String.format(
                        "UPDATE %s SET name = 'George' WHERE id = 18213",
                        customers.getTableId()));
        setupSavepoint(restoredEnv, savepointPath);
        JobClient restoredJobClient = restoredEnv.executeAsync("snapshotSplitTest");
        iterator.setJobClient(restoredJobClient);

        List<String> restoredRows = fetchRowData(iterator, 2, customers::stringify);
        Assertions.assertThat(restoredRows)
                .containsExactly(
                        "-U[18213, Charlie, Paris, 123456987]",
                        "+U[18213, George, Paris, 123456987]");
        restoredJobClient.cancel().get();
    }

    /**
     * Returns a source builder pre-configured with the container's host and port, the test
     * database's credentials, the test database and {@code customers} table, and the table's
     * deserializer. Callers add the startup options and build the source.
     */
    private MySqlSourceBuilder<RowData> getSourceBuilder() {
        return MySqlSource.<RowData>builder()
                .hostname(mysql.getHost())
                .port(mysql.getDatabasePort())
                .username(customDatabase.getUsername())
                .password(customDatabase.getPassword())
                .databaseList(new String[] {customDatabase.getDatabaseName()})
                .tableList(new String[] {customers.getTableId()})
                // The raw cast is kept from the original; the deserializer's declared type
                // is not visible in this file.
                .deserializer((DebeziumDeserializationSchema) customers.getDeserializer());
    }

    /**
     * Creates a MySQL connection to the container using the test database's credentials.
     *
     * @return a new connection built by {@code DebeziumUtils.createMySqlConnection}
     */
    private MySqlConnection getConnection() {
        HashMap<String, String> settings = new HashMap<>();
        settings.put("database.hostname", mysql.getHost());
        settings.put("database.port", String.valueOf(mysql.getDatabasePort()));
        settings.put("database.user", customDatabase.getUsername());
        settings.put("database.password", customDatabase.getPassword());
        // No additional JDBC properties are supplied.
        return DebeziumUtils.createMySqlConnection(Configuration.from(settings), new Properties());
    }

    /**
     * Executes the given SQL statements on the shared connection, then commits.
     *
     * @param statements SQL statements to run, in order
     */
    private void executeStatements(String... statements) throws Exception {
        connection.execute(statements);
        connection.commit();
    }

    /** Issues {@code FLUSH LOGS} on the shared connection. */
    private void flushLogs() throws Exception {
        executeStatements("FLUSH LOGS;");
    }

    /**
     * Looks up the binlog file the server is currently on and issues
     * {@code PURGE BINARY LOGS TO} that file.
     */
    private void purgeBinaryLogs() throws Exception {
        String activeBinlogFile = DebeziumUtils.currentBinlogOffset(connection).getFilename();
        executeStatements(String.format("PURGE BINARY LOGS TO '%s'", activeBinlogFile));
    }

    /**
     * Attaches a collect sink to {@code stream} and returns the iterator that reads from it.
     *
     * <p>The sink transformation is registered on {@code env}; the returned iterator still
     * needs a job client (see the {@code setJobClient} calls in the tests) before it is read.
     *
     * @param env environment the sink transformation is added to
     * @param stream stream whose records should be collected
     * @return iterator bound to the newly created collect sink
     */
    private <T> CollectResultIterator<T> addCollector(
            StreamExecutionEnvironment env, DataStream<T> stream) {
        TypeSerializer<T> recordSerializer =
                stream.getTransformation().getOutputType().createSerializer(env.getConfig());
        // A random suffix gives every collector its own accumulator name.
        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();

        CollectSinkOperatorFactory<T> sinkFactory =
                new CollectSinkOperatorFactory<>(recordSerializer, accumulatorName);
        CollectSinkOperator<T> sinkOperator = (CollectSinkOperator<T>) sinkFactory.getOperator();
        CollectResultIterator<T> resultIterator =
                new CollectResultIterator<>(
                        sinkOperator.getOperatorIdFuture(),
                        recordSerializer,
                        accumulatorName,
                        env.getCheckpointConfig());

        CollectStreamSink<T> collectSink = new CollectStreamSink<>(stream, sinkFactory);
        collectSink.name("Data stream collect sink");
        env.addOperator(collectSink.getTransformation());
        return resultIterator;
    }

    /**
     * Reads up to {@code size} rows from {@code iter} and converts each one to a string.
     *
     * <p>The remaining count is checked before {@code hasNext()}, so the iterator is not
     * queried again once {@code size} rows have been read. Fewer rows are returned if the
     * iterator runs out first.
     *
     * @param iter source of rows
     * @param size maximum number of rows to read
     * @param stringifier converts a row to its string form
     * @return the stringified rows, in the order they were read
     */
    private List<String> fetchRowData(Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
        List<RowData> fetched = new ArrayList<>(size);
        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
            fetched.add(iter.next());
        }
        return fetched.stream().map(stringifier).collect(Collectors.toList());
    }

    /**
     * Writes a {@code my.cnf} with the binlog/GTID settings used by this test plus the given
     * default time zone, inside a freshly created folder under {@code resourceDirectory}.
     *
     * <p>Nothing in this method deletes the created folder or file.
     *
     * @param resourceDirectory directory under which the temporary folder is created
     * @param timezone value written as {@code default-time_zone}
     * @return path of the generated file relative to {@code resourceDirectory}
     * @throws RuntimeException if any step of creating or writing the file fails
     */
    private static String buildMySqlConfigWithTimezone(File resourceDirectory, String timezone) {
        try {
            TemporaryFolder scratch = new TemporaryFolder(resourceDirectory);
            scratch.create();
            // A random sub-folder name keeps separately generated files apart.
            File confDir = scratch.newFolder(String.valueOf(UUID.randomUUID()));
            Path confFile = Files.createFile(Paths.get(confDir.getPath(), "my.cnf"));

            String content =
                    "[mysqld]\n"
                            + "binlog_format = row\n"
                            + "log_bin = mysql-bin\n"
                            + "server-id = 223344\n"
                            + "binlog_row_image = FULL\n"
                            + "gtid_mode = on\n"
                            + "enforce_gtid_consistency = on\n"
                            + "default-time_zone = '" + timezone + "'\n";
            Files.write(
                    confFile,
                    Collections.singleton(content),
                    StandardCharsets.UTF_8,
                    StandardOpenOption.APPEND);

            Path resourceRoot = Paths.get(resourceDirectory.getAbsolutePath());
            return resourceRoot.relativize(confFile).toString();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create my.cnf file.", e);
        }
    }

    /**
     * Resolves the root of this class's class-loader resources (the {@code "."} resource) as
     * a directory.
     *
     * @return the resource root as a {@link File}
     * @throws FlinkRuntimeException if the resource cannot be found or converted; the
     *     underlying exception is attached as the cause
     */
    private static File getResourceFolder() {
        try {
            return Paths.get(
                            Objects.requireNonNull(
                                            SpecificStartingOffsetITCase.class
                                                    .getClassLoader()
                                                    .getResource("."))
                                    .toURI())
                    .toFile();
        } catch (Exception e) {
            // Keep the original failure as the cause so the reason is visible in the trace.
            throw new FlinkRuntimeException("Get Resource File Directory fail", e);
        }
    }

    /** Returns the ID of the JVM's default time zone. */
    private static String getSystemTimeZone() {
        ZoneId defaultZone = ZoneId.systemDefault();
        return defaultZone.getId();
    }

    /**
     * Points {@code env} at a savepoint by writing {@code savepointPath} into the
     * environment's configuration under {@code SavepointConfigOptions.SAVEPOINT_PATH}.
     *
     * <p>The configuration is reached reflectively through the {@code configuration} field
     * declared on {@code StreamExecutionEnvironment}. The field is looked up on the class
     * literal, so the lookup does not depend on the thread's context class loader.
     *
     * @param env environment whose configuration is modified
     * @param savepointPath savepoint location to restore from
     * @throws Exception if the field cannot be found or read
     */
    private void setupSavepoint(StreamExecutionEnvironment env, String savepointPath) throws Exception {
        Field configurationField = StreamExecutionEnvironment.class.getDeclaredField("configuration");
        configurationField.setAccessible(true);
        // Fully qualified: the simple name Configuration is taken by the Debezium import.
        org.apache.flink.configuration.Configuration configuration =
                (org.apache.flink.configuration.Configuration) configurationField.get(env);
        configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
    }

    /**
     * Registers every transformation of {@code source} on {@code target} as well.
     *
     * @param source environment whose transformations are read
     * @param target environment the transformations are added to
     */
    private void duplicateTransformations(StreamExecutionEnvironment source, StreamExecutionEnvironment target) {
        source.getTransformations()
                .forEach(transformation -> target.addOperator(transformation));
    }

    /**
     * Obtains an execution environment with checkpointing enabled at an interval of 100 and
     * the checkpointing mode set to {@code EXACTLY_ONCE}.
     */
    private StreamExecutionEnvironment getExecutionEnvironment() {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.enableCheckpointing(100L);
        environment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        return environment;
    }
}

