/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.paimon.sink.v2;

import java.io.Serializable;
import java.time.ZoneId;
import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonSink;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketAssignOperator;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapper;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.BucketWrapperEventTypeInfo;
import org.apache.flink.cdc.connectors.paimon.sink.v2.bucket.FlushEventAlignmentOperator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.connector.sink2.WithPreWriteTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.CommitMessageSerializer;

public class PaimonEventSink
extends PaimonSink<Event>
implements WithPreWriteTopology<Event> {
    public final String schemaOperatorUid;
    public final ZoneId zoneId;

    public PaimonEventSink(Options catalogOptions, String commitUser, PaimonRecordSerializer<Event> serializer, String schemaOperatorUid, ZoneId zoneId) {
        super(catalogOptions, commitUser, serializer);
        this.schemaOperatorUid = schemaOperatorUid;
        this.zoneId = zoneId;
    }

    public DataStream<Event> addPreWriteTopology(DataStream<Event> dataStream) {
        return dataStream.transform("BucketAssign", (TypeInformation)new BucketWrapperEventTypeInfo(), (OneInputStreamOperator)new BucketAssignOperator(this.catalogOptions, this.schemaOperatorUid, this.zoneId, this.commitUser)).name("Assign Bucket").partitionCustom((Partitioner & Serializable)(bucket, numPartitions) -> bucket % numPartitions, (KeySelector & Serializable)event -> ((BucketWrapper)event).getBucket()).transform("FlushEventAlignment", (TypeInformation)new BucketWrapperEventTypeInfo(), (OneInputStreamOperator)new FlushEventAlignmentOperator());
    }

    @Override
    public SimpleVersionedSerializer<MultiTableCommittable> getCommittableSerializer() {
        CommitMessageSerializer fileSerializer = new CommitMessageSerializer();
        return new MultiTableCommittableSerializer(fileSerializer);
    }
}

