/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.Properties;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
import org.apache.flink.connector.kafka.lineage.LineageUtil;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
import org.apache.flink.connector.kafka.sink.KafkaCommittableSerializer;
import org.apache.flink.connector.kafka.sink.KafkaCommitter;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
import org.apache.flink.connector.kafka.sink.KafkaWriter;
import org.apache.flink.connector.kafka.sink.KafkaWriterState;
import org.apache.flink.connector.kafka.sink.KafkaWriterStateSerializer;
import org.apache.flink.connector.kafka.sink.TwoPhaseCommittingStatefulSink;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.LineageVertexProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Sink that hands records to a {@link KafkaWriter} and exposes a {@link KafkaCommitter} for the
 * resulting {@link KafkaCommittable}s.
 *
 * <p>Instances are configured once at construction time with a {@link DeliveryGuarantee}, the
 * Kafka producer {@link Properties}, a transactional id prefix and a
 * {@link KafkaRecordSerializationSchema}; all of them are passed unchanged to every writer this
 * sink creates. Use {@link #builder()} to obtain a {@link KafkaSinkBuilder}.
 *
 * @param <IN> type of the records accepted by the sink
 */
@PublicEvolving
public class KafkaSink<IN>
implements LineageVertexProvider,
TwoPhaseCommittingStatefulSink<IN, KafkaWriterState, KafkaCommittable> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
    private final DeliveryGuarantee deliveryGuarantee;
    private final KafkaRecordSerializationSchema<IN> recordSerializer;
    private final Properties kafkaProducerConfig;
    private final String transactionalIdPrefix;

    /**
     * Creates a sink from already assembled settings. The arguments are stored as given (no
     * copies, no validation).
     *
     * @param deliveryGuarantee delivery guarantee forwarded to every writer
     * @param kafkaProducerConfig producer properties forwarded to writers and committers
     * @param transactionalIdPrefix transactional id prefix forwarded to every writer
     * @param recordSerializer schema forwarded to every writer and inspected for lineage facets
     */
    KafkaSink(DeliveryGuarantee deliveryGuarantee, Properties kafkaProducerConfig, String transactionalIdPrefix, KafkaRecordSerializationSchema<IN> recordSerializer) {
        this.deliveryGuarantee = deliveryGuarantee;
        this.kafkaProducerConfig = kafkaProducerConfig;
        this.transactionalIdPrefix = transactionalIdPrefix;
        this.recordSerializer = recordSerializer;
    }

    /**
     * Creates a new, empty {@link KafkaSinkBuilder}.
     *
     * @param <IN> type of the records accepted by the sink to be built
     * @return a fresh builder
     */
    public static <IN> KafkaSinkBuilder<IN> builder() {
        // Diamond instead of a raw type so the result is a properly typed KafkaSinkBuilder<IN>.
        return new KafkaSinkBuilder<>();
    }

    /**
     * Creates a committer configured with this sink's producer properties.
     *
     * @return a new {@link KafkaCommitter}
     * @throws IOException declared by the signature; not thrown by the code in this method
     */
    @Internal
    public Committer<KafkaCommittable> createCommitter() throws IOException {
        return new KafkaCommitter(this.kafkaProducerConfig);
    }

    /**
     * Returns the serializer used for {@link KafkaCommittable}s.
     *
     * @return a new {@link KafkaCommittableSerializer}
     */
    @Internal
    public SimpleVersionedSerializer<KafkaCommittable> getCommittableSerializer() {
        return new KafkaCommittableSerializer();
    }

    /**
     * Creates a writer that starts without any recovered state (an empty list is passed).
     *
     * @param context initialization context; also used to derive the serialization schema
     *     initialization context
     * @return a new {@link KafkaWriter}
     * @throws IOException declared by the signature; not thrown by the code in this method
     */
    @Internal
    public KafkaWriter<IN> createWriter(Sink.InitContext context) throws IOException {
        return new KafkaWriter<IN>(
                this.deliveryGuarantee,
                this.kafkaProducerConfig,
                this.transactionalIdPrefix,
                context,
                this.recordSerializer,
                context.asSerializationSchemaInitializationContext(),
                Collections.emptyList());
    }

    /**
     * Creates a writer exactly like {@link #createWriter(Sink.InitContext)} but hands it the
     * given recovered writer states.
     *
     * @param context initialization context; also used to derive the serialization schema
     *     initialization context
     * @param recoveredState writer states passed on to the new writer
     * @return a new {@link KafkaWriter}
     * @throws IOException declared by the signature; not thrown by the code in this method
     */
    @Internal
    public KafkaWriter<IN> restoreWriter(Sink.InitContext context, Collection<KafkaWriterState> recoveredState) throws IOException {
        return new KafkaWriter<IN>(
                this.deliveryGuarantee,
                this.kafkaProducerConfig,
                this.transactionalIdPrefix,
                context,
                this.recordSerializer,
                context.asSerializationSchemaInitializationContext(),
                recoveredState);
    }

    /**
     * Returns the serializer used for {@link KafkaWriterState}.
     *
     * @return a new {@link KafkaWriterStateSerializer}
     */
    @Internal
    public SimpleVersionedSerializer<KafkaWriterState> getWriterStateSerializer() {
        return new KafkaWriterStateSerializer();
    }

    /**
     * Exposes the producer properties this sink was constructed with.
     *
     * @return the internal {@link Properties} instance itself, not a copy
     */
    @VisibleForTesting
    protected Properties getKafkaProducerConfig() {
        return this.kafkaProducerConfig;
    }

    /**
     * Builds the lineage vertex describing the dataset this sink writes to.
     *
     * <p>An empty vertex is returned when the record serializer does not implement
     * {@link KafkaDatasetFacetProvider} or when the provider yields no facet. Otherwise the
     * facet receives this sink's producer properties and is wrapped into a single dataset;
     * a {@link TypeDatasetFacet} is attached as well when the serializer implements
     * {@link TypeDatasetFacetProvider} and supplies one.
     *
     * @return the lineage vertex for this sink
     */
    public LineageVertex getLineageVertex() {
        if (!(this.recordSerializer instanceof KafkaDatasetFacetProvider)) {
            LOG.info("recordSerializer does not implement KafkaDatasetFacetProvider: {}", this.recordSerializer);
            return KafkaSink.emptyLineageVertex();
        }

        Optional<KafkaDatasetFacet> kafkaDatasetFacet =
                ((KafkaDatasetFacetProvider) this.recordSerializer).getKafkaDatasetFacet();
        if (!kafkaDatasetFacet.isPresent()) {
            LOG.info("Provider did not return kafka dataset facet");
            return KafkaSink.emptyLineageVertex();
        }

        // Enrich the provider's facet with the producer configuration before publishing it.
        KafkaDatasetFacet facet = kafkaDatasetFacet.get();
        facet.setProperties(this.kafkaProducerConfig);
        String namespace = LineageUtil.namespaceOf(this.kafkaProducerConfig);

        // Typed as Optional<TypeDatasetFacet>: Optional<Object> is not assignment-compatible
        // with the provider's result and forced an unchecked downcast further below.
        Optional<TypeDatasetFacet> typeDatasetFacet = Optional.empty();
        if (this.recordSerializer instanceof TypeDatasetFacetProvider) {
            typeDatasetFacet = ((TypeDatasetFacetProvider) this.recordSerializer).getTypeDatasetFacet();
        }

        if (typeDatasetFacet.isPresent()) {
            return LineageUtil.sourceLineageVertexOf(
                    Collections.singleton(LineageUtil.datasetOf(namespace, facet, typeDatasetFacet.get())));
        }
        return LineageUtil.sourceLineageVertexOf(
                Collections.singleton(LineageUtil.datasetOf(namespace, facet)));
    }

    /** Returns a lineage vertex that carries no datasets. */
    private static LineageVertex emptyLineageVertex() {
        return LineageUtil.sourceLineageVertexOf(Collections.emptyList());
    }
}

