package com.starrocks.connector.flink.cdc.json;

import com.starrocks.connector.flink.catalog.StarRocksCatalog;
import com.starrocks.connector.flink.catalog.StarRocksColumn;
import com.starrocks.connector.flink.catalog.TypeUtils;
import com.starrocks.connector.flink.cdc.StarRocksOptions;
import com.starrocks.streamload.shade.com.fasterxml.jackson.core.JsonProcessingException;
import com.starrocks.streamload.shade.com.fasterxml.jackson.core.type.TypeReference;
import com.starrocks.streamload.shade.com.fasterxml.jackson.databind.DeserializationFeature;
import com.starrocks.streamload.shade.com.fasterxml.jackson.databind.JsonNode;
import com.starrocks.streamload.shade.com.fasterxml.jackson.databind.ObjectMapper;
import com.starrocks.streamload.shade.com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.starrocks.streamload.shade.org.apache.http.HttpStatus;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/cdc/json/DebeziumJsonSerializer.class */
/**
 * Turns Debezium JSON change events into flat JSON rows tagged with a delete-sign field
 * ({@code __op}), and optionally applies simple {@code ADD}/{@code DROP COLUMN} DDL found in
 * schema-change events to the target table through a {@link StarRocksCatalog}.
 *
 * <p>Data events are recognised by their {@code op} field ({@code r}, {@code c}, {@code u},
 * {@code d}). Events without an {@code op} field are treated as potential schema-change events.
 */
