/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mysql.source.utils;

import io.debezium.data.Envelope;
import io.debezium.document.Document;
import io.debezium.document.DocumentReader;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.util.SchemaNameAdjuster;
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.flink.cdc.common.utils.StringUtils;
import org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.table.types.logical.RowType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RecordUtils {
    private static final Logger LOG = LoggerFactory.getLogger(RecordUtils.class);
    public static final String SCHEMA_CHANGE_EVENT_KEY_NAME = "io.debezium.connector.mysql.SchemaChangeKey";
    public static final String SCHEMA_HEARTBEAT_EVENT_KEY_NAME = "io.debezium.connector.common.Heartbeat";
    private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
    private static final Pattern OSC_TABLE_ID_PATTERN = Pattern.compile("^_(.*)_(gho|new)$");

    private RecordUtils() {
    }

    public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
        Object[] row = new Object[size];
        for (int i = 0; i < size; ++i) {
            row[i] = rs.getObject(i + 1);
        }
        return row;
    }

    public static Struct getStructContainsChunkKey(SourceRecord record) {
        Envelope.Operation op = Envelope.operationFor((SourceRecord)record);
        Struct value = (Struct)record.value();
        if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
            return value.getStruct("after");
        }
        return value.getStruct("before");
    }

    public static void upsertBinlog(Map<Struct, List<SourceRecord>> snapshotRecords, SourceRecord binlogRecord, RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Object[] splitStart, Object[] splitEnd) {
        Struct chunkKeyStruct;
        Struct value;
        if (RecordUtils.isDataChangeRecord(binlogRecord) && (value = (Struct)binlogRecord.value()) != null && RecordUtils.splitKeyRangeContains(RecordUtils.getSplitKey(splitBoundaryType, nameAdjuster, chunkKeyStruct = RecordUtils.getStructContainsChunkKey(binlogRecord)), splitStart, splitEnd)) {
            boolean hasPrimaryKey = binlogRecord.key() != null;
            Envelope.Operation operation = Envelope.Operation.forCode((String)value.getString("op"));
            switch (operation) {
                case CREATE: {
                    RecordUtils.upsertBinlog(snapshotRecords, binlogRecord, hasPrimaryKey ? (Struct)binlogRecord.key() : RecordUtils.createReadOpValue(binlogRecord, "after"), false);
                    break;
                }
                case UPDATE: {
                    Struct structFromAfter = RecordUtils.createReadOpValue(binlogRecord, "after");
                    if (!hasPrimaryKey) {
                        RecordUtils.upsertBinlog(snapshotRecords, binlogRecord, RecordUtils.createReadOpValue(binlogRecord, "before"), true);
                        if (!RecordUtils.splitKeyRangeContains(RecordUtils.getSplitKey(splitBoundaryType, nameAdjuster, structFromAfter), splitStart, splitEnd)) {
                            LOG.warn("The updated chunk key is out of the split range. Cannot provide exactly-once semantics.");
                        }
                    }
                    RecordUtils.upsertBinlog(snapshotRecords, binlogRecord, hasPrimaryKey ? (Struct)binlogRecord.key() : structFromAfter, false);
                    break;
                }
                case DELETE: {
                    RecordUtils.upsertBinlog(snapshotRecords, binlogRecord, hasPrimaryKey ? (Struct)binlogRecord.key() : RecordUtils.createReadOpValue(binlogRecord, "before"), true);
                    break;
                }
                case READ: {
                    throw new IllegalStateException(String.format("Binlog record shouldn't use READ operation, the the record is %s.", binlogRecord));
                }
            }
        }
    }

    private static void upsertBinlog(Map<Struct, List<SourceRecord>> snapshotRecords, SourceRecord binlogRecord, Struct keyStruct, boolean isDelete) {
        boolean hasPrimaryKey = binlogRecord.key() != null;
        List<SourceRecord> records = snapshotRecords.get(keyStruct);
        if (isDelete) {
            if (records == null || records.isEmpty()) {
                LOG.error("Deleting a record which is not in its split for tables without primary keys. This may happen when the chunk key column is updated in another snapshot split.");
            } else if (hasPrimaryKey) {
                snapshotRecords.remove(keyStruct);
            } else {
                snapshotRecords.get(keyStruct).remove(0);
            }
        } else {
            SourceRecord record = new SourceRecord(binlogRecord.sourcePartition(), binlogRecord.sourceOffset(), binlogRecord.topic(), binlogRecord.kafkaPartition(), binlogRecord.keySchema(), binlogRecord.key(), binlogRecord.valueSchema(), (Object)RecordUtils.createReadOpValue(binlogRecord, "after"));
            if (hasPrimaryKey) {
                snapshotRecords.put(keyStruct, Collections.singletonList(record));
            } else {
                if (records == null) {
                    snapshotRecords.put(keyStruct, new LinkedList());
                    records = snapshotRecords.get(keyStruct);
                }
                records.add(record);
            }
        }
    }

    private static Struct createReadOpValue(SourceRecord binlogRecord, String beforeOrAfter) {
        Struct value = (Struct)binlogRecord.value();
        Envelope envelope = Envelope.fromSchema((Schema)binlogRecord.valueSchema());
        Struct source = value.getStruct("source");
        Struct targetStruct = value.getStruct(beforeOrAfter);
        Instant fetchTs = Instant.ofEpochMilli((Long)source.get("ts_ms"));
        return envelope.read((Object)targetStruct, source, fetchTs);
    }

    public static List<SourceRecord> formatMessageTimestamp(Collection<SourceRecord> snapshotRecords) {
        return snapshotRecords.stream().map(record -> {
            Envelope envelope = Envelope.fromSchema((Schema)record.valueSchema());
            Struct value = (Struct)record.value();
            Struct updateAfter = value.getStruct("after");
            Struct source = value.getStruct("source");
            source.put("ts_ms", (Object)0L);
            Instant fetchTs = Instant.ofEpochMilli(value.getInt64("ts_ms"));
            SourceRecord sourceRecord = new SourceRecord(record.sourcePartition(), record.sourceOffset(), record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), (Object)envelope.read((Object)updateAfter, source, fetchTs));
            return sourceRecord;
        }).collect(Collectors.toList());
    }

    public static boolean isWatermarkEvent(SourceRecord record) {
        Optional<SignalEventDispatcher.WatermarkKind> watermarkKind = RecordUtils.getWatermarkKind(record);
        return watermarkKind.isPresent();
    }

    public static boolean isLowWatermarkEvent(SourceRecord record) {
        Optional<SignalEventDispatcher.WatermarkKind> watermarkKind = RecordUtils.getWatermarkKind(record);
        return watermarkKind.isPresent() && watermarkKind.get() == SignalEventDispatcher.WatermarkKind.LOW;
    }

    public static boolean isHighWatermarkEvent(SourceRecord record) {
        Optional<SignalEventDispatcher.WatermarkKind> watermarkKind = RecordUtils.getWatermarkKind(record);
        return watermarkKind.isPresent() && watermarkKind.get() == SignalEventDispatcher.WatermarkKind.HIGH;
    }

    public static boolean isEndWatermarkEvent(SourceRecord record) {
        Optional<SignalEventDispatcher.WatermarkKind> watermarkKind = RecordUtils.getWatermarkKind(record);
        return watermarkKind.isPresent() && watermarkKind.get() == SignalEventDispatcher.WatermarkKind.BINLOG_END;
    }

    public static BinlogOffset getWatermark(SourceRecord watermarkEvent) {
        return RecordUtils.getBinlogPosition(watermarkEvent.sourceOffset());
    }

    public static Long getMessageTimestamp(SourceRecord record) {
        Schema schema = record.valueSchema();
        Struct value = (Struct)record.value();
        if (schema.field("source") == null) {
            return null;
        }
        Struct source = value.getStruct("source");
        if (source.schema().field("ts_ms") == null) {
            return null;
        }
        return source.getInt64("ts_ms");
    }

    public static Long getFetchTimestamp(SourceRecord record) {
        Schema schema = record.valueSchema();
        Struct value = (Struct)record.value();
        if (schema.field("ts_ms") == null) {
            return null;
        }
        return value.getInt64("ts_ms");
    }

    public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
        Schema keySchema = sourceRecord.keySchema();
        return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
    }

    public static boolean isHeartbeatEvent(SourceRecord record) {
        Schema valueSchema = record.valueSchema();
        return valueSchema != null && SCHEMA_HEARTBEAT_EVENT_KEY_NAME.equalsIgnoreCase(valueSchema.name());
    }

    public static FinishedSnapshotSplitInfo getSnapshotSplitInfo(MySqlSnapshotSplit split, SourceRecord highWatermark) {
        Struct value = (Struct)highWatermark.value();
        String splitId = value.getString("split_id");
        return new FinishedSnapshotSplitInfo(split.getTableId(), splitId, split.getSplitStart(), split.getSplitEnd(), RecordUtils.getBinlogPosition(highWatermark.sourceOffset()));
    }

    public static BinlogOffset getStartingOffsetOfBinlogSplit(List<FinishedSnapshotSplitInfo> finishedSnapshotSplits) {
        BinlogOffset startOffset = finishedSnapshotSplits.isEmpty() ? BinlogOffset.ofEarliest() : finishedSnapshotSplits.get(0).getHighWatermark();
        for (FinishedSnapshotSplitInfo finishedSnapshotSplit : finishedSnapshotSplits) {
            if (!finishedSnapshotSplit.getHighWatermark().isBefore(startOffset)) continue;
            startOffset = finishedSnapshotSplit.getHighWatermark();
        }
        return startOffset;
    }

    public static boolean isDataChangeRecord(SourceRecord record) {
        Schema valueSchema = record.valueSchema();
        Struct value = (Struct)record.value();
        return valueSchema.field("op") != null && value.getString("op") != null;
    }

    public static TableId getTableId(SourceRecord dataRecord) {
        Struct value = (Struct)dataRecord.value();
        Struct source = value.getStruct("source");
        String dbName = source.getString("db");
        String tableName = source.getString("table");
        return new TableId(dbName, null, tableName);
    }

    public static SourceRecord setTableId(SourceRecord dataRecord, TableId originalTableId, TableId tableId) {
        Document historyRecordDocument;
        Struct value = (Struct)dataRecord.value();
        try {
            historyRecordDocument = RecordUtils.getHistoryRecord(dataRecord).document();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        HistoryRecord newHistoryRecord = new HistoryRecord(historyRecordDocument.set((CharSequence)"ddl", (Object)historyRecordDocument.get((CharSequence)"ddl").asString().replace(originalTableId.table(), tableId.table())));
        Struct newSource = value.getStruct("source").put("db", (Object)tableId.catalog()).put("table", (Object)tableId.table());
        return dataRecord.newRecord(dataRecord.topic(), dataRecord.kafkaPartition(), dataRecord.keySchema(), dataRecord.key(), dataRecord.valueSchema(), (Object)value.put("source", (Object)newSource).put("historyRecord", (Object)newHistoryRecord.toString()), dataRecord.timestamp(), (Iterable)dataRecord.headers());
    }

    public static boolean isTableChangeRecord(SourceRecord dataRecord) {
        Struct value = (Struct)dataRecord.value();
        Struct source = value.getStruct("source");
        String tableName = source.getString("table");
        return !StringUtils.isNullOrWhitespaceOnly((String)tableName);
    }

    public static Object[] getSplitKey(RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Struct target) {
        String splitFieldName = nameAdjuster.adjust((String)splitBoundaryType.getFieldNames().get(0));
        return new Object[]{target.get(splitFieldName)};
    }

    public static BinlogOffset getBinlogPosition(SourceRecord dataRecord) {
        return RecordUtils.getBinlogPosition(dataRecord.sourceOffset());
    }

    public static BinlogOffset getBinlogPosition(Map<String, ?> offset) {
        HashMap<String, String> offsetStrMap = new HashMap<String, String>();
        for (Map.Entry<String, ?> entry : offset.entrySet()) {
            offsetStrMap.put(entry.getKey(), entry.getValue() == null ? null : entry.getValue().toString());
        }
        return BinlogOffset.builder().setOffsetMap(offsetStrMap).build();
    }

    public static boolean splitKeyRangeContains(Object[] key, Object[] splitKeyStart, Object[] splitKeyEnd) {
        if (splitKeyStart == null && splitKeyEnd == null) {
            return true;
        }
        if (splitKeyStart == null) {
            int[] upperBoundRes = new int[key.length];
            for (int i = 0; i < key.length; ++i) {
                upperBoundRes[i] = RecordUtils.compareObjects(key[i], splitKeyEnd[i]);
            }
            return Arrays.stream(upperBoundRes).anyMatch(value -> value < 0) && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0);
        }
        if (splitKeyEnd == null) {
            int[] lowerBoundRes = new int[key.length];
            for (int i = 0; i < key.length; ++i) {
                lowerBoundRes[i] = RecordUtils.compareObjects(key[i], splitKeyStart[i]);
            }
            return Arrays.stream(lowerBoundRes).allMatch(value -> value >= 0);
        }
        int[] lowerBoundRes = new int[key.length];
        int[] upperBoundRes = new int[key.length];
        for (int i = 0; i < key.length; ++i) {
            lowerBoundRes[i] = RecordUtils.compareObjects(key[i], splitKeyStart[i]);
            upperBoundRes[i] = RecordUtils.compareObjects(key[i], splitKeyEnd[i]);
        }
        return Arrays.stream(lowerBoundRes).anyMatch(value -> value >= 0) && Arrays.stream(upperBoundRes).anyMatch(value -> value < 0) && Arrays.stream(upperBoundRes).allMatch(value -> value <= 0);
    }

    private static int compareObjects(Object o1, Object o2) {
        if (o1 instanceof Comparable && o1.getClass().equals(o2.getClass())) {
            return ((Comparable)o1).compareTo(o2);
        }
        if (RecordUtils.isNumericObject(o1) && RecordUtils.isNumericObject(o2)) {
            return RecordUtils.toBigDecimal(o1).compareTo(RecordUtils.toBigDecimal(o2));
        }
        return o1.toString().compareTo(o2.toString());
    }

    private static boolean isNumericObject(Object obj) {
        return obj instanceof Byte || obj instanceof Short || obj instanceof Integer || obj instanceof Long || obj instanceof Float || obj instanceof Double || obj instanceof BigInteger || obj instanceof BigDecimal;
    }

    private static BigDecimal toBigDecimal(Object numericObj) {
        return new BigDecimal(numericObj.toString());
    }

    public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws IOException {
        Struct value = (Struct)schemaRecord.value();
        String historyRecordStr = value.getString("historyRecord");
        return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
    }

    private static Optional<SignalEventDispatcher.WatermarkKind> getWatermarkKind(SourceRecord record) {
        if (record.valueSchema() != null && "io.debezium.connector.flink.cdc.embedded.watermark.value".equals(record.valueSchema().name())) {
            Struct value = (Struct)record.value();
            return Optional.of(SignalEventDispatcher.WatermarkKind.valueOf(value.getString("watermark_kind")));
        }
        return Optional.empty();
    }

    public static boolean isOnLineSchemaChangeEvent(SourceRecord record) {
        if (!RecordUtils.isSchemaChangeEvent(record)) {
            return false;
        }
        Struct value = (Struct)record.value();
        ObjectMapper mapper = new ObjectMapper();
        try {
            String ddl = mapper.readTree(value.getString("historyRecord")).get("ddl").asText().toLowerCase();
            if (ddl.startsWith("alter")) {
                String tableName = value.getStruct("source").getString("table");
                return OSC_TABLE_ID_PATTERN.matcher(tableName).matches();
            }
            return false;
        }
        catch (JsonProcessingException e) {
            return false;
        }
    }

    public static TableId peelTableId(TableId tableId) {
        Matcher matchingResult = OSC_TABLE_ID_PATTERN.matcher(tableId.table());
        if (matchingResult.matches()) {
            return new TableId(tableId.catalog(), tableId.schema(), matchingResult.group(1));
        }
        return tableId;
    }
}

