/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.debezium;

import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.ComputedColumn;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.AbstractRecordParser;
import org.apache.paimon.flink.action.cdc.format.debezium.DebeziumSchemaUtils;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DebeziumAvroRecordParser
extends AbstractRecordParser {
    private static final Logger LOG = LoggerFactory.getLogger(DebeziumAvroRecordParser.class);
    private static final Schema NULL_AVRO_SCHEMA = Schema.create((Schema.Type)Schema.Type.NULL);
    private GenericRecord keyRecord;
    private GenericRecord valueRecord;

    public DebeziumAvroRecordParser(TypeMapping typeMapping, List<ComputedColumn> computedColumns) {
        super(typeMapping, computedColumns);
    }

    @Override
    protected void setRoot(CdcSourceRecord record) {
        this.keyRecord = (GenericRecord)record.getKey();
        this.valueRecord = (GenericRecord)record.getValue();
    }

    @Override
    protected List<RichCdcMultiplexRecord> extractRecords() {
        String operation = this.getAndCheck("op").toString();
        ArrayList<RichCdcMultiplexRecord> records = new ArrayList<RichCdcMultiplexRecord>();
        switch (operation) {
            case "r": 
            case "c": {
                this.processRecord((GenericRecord)this.getAndCheck("after"), RowKind.INSERT, records);
                break;
            }
            case "u": {
                this.processRecord((GenericRecord)this.getAndCheck("before"), RowKind.DELETE, records);
                this.processRecord((GenericRecord)this.getAndCheck("after"), RowKind.INSERT, records);
                break;
            }
            case "d": {
                this.processRecord((GenericRecord)this.getAndCheck("before"), RowKind.DELETE, records);
                break;
            }
            case "t": 
            case "m": {
                LOG.info("Skip record operation: {}", (Object)operation);
                break;
            }
            default: {
                throw new UnsupportedOperationException("Unknown record operation: " + operation);
            }
        }
        return records;
    }

    private void processRecord(GenericRecord record, RowKind rowKind, List<RichCdcMultiplexRecord> records) {
        RowType.Builder rowTypeBuilder = RowType.builder();
        Map<String, String> rowData = this.extractRowData(record, rowTypeBuilder);
        records.add(this.createRecord(rowKind, rowData, rowTypeBuilder.build().getFields()));
    }

    @Override
    protected List<String> extractPrimaryKeys() {
        if (this.keyRecord == null) {
            return Collections.emptyList();
        }
        Schema keySchema = this.sanitizedSchema(this.keyRecord.getSchema());
        return keySchema.getFields().stream().map(Schema.Field::name).collect(Collectors.toList());
    }

    private Map<String, String> extractRowData(GenericRecord record, RowType.Builder rowTypeBuilder) {
        Schema payloadSchema = this.sanitizedSchema(record.getSchema());
        LinkedHashMap<String, String> resultMap = new LinkedHashMap<String, String>();
        for (Schema.Field field : payloadSchema.getFields()) {
            Schema schema = this.sanitizedSchema(field.schema());
            Map<String, String> connectParameters = DebeziumSchemaUtils.getAvroConnectParameters(schema);
            String fieldName = Optional.of(schema).filter(s -> s.getType() == Schema.Type.RECORD).map(s -> field.name()).orElseGet(() -> connectParameters.getOrDefault("__debezium.source.column.name", field.name()));
            String rawValue = Objects.toString(record.get(fieldName), null);
            String className = schema.getProp("connect.name");
            String transformed = DebeziumSchemaUtils.transformAvroRawValue(rawValue, schema.getFullName(), className, this.typeMapping, record.get(fieldName), ZoneOffset.UTC);
            resultMap.put(fieldName, transformed);
            rowTypeBuilder.field(fieldName, DebeziumSchemaUtils.avroToPaimonDataType(schema));
        }
        this.evalComputedColumns(resultMap, rowTypeBuilder);
        return resultMap;
    }

    @Override
    protected String format() {
        return "debezium-avro";
    }

    private Schema sanitizedSchema(Schema schema) {
        if (schema.getType() == Schema.Type.UNION && schema.getTypes().size() == 2 && schema.getTypes().contains(NULL_AVRO_SCHEMA)) {
            for (Schema memberSchema : schema.getTypes()) {
                if (memberSchema.equals((Object)NULL_AVRO_SCHEMA)) continue;
                return memberSchema;
            }
        }
        return schema;
    }

    @Override
    @Nullable
    protected String getTableName() {
        return this.getFromSourceField("table");
    }

    @Override
    @Nullable
    protected String getDatabaseName() {
        return this.getFromSourceField("db");
    }

    @Nullable
    private String getFromSourceField(String key) {
        GenericRecord source = (GenericRecord)this.valueRecord.get("source");
        if (Objects.isNull(source)) {
            return null;
        }
        return source.get(key).toString();
    }

    protected Object getAndCheck(String key) {
        Object node = this.valueRecord.get(key);
        Preconditions.checkNotNull(node, key);
        return node;
    }
}