public class DebeziumJsonSerializer implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumJsonSerializer.class);
    private static final String OP_READ = "r";
    private static final String OP_CREATE = "c";
    private static final String OP_UPDATE = "u";
    private static final String OP_DELETE = "d";
    /** Sentinel returned by {@link #process(String)} when the event yields no row to load. */
    public static final String INVALID_RESULT = "invalid result";
    private static final String STARROCKS_DELETE_SIGN = "__op";
    public static final String EXECUTE_DDL = "ALTER TABLE %s %s COLUMN %s %s";
    private static final String ADD_DROP_DDL_REGEX = "ALTER\\s+TABLE\\s+[^\\s]+\\s+(ADD|DROP)\\s+(COLUMN\\s+)?([^\\s]+)(\\s+([^\\s]+))?.*";
    /** Matches a {@code varchar(n)} type declaration; compiled once instead of per call. */
    private static final Pattern VARCHAR_PATTERN = Pattern.compile("varchar\\(([1-9][0-9]*)\\)", Pattern.CASE_INSENSITIVE);
    /** Target type used when converting a {@code before}/{@code after} node into a row map. */
    private static final TypeReference<Map<String, String>> ROW_TYPE = new TypeReference<Map<String, String>>() {
    };

    private final Pattern addDropDDLPattern;
    private final StarRocksOptions starRocksOptions;
    private final ObjectMapper objectMapper = new ObjectMapper();
    private final String database;
    private final String table;
    private final String sourceTableName;
    private final StarRocksCatalog starRocksCatalog;
    private final Boolean isFastSchemaEvolution;

    /** Fluent builder for {@link DebeziumJsonSerializer}. */
    public static class Builder {
        private StarRocksOptions starRocksOptions;
        private Pattern addDropDDLPattern;
        private String sourceTableName;

        public Builder setStarRocksOptions(StarRocksOptions starRocksOptions) {
            this.starRocksOptions = starRocksOptions;
            return this;
        }

        /** Overrides the pattern used to recognise ADD/DROP COLUMN DDL; {@code null} keeps the default. */
        public Builder setPattern(Pattern pattern) {
            this.addDropDDLPattern = pattern;
            return this;
        }

        public Builder setSourceTableName(String str) {
            this.sourceTableName = str;
            return this;
        }

        public DebeziumJsonSerializer build() {
            return new DebeziumJsonSerializer(this.starRocksOptions, this.addDropDDLPattern, this.sourceTableName);
        }
    }

    /**
     * Creates the serializer and opens a catalog connection using the credentials in the options.
     *
     * @param starRocksOptions sink options; its table identifier must look like {@code database.table}
     * @param pattern custom ADD/DROP DDL pattern, or {@code null} to use the built-in one
     * @param str fully qualified source table name used to filter schema-change events; may be blank
     * @throws IllegalArgumentException if the table identifier does not contain a database and a table part
     */
    public DebeziumJsonSerializer(StarRocksOptions starRocksOptions, Pattern pattern, String str) {
        this.starRocksOptions = starRocksOptions;
        this.addDropDDLPattern = pattern == null ? Pattern.compile(ADD_DROP_DDL_REGEX, Pattern.CASE_INSENSITIVE) : pattern;
        String tableIdentifier = starRocksOptions.getTableIdentifier();
        String[] identifierParts = tableIdentifier.split("\\.");
        if (identifierParts.length < 2) {
            // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException("Table identifier must have the form <database>.<table>, but was: " + tableIdentifier);
        }
        this.database = identifierParts[0];
        this.table = identifierParts[1];
        this.sourceTableName = str;
        // Keep decimal values exact (no double rounding, no trailing-zero normalisation).
        this.objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
        this.objectMapper.setNodeFactory(JsonNodeFactory.withExactBigDecimals(true));
        this.starRocksCatalog = new StarRocksCatalog(starRocksOptions.getOpts().getDbURL(), starRocksOptions.getOpts().getUsername().get(), starRocksOptions.getOpts().getPassword().get());
        this.isFastSchemaEvolution = starRocksOptions.getFastSchemaEvolution();
        this.starRocksCatalog.open();
    }

    /**
     * Converts one Debezium JSON event into the JSON row to load.
     *
     * <ul>
     *   <li>{@code r}/{@code c}/{@code u}: the {@code after} image with {@code __op = "0"}.</li>
     *   <li>{@code d}: the {@code before} image with {@code __op = "1"}.</li>
     *   <li>No {@code op} field: attempts a schema change when fast schema evolution is enabled
     *       and returns {@link #INVALID_RESULT}.</li>
     *   <li>Any other {@code op}: logs an error and returns {@link #INVALID_RESULT}.</li>
     * </ul>
     *
     * @param str the raw Debezium JSON event
     * @return the serialized row, or {@link #INVALID_RESULT} when there is no row to emit
     * @throws IOException if the event cannot be parsed or the row cannot be serialized
     */
    public String process(String str) throws IOException {
        LOG.debug("received debezium json data {} :", str);
        JsonNode recordRoot = this.objectMapper.readValue(str, JsonNode.class);
        String op = extractJsonNode(recordRoot, "op");
        if (op == null) {
            // Null-safe check: a missing option is treated as "disabled" rather than throwing.
            if (Boolean.TRUE.equals(this.isFastSchemaEvolution)) {
                schemaChange(recordRoot);
            }
            return INVALID_RESULT;
        }
        final Map<String, String> row;
        switch (op) {
            case OP_READ:
            case OP_CREATE:
            case OP_UPDATE:
                row = extractAfterRow(recordRoot);
                addDeleteSign(row, false);
                break;
            case OP_DELETE:
                row = extractBeforeRow(recordRoot);
                addDeleteSign(row, true);
                break;
            default:
                LOG.error("parse record fail, unknown op {} in {}", op, str);
                return INVALID_RESULT;
        }
        String serializedRow = this.objectMapper.writeValueAsString(row);
        LOG.debug("format json data {} :", serializedRow);
        return serializedRow;
    }

    /**
     * Applies the DDL carried by a schema-change event, if it targets the configured source table.
     *
     * @param jsonNode the parsed event
     * @return {@code false} when a source table is configured and the event belongs to another
     *     table; {@code true} otherwise, including when applying the DDL failed
     */
    @VisibleForTesting
    public boolean schemaChange(JsonNode jsonNode) {
        try {
            if (!StringUtils.isNullOrWhitespaceOnly(this.sourceTableName) && !checkTable(jsonNode)) {
                return false;
            }
            extractDDLAndExecute(jsonNode);
            return true;
        } catch (Exception e) {
            // Best effort: a failed schema change must not fail the data pipeline.
            // NOTE(review): returning true on failure looks unintended, but callers outside this
            // file may rely on it — confirm before changing.
            LOG.warn("schema change error :", e);
            return true;
        }
    }

    /** Returns whether the event's {@code <database-or-schema>.<table>} equals the configured source table. */
    protected boolean checkTable(JsonNode jsonNode) {
        String eventTable = extractDatabase(jsonNode) + "." + extractTable(jsonNode);
        return this.sourceTableName.equals(eventTable);
    }

    /** Writes the delete-sign field into the row: {@code "1"} for deletes, {@code "0"} otherwise. */
    private void addDeleteSign(Map<String, String> map, boolean z) {
        map.put(STARROCKS_DELETE_SIGN, z ? "1" : "0");
    }

    /** Reads {@code source.schema} when present, otherwise {@code source.db}. */
    protected String extractDatabase(JsonNode jsonNode) {
        JsonNode source = jsonNode.get("source");
        return source.has("schema") ? extractJsonNode(source, "schema") : extractJsonNode(source, "db");
    }

    /** Reads {@code source.table} from the event. */
    protected String extractTable(JsonNode jsonNode) {
        return extractJsonNode(jsonNode.get("source"), "table");
    }

    /** Returns the text of field {@code str}, or {@code null} if the node or the field is absent. */
    private String extractJsonNode(JsonNode jsonNode, String str) {
        if (jsonNode == null) {
            return null;
        }
        JsonNode field = jsonNode.get(str);
        return field == null ? null : field.asText();
    }

    private Map<String, String> extractBeforeRow(JsonNode jsonNode) {
        return extractRow(jsonNode.get("before"));
    }

    private Map<String, String> extractAfterRow(JsonNode jsonNode) {
        return extractRow(jsonNode.get("after"));
    }

    /** Converts a row image into a mutable map; a missing image yields an empty map. */
    private Map<String, String> extractRow(JsonNode jsonNode) {
        Map<String, String> row = this.objectMapper.convertValue(jsonNode, ROW_TYPE);
        return row != null ? row : new HashMap<String, String>();
    }

    /**
     * Pulls the DDL statement out of the event's {@code historyRecord} and, when it matches the
     * ADD/DROP pattern, applies the column change to the target table. Events without a history
     * record or DDL, and DDL that does not match, are ignored.
     */
    private void extractDDLAndExecute(JsonNode jsonNode) throws JsonProcessingException {
        String historyRecord = extractJsonNode(jsonNode, "historyRecord");
        if (historyRecord == null) {
            return;
        }
        String ddl = extractJsonNode(this.objectMapper.readTree(historyRecord), "ddl");
        LOG.debug("received debezium ddl :{}", ddl);
        if (ddl == null) {
            return;
        }
        Matcher matcher = this.addDropDDLPattern.matcher(ddl);
        if (!matcher.find()) {
            return;
        }
        // Group 1 = ADD|DROP, group 3 = column name, group 5 = optional column type.
        String operation = matcher.group(1);
        String columnName = matcher.group(3);
        if (operation.equalsIgnoreCase("drop")) {
            execDropDDL(columnName);
        } else {
            execAddDDL(columnName, handleType(matcher.group(5)));
        }
    }

    private void execAddDDL(String str, String str2) {
        ArrayList<StarRocksColumn> columns = new ArrayList<>();
        columns.add(new StarRocksColumn.Builder().setColumnName(str).setDataType(str2).build());
        // 30L is presumably a timeout in seconds — confirm against StarRocksCatalog.
        this.starRocksCatalog.alterAddColumns(this.database, this.table, columns, 30L);
    }

    private void execDropDDL(String str) {
        // 30L is presumably a timeout in seconds — confirm against StarRocksCatalog.
        this.starRocksCatalog.alterDropColumns(this.database, this.table, Arrays.asList(str), 30L);
    }

    public static Builder builder() {
        return new Builder();
    }

    /**
     * Maps a source column type to the type used in the ADD COLUMN statement.
     *
     * <p>{@code varchar(n)} becomes {@code varchar(min(3n, TypeUtils.STRING_SIZE))}; every other
     * type is returned unchanged, and a missing type yields an empty string. The factor 3 is
     * presumably the char-to-byte expansion for UTF-8 — confirm.
     */
    private String handleType(String str) {
        if (str == null || str.isEmpty()) {
            return "";
        }
        Matcher matcher = VARCHAR_PATTERN.matcher(str);
        if (!matcher.find()) {
            return str;
        }
        long maxLength = TypeUtils.STRING_SIZE;
        long declaredLength;
        try {
            declaredLength = Long.parseLong(matcher.group(1));
        } catch (NumberFormatException tooManyDigits) {
            // The pattern only admits digits, so this can only mean "larger than any long".
            declaredLength = Long.MAX_VALUE;
        }
        // Saturate before multiplying so oversized lengths cannot overflow into a negative size.
        long targetLength = declaredLength >= maxLength ? maxLength : Math.min(declaredLength * 3L, maxLength);
        return String.format("varchar(%d)", targetLength);
    }
}
