/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.paimon.sink.v2;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.cdc.connectors.paimon.sink.v2.MultiTableCommittableChannelComputer;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonCommitter;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonRecordSerializer;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PaimonWriter;
import org.apache.flink.cdc.connectors.paimon.sink.v2.PreCommitOperator;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
import org.apache.flink.streaming.api.connector.sink2.WithPreCommitTopology;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.paimon.flink.sink.FlinkStreamPartitioner;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableSerializer;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.sink.CommitMessageSerializer;

public class PaimonSink<InputT>
implements WithPreCommitTopology<InputT, MultiTableCommittable> {
    public static final String DEFAULT_COMMIT_USER = "admin";
    protected final Options catalogOptions;
    protected final String commitUser;
    private final PaimonRecordSerializer<InputT> serializer;

    public PaimonSink(Options catalogOptions, PaimonRecordSerializer<InputT> serializer) {
        this.catalogOptions = catalogOptions;
        this.serializer = serializer;
        this.commitUser = DEFAULT_COMMIT_USER;
    }

    public PaimonSink(Options catalogOptions, String commitUser, PaimonRecordSerializer<InputT> serializer) {
        this.catalogOptions = catalogOptions;
        this.commitUser = commitUser;
        this.serializer = serializer;
    }

    public PaimonWriter<InputT> createWriter(Sink.InitContext context) {
        long lastCheckpointId = context.getRestoredCheckpointId().orElse(0L);
        return new PaimonWriter<InputT>(this.catalogOptions, (MetricGroup)context.metricGroup(), this.commitUser, this.serializer, lastCheckpointId);
    }

    public Committer<MultiTableCommittable> createCommitter() {
        return new PaimonCommitter(this.catalogOptions, this.commitUser);
    }

    public SimpleVersionedSerializer<MultiTableCommittable> getCommittableSerializer() {
        CommitMessageSerializer fileSerializer = new CommitMessageSerializer();
        return new MultiTableCommittableSerializer(fileSerializer);
    }

    public DataStream<CommittableMessage<MultiTableCommittable>> addPreCommitTopology(DataStream<CommittableMessage<MultiTableCommittable>> committables) {
        TypeInformation typeInformation = CommittableMessageTypeInfo.of(this::getCommittableSerializer);
        DataStream<CommittableMessage<MultiTableCommittable>> partitioned = FlinkStreamPartitioner.partition(committables, new MultiTableCommittableChannelComputer(), committables.getParallelism());
        return partitioned.transform("preCommit", typeInformation, (OneInputStreamOperator)new PreCommitOperator(this.catalogOptions, this.commitUser)).setParallelism(committables.getParallelism());
    }
}

