/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.format.aliyun;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.aliyun.AliyunRecordParser;
import org.apache.paimon.flink.action.cdc.kafka.KafkaActionITCaseBase;
import org.apache.paimon.flink.action.cdc.watermark.MessageQueueCdcTimestampExtractor;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.paimon.types.RowKind;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AliyunJsonRecordParserTest
extends KafkaActionITCaseBase {
    private static final Logger log = LoggerFactory.getLogger(AliyunJsonRecordParserTest.class);
    private static List<String> insertList = new ArrayList<String>();
    private static List<String> updateList = new ArrayList<String>();
    private static List<String> deleteList = new ArrayList<String>();
    private static ObjectMapper objMapper = new ObjectMapper();

    @Override
    @Before
    public void setup() {
        String insertRes = "kafka/aliyun/table/event/event-insert.txt";
        String updateRes = "kafka/aliyun/table/event/event-update-in-one.txt";
        String deleteRes = "kafka/aliyun/table/event/event-delete.txt";
        try {
            URL url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(insertRes);
            Files.readAllLines(Paths.get(url.toURI())).stream().filter(this::isRecordLine).forEach(e -> insertList.add((String)e));
            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(updateRes);
            Files.readAllLines(Paths.get(url.toURI())).stream().filter(this::isRecordLine).forEach(e -> updateList.add((String)e));
            url = AliyunJsonRecordParserTest.class.getClassLoader().getResource(deleteRes);
            Files.readAllLines(Paths.get(url.toURI())).stream().filter(this::isRecordLine).forEach(e -> deleteList.add((String)e));
        }
        catch (Exception e2) {
            log.error("Fail to init aliyun-json cases", (Throwable)e2);
        }
    }

    @Test
    public void extractInsertRecord() throws Exception {
        AliyunRecordParser parser = new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
        for (String json : insertList) {
            JsonNode rootNode = (JsonNode)objMapper.readValue(json, JsonNode.class);
            CdcSourceRecord cdcRecord = new CdcSourceRecord((Object)rootNode);
            Schema schema = parser.buildSchema(cdcRecord);
            Assert.assertEquals((Object)schema.primaryKeys(), Arrays.asList("id"));
            List records = parser.extractRecords();
            Assert.assertEquals((long)records.size(), (long)1L);
            CdcRecord result = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assert.assertEquals((Object)result.kind(), (Object)RowKind.INSERT);
            String dbName = parser.getDatabaseName();
            Assert.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assert.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assert.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
        }
    }

    @Test
    public void extractUpdateRecord() throws Exception {
        AliyunRecordParser parser = new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
        for (String json : updateList) {
            JsonNode jsonNode = (JsonNode)objMapper.readValue(json, JsonNode.class);
            CdcSourceRecord cdcRecord = new CdcSourceRecord((Object)jsonNode);
            Schema schema = parser.buildSchema(cdcRecord);
            Assert.assertEquals((Object)schema.primaryKeys(), Arrays.asList("id"));
            List records = parser.extractRecords();
            Assert.assertEquals((long)records.size(), (long)1L);
            CdcRecord result = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assert.assertEquals((Object)result.kind(), (Object)RowKind.UPDATE_AFTER);
            String dbName = parser.getDatabaseName();
            Assert.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assert.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assert.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
        }
    }

    @Test
    public void extractDeleteRecord() throws Exception {
        AliyunRecordParser parser = new AliyunRecordParser(TypeMapping.defaultMapping(), Collections.emptyList());
        for (String json : deleteList) {
            JsonNode jsonNode = (JsonNode)objMapper.readValue(json, JsonNode.class);
            CdcSourceRecord cdcRecord = new CdcSourceRecord((Object)jsonNode);
            Schema schema = parser.buildSchema(cdcRecord);
            Assert.assertEquals((Object)schema.primaryKeys(), Arrays.asList("id"));
            List records = parser.extractRecords();
            Assert.assertEquals((long)records.size(), (long)1L);
            CdcRecord result = ((RichCdcMultiplexRecord)records.get(0)).toRichCdcRecord().toCdcRecord();
            Assert.assertEquals((Object)result.kind(), (Object)RowKind.DELETE);
            String dbName = parser.getDatabaseName();
            Assert.assertEquals((Object)dbName, (Object)"bigdata_test");
            String tableName = parser.getTableName();
            Assert.assertEquals((Object)tableName, (Object)"sync_test_table");
            MessageQueueCdcTimestampExtractor extractor = new MessageQueueCdcTimestampExtractor();
            Assert.assertTrue((extractor.extractTimestamp(cdcRecord) > 0L ? 1 : 0) != 0);
        }
    }
}

