/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;

public class CdcRecordSerializeITCase {
    @Test
    public void testCdcRecordKryoSerialize() throws Exception {
        KryoSerializer<RichCdcMultiplexRecord> kr = CdcRecordSerializeITCase.createFlinkKryoSerializer(RichCdcMultiplexRecord.class);
        RowType.Builder rowType = RowType.builder();
        rowType.field("id", (DataType)new BigIntType());
        rowType.field("name", (DataType)new VarCharType());
        rowType.field("pt", (DataType)new VarCharType());
        List fields = rowType.build().getFields();
        List<String> primaryKeys = Collections.singletonList("id");
        HashMap<String, String> recordData = new HashMap<String, String>();
        recordData.put("id", "1");
        recordData.put("name", "HunterXHunter");
        recordData.put("pt", "2024-06-28");
        CdcRecord cdcRecord = new CdcRecord(RowKind.INSERT, recordData);
        RichCdcMultiplexRecord serializeRecord = new RichCdcMultiplexRecord("default", "T", fields, primaryKeys, cdcRecord);
        TestOutputView outputView = new TestOutputView();
        kr.serialize((Object)serializeRecord, (DataOutputView)outputView);
        RichCdcMultiplexRecord deserializeRecord = (RichCdcMultiplexRecord)kr.deserialize((DataInputView)outputView.getInputView());
        Assertions.assertThat((Object)deserializeRecord.toRichCdcRecord().toCdcRecord()).isEqualTo((Object)cdcRecord);
        Assertions.assertThat((String)deserializeRecord.databaseName()).isEqualTo("default");
        Assertions.assertThat((String)deserializeRecord.tableName()).isEqualTo("T");
        Assertions.assertThat((List)deserializeRecord.primaryKeys()).isEqualTo(primaryKeys);
        Assertions.assertThat((List)deserializeRecord.fields()).isEqualTo((Object)fields);
    }

    @Test
    public void testUnmodifiableListKryoSerialize() throws Exception {
        KryoSerializer<List> kryoSerializer = CdcRecordSerializeITCase.createFlinkKryoSerializer(List.class);
        RowType.Builder rowType = RowType.builder();
        rowType.field("id", (DataType)new BigIntType());
        rowType.field("name", (DataType)new VarCharType());
        rowType.field("pt", (DataType)new VarCharType());
        ArrayList fields = rowType.build().getFields();
        TestOutputView outputView = new TestOutputView();
        kryoSerializer.serialize((Object)fields, (DataOutputView)outputView);
        Assertions.assertThatThrownBy(() -> {
            List cfr_ignored_0 = (List)kryoSerializer.deserialize((DataInputView)outputView.getInputView());
        }).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(UnsupportedOperationException.class)});
        TestOutputView outputView2 = new TestOutputView();
        fields = new ArrayList(fields);
        kryoSerializer.serialize(fields, (DataOutputView)outputView2);
        List deserializeRecord = (List)kryoSerializer.deserialize((DataInputView)outputView2.getInputView());
        Assertions.assertThat((List)deserializeRecord).isEqualTo(fields);
    }

    public static <T> KryoSerializer<T> createFlinkKryoSerializer(Class<T> type) throws NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
        try {
            Constructor constructor = KryoSerializer.class.getConstructor(Class.class, SerializerConfig.class);
            return (KryoSerializer)constructor.newInstance(type, new SerializerConfigImpl());
        }
        catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException constructor) {
            Constructor constructor2 = KryoSerializer.class.getConstructor(Class.class, ExecutionConfig.class);
            return (KryoSerializer)constructor2.newInstance(type, new ExecutionConfig());
        }
    }

    private static final class TestInputView
    extends DataInputStream
    implements DataInputView {
        public TestInputView(byte[] data) {
            super(new ByteArrayInputStream(data));
        }

        public void skipBytesToRead(int numBytes) throws IOException {
            while (numBytes > 0) {
                int skipped = this.skipBytes(numBytes);
                numBytes -= skipped;
            }
        }
    }

    private static final class TestOutputView
    extends DataOutputStream
    implements DataOutputView {
        public TestOutputView() {
            super(new ByteArrayOutputStream(4096));
        }

        public TestInputView getInputView() {
            ByteArrayOutputStream baos = (ByteArrayOutputStream)this.out;
            return new TestInputView(baos.toByteArray());
        }

        public void skipBytesToWrite(int numBytes) throws IOException {
            for (int i = 0; i < numBytes; ++i) {
                this.write(0);
            }
        }

        public void write(DataInputView source, int numBytes) throws IOException {
            byte[] buffer = new byte[numBytes];
            source.readFully(buffer);
            this.write(buffer);
        }
    }
}

