/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sink;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.types.RowKind;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamSinkProvider;
import org.apache.paimon.flink.log.LogSinkProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.sink.LogFlinkSinkBuilder;
import org.apache.paimon.flink.sink.LogSinkFunction;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.Table;

public abstract class FlinkTableSinkBase
implements DynamicTableSink,
SupportsOverwrite,
SupportsPartitioning {
    protected final ObjectIdentifier tableIdentifier;
    protected final DynamicTableFactory.Context context;
    @Nullable
    protected final LogStoreTableFactory logStoreTableFactory;
    protected final Table table;
    protected Map<String, String> staticPartitions = new HashMap<String, String>();
    protected boolean overwrite = false;

    public FlinkTableSinkBase(ObjectIdentifier tableIdentifier, Table table, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) {
        this.tableIdentifier = tableIdentifier;
        this.table = table;
        this.context = context;
        this.logStoreTableFactory = logStoreTableFactory;
    }

    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        if (this.table.primaryKeys().isEmpty()) {
            return requestedMode;
        }
        Options options = Options.fromMap(this.table.options());
        if (options.get(CoreOptions.CHANGELOG_PRODUCER) == CoreOptions.ChangelogProducer.INPUT) {
            return requestedMode;
        }
        if (options.get(CoreOptions.MERGE_ENGINE) == CoreOptions.MergeEngine.AGGREGATE) {
            return requestedMode;
        }
        if (options.get(CoreOptions.MERGE_ENGINE) == CoreOptions.MergeEngine.PARTIAL_UPDATE && new CoreOptions(options).definedAggFunc()) {
            return requestedMode;
        }
        if (options.get(CoreOptions.LOG_CHANGELOG_MODE) == CoreOptions.LogChangelogMode.ALL) {
            return requestedMode;
        }
        ChangelogMode.Builder builder = ChangelogMode.newBuilder();
        for (RowKind kind : requestedMode.getContainedKinds()) {
            if (kind == RowKind.UPDATE_BEFORE) continue;
            builder.addContainedKind(kind);
        }
        return builder.build();
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        if (this.overwrite && !context.isBounded()) {
            throw new UnsupportedOperationException("Paimon doesn't support streaming INSERT OVERWRITE.");
        }
        LogSinkProvider logSinkProvider = null;
        if (this.logStoreTableFactory != null) {
            logSinkProvider = this.logStoreTableFactory.createSinkProvider(this.context, context);
        }
        Options conf = Options.fromMap(this.table.options());
        LogSinkFunction logSinkFunction = this.overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
        return new PaimonDataStreamSinkProvider(dataStream -> {
            LogFlinkSinkBuilder builder = new LogFlinkSinkBuilder(this.table);
            builder.logSinkFunction(logSinkFunction).forRowData((DataStream<RowData>)new DataStream(dataStream.getExecutionEnvironment(), dataStream.getTransformation())).inputBounded(context.isBounded());
            if (this.overwrite) {
                builder.overwrite(this.staticPartitions);
            }
            conf.getOptional(FlinkConnectorOptions.SINK_PARALLELISM).ifPresent(builder::parallelism);
            return builder.build();
        });
    }

    public DynamicTableSink copy() {
        FlinkTableSink copied = new FlinkTableSink(this.tableIdentifier, this.table, this.context, this.logStoreTableFactory);
        copied.staticPartitions = new HashMap<String, String>(this.staticPartitions);
        copied.overwrite = this.overwrite;
        return copied;
    }

    public String asSummaryString() {
        return "PaimonSink";
    }

    public void applyStaticPartition(Map<String, String> partition) {
        this.table.partitionKeys().forEach(partitionKey -> {
            if (partition.containsKey(partitionKey)) {
                this.staticPartitions.put((String)partitionKey, (String)partition.get(partitionKey));
            }
        });
    }

    public void applyOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }
}

