/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.dms;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractJsonRecordParser;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.utils.Pair;

public class DMSRecordParser
extends AbstractJsonRecordParser {
    private static final String FIELD_DATA = "data";
    private static final String FIELD_METADATA = "metadata";
    private static final String FIELD_TYPE = "record-type";
    private static final String FIELD_OP = "operation";
    private static final String FIELD_DATABASE = "schema-name";
    private static final String FIELD_TABLE = "table-name";
    private static final String OP_LOAD = "load";
    private static final String OP_INSERT = "insert";
    private static final String OP_UPDATE = "update";
    private static final String OP_DELETE = "delete";
    private static final String BEFORE_PREFIX = "BI_";

    public DMSRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(typeMapping, computedColumns);
    }

    @Override
    @Nullable
    protected String getTableName() {
        JsonNode metaNode = this.getAndCheck(FIELD_METADATA);
        return metaNode.get(FIELD_TABLE).asText();
    }

    @Override
    protected List<RichCdcMultiplexRecord> extractRecords() {
        if (this.isDDL()) {
            return Collections.emptyList();
        }
        JsonNode dataNode = this.getAndCheck(this.dataField());
        String operation = this.getAndCheck(FIELD_METADATA).get(FIELD_OP).asText();
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        switch (operation) {
            case "load": 
            case "insert": {
                this.processRecord(dataNode, RowKind.INSERT, records);
                break;
            }
            case "update": {
                Pair<JsonNode, JsonNode> dataAndBeforeNodes = this.splitBeforeAndData(dataNode);
                this.processRecord((JsonNode)dataAndBeforeNodes.getRight(), RowKind.DELETE, records);
                this.processRecord((JsonNode)dataAndBeforeNodes.getLeft(), RowKind.INSERT, records);
                break;
            }
            case "delete": {
                this.processRecord(dataNode, RowKind.DELETE, records);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown record operation: " + operation);
            }
        }
        return records;
    }

    @Override
    @Nullable
    protected String getDatabaseName() {
        JsonNode metaNode = this.getAndCheck(FIELD_METADATA);
        return metaNode.get(FIELD_DATABASE).asText();
    }

    @Override
    protected String primaryField() {
        return null;
    }

    @Override
    protected String dataField() {
        return FIELD_DATA;
    }

    @Override
    protected String format() {
        return "aws-dms-json";
    }

    @Override
    protected boolean isDDL() {
        String recordType = this.getAndCheck(FIELD_METADATA).get(FIELD_TYPE).asText();
        return !FIELD_DATA.equals(recordType);
    }

    private Pair<JsonNode, JsonNode> splitBeforeAndData(JsonNode dataNode) {
        JsonNode newDataNode = dataNode.deepCopy();
        JsonNode beforeDataNode = dataNode.deepCopy();
        Iterator newDataFields = newDataNode.fields();
        while (newDataFields.hasNext()) {
            Map.Entry next = (Map.Entry)newDataFields.next();
            if (!((String)next.getKey()).startsWith(BEFORE_PREFIX)) continue;
            newDataFields.remove();
        }
        Iterator beforeDataFields = beforeDataNode.fields();
        while (beforeDataFields.hasNext()) {
            Map.Entry next = (Map.Entry)beforeDataFields.next();
            if (!((String)next.getKey()).startsWith(BEFORE_PREFIX)) continue;
            String key = ((String)next.getKey()).replaceFirst(BEFORE_PREFIX, "");
            ((ObjectNode)beforeDataNode).set(key, (JsonNode)next.getValue());
            beforeDataFields.remove();
        }
        return Pair.of((Object)newDataNode, (Object)beforeDataNode);
    }
}

