/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink.cdc;

import java.util.List;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.apache.paimon.flink.sink.cdc.CdcRecord;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.types.DataField;

public class CdcParsingProcessFunction<T>
extends ProcessFunction<T, CdcRecord> {
    public static final OutputTag<List<DataField>> NEW_DATA_FIELD_LIST_OUTPUT_TAG = new OutputTag("new-data-field-list", (TypeInformation)new ListTypeInfo(DataField.class));
    private final EventParser.Factory<T> parserFactory;
    private transient EventParser<T> parser;

    public CdcParsingProcessFunction(EventParser.Factory<T> parserFactory) {
        this.parserFactory = parserFactory;
    }

    public void open(Configuration parameters) throws Exception {
        this.parser = this.parserFactory.create();
    }

    public void processElement(T raw, ProcessFunction.Context context, Collector<CdcRecord> collector) throws Exception {
        this.parser.setRawEvent(raw);
        List<DataField> schemaChange = this.parser.parseSchemaChange();
        if (!schemaChange.isEmpty()) {
            context.output(NEW_DATA_FIELD_LIST_OUTPUT_TAG, schemaChange);
        }
        this.parser.parseRecords().forEach(arg_0 -> collector.collect(arg_0));
    }
}

