/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.mongodb.table;

import com.mongodb.client.model.changestream.OperationType;
import java.time.ZoneId;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cdc.common.annotation.PublicEvolving;
import org.apache.flink.cdc.connectors.mongodb.table.MongoDBConnectorDeserializationSchema;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.bson.BsonDocument;

/**
 * Deserialization schema that turns a MongoDB change {@link SourceRecord} into changelog
 * {@link RowData} rows.
 *
 * <p>In addition to the {@code documentKey} and {@code fullDocument} fields of the record value,
 * this schema reads the {@code fullDocumentBeforeChange} field. When that field is present it is
 * used as the row for deletes and as the {@link RowKind#UPDATE_BEFORE} row for updates and
 * replaces.
 */
@PublicEvolving
public class MongoDBConnectorFullChangelogDeserializationSchema
extends MongoDBConnectorDeserializationSchema {
    private static final long serialVersionUID = 1750787080613035184L;

    /**
     * Creates the schema; every argument is handed unchanged to the parent constructor.
     *
     * @param physicalDataType row type passed to the parent schema
     * @param metadataConverters metadata converters passed to the parent schema
     * @param resultTypeInfo type information passed to the parent schema
     * @param localTimeZone time zone passed to the parent schema
     */
    public MongoDBConnectorFullChangelogDeserializationSchema(
            RowType physicalDataType,
            MetadataConverter[] metadataConverters,
            TypeInformation<RowData> resultTypeInfo,
            ZoneId localTimeZone) {
        super(physicalDataType, metadataConverters, resultTypeInfo, localTimeZone);
    }

    /**
     * Converts one change record into zero, one or two rows and emits them.
     *
     * <ul>
     *   <li>INSERT: one {@link RowKind#INSERT} row built from {@code fullDocument}.
     *   <li>DELETE: one {@link RowKind#DELETE} row built from {@code fullDocumentBeforeChange}
     *       when present, otherwise from {@code documentKey}.
     *   <li>UPDATE: nothing when {@code fullDocument} is absent; otherwise an optional
     *       {@link RowKind#UPDATE_BEFORE} row followed by an {@link RowKind#UPDATE_AFTER} row.
     *   <li>REPLACE: an optional {@link RowKind#UPDATE_BEFORE} row followed by an
     *       {@link RowKind#UPDATE_AFTER} row.
     *   <li>Any other operation type: nothing is emitted.
     * </ul>
     *
     * @param record the change record to convert
     * @param out collector handed to {@code emit} for every produced row
     * @throws Exception if extracting or converting a document fails
     */
    @Override
    public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
        final Struct payload = (Struct) record.value();
        final Schema payloadSchema = record.valueSchema();
        final OperationType operation = this.operationTypeFor(record);

        // All three documents are extracted up front, whatever the operation turns out to be.
        final BsonDocument keyDoc =
                this.extractBsonDocument(payload, payloadSchema, "documentKey");
        final BsonDocument afterDoc =
                this.extractBsonDocument(payload, payloadSchema, "fullDocument");
        final BsonDocument beforeDoc =
                this.extractBsonDocument(payload, payloadSchema, "fullDocumentBeforeChange");

        switch (operation) {
            case INSERT:
                this.emitDocumentAs(record, afterDoc, RowKind.INSERT, out);
                break;
            case DELETE:
                // Prefer the pre-change document; fall back to the document key alone.
                this.emitDocumentAs(
                        record, beforeDoc != null ? beforeDoc : keyDoc, RowKind.DELETE, out);
                break;
            case UPDATE:
                // An update without a full document is dropped entirely.
                // NOTE(review): presumably the document vanished before it could be looked
                // up -- confirm against the connector documentation.
                if (afterDoc != null) {
                    this.emitUpdateImages(record, beforeDoc, afterDoc, out);
                }
                break;
            case REPLACE:
                this.emitUpdateImages(record, beforeDoc, afterDoc, out);
                break;
            default:
                // Other operation types produce no rows.
                break;
        }
    }

    /**
     * Emits the optional {@link RowKind#UPDATE_BEFORE} row and then the
     * {@link RowKind#UPDATE_AFTER} row.
     *
     * @param record the change record the rows originate from
     * @param beforeDoc pre-change document; the UPDATE_BEFORE row is skipped when {@code null}
     * @param afterDoc document used for the UPDATE_AFTER row
     * @param out collector handed to {@code emit}
     * @throws Exception if converting a document fails
     */
    private void emitUpdateImages(
            SourceRecord record,
            BsonDocument beforeDoc,
            BsonDocument afterDoc,
            Collector<RowData> out)
            throws Exception {
        if (beforeDoc != null) {
            this.emitDocumentAs(record, beforeDoc, RowKind.UPDATE_BEFORE, out);
        }
        this.emitDocumentAs(record, afterDoc, RowKind.UPDATE_AFTER, out);
    }

    /**
     * Converts {@code document} to a row, tags it with {@code kind} and emits it.
     *
     * @param record the change record the row originates from
     * @param document document passed to {@code extractRowData}
     * @param kind row kind set on the converted row
     * @param out collector handed to {@code emit}
     * @throws Exception if converting the document fails
     */
    private void emitDocumentAs(
            SourceRecord record, BsonDocument document, RowKind kind, Collector<RowData> out)
            throws Exception {
        final RowData row = this.extractRowData(document);
        row.setRowKind(kind);
        this.emit(record, row, out);
    }
}

