/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncJobHandler;
import org.apache.paimon.flink.action.cdc.SynchronizationActionBase;
import org.apache.paimon.flink.action.cdc.TableNameConverter;
import org.apache.paimon.flink.sink.cdc.EventParser;
import org.apache.paimon.flink.sink.cdc.FlinkCdcSyncDatabaseSinkBuilder;
import org.apache.paimon.flink.sink.cdc.NewTableSchemaBuilder;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord;
import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecordEventParser;
import org.apache.paimon.table.FileStoreTable;

/**
 * Base for database-level synchronization actions.
 *
 * <p>Holds the table selection / naming options configured through the fluent setters and wires
 * them into the record parser, the event parser factory and the sink builder.
 */
public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {

    /** Forwarded to the {@link TableNameConverter}; enabled unless overridden. */
    protected boolean mergeShards = true;

    /** Sink mode handed to the sink builder; {@code COMBINED} unless overridden. */
    protected MultiTablesSinkMode mode = MultiTablesSinkMode.COMBINED;

    /** Table prefix forwarded to the {@link TableNameConverter}; empty unless overridden. */
    protected String tablePrefix = "";

    /** Table suffix forwarded to the {@link TableNameConverter}; empty unless overridden. */
    protected String tableSuffix = "";

    /** Regular expression compiled into the including pattern; {@code ".*"} unless overridden. */
    protected String includingTables = ".*";

    /** Partition keys handed to the {@link NewTableSchemaBuilder}. */
    protected List<String> partitionKeys = new ArrayList<>();

    /** Primary keys handed to the {@link NewTableSchemaBuilder}. */
    protected List<String> primaryKeys = new ArrayList<>();

    /** Optional regular expression; when {@code null} no excluding pattern is compiled. */
    @Nullable
    protected String excludingTables;

    /** Tables handed to the sink builder; this class itself never adds to the list. */
    protected List<FileStoreTable> tables = new ArrayList<>();

    /** Map of name to partition key list handed to the {@link NewTableSchemaBuilder}. */
    protected Map<String, List<String>> partitionKeyMultiple = new HashMap<>();

    /**
     * Creates the action and a {@link SyncJobHandler} built from the source type, the CDC source
     * configuration and the database name.
     *
     * @param warehouse passed through to the superclass
     * @param database passed through to the superclass and to the job handler
     * @param catalogConfig passed through to the superclass
     * @param cdcSourceConfig passed through to the superclass and to the job handler
     * @param sourceType source type the job handler is created for
     */
    public SyncDatabaseActionBase(
            String warehouse,
            String database,
            Map<String, String> catalogConfig,
            Map<String, String> cdcSourceConfig,
            SyncJobHandler.SourceType sourceType) {
        super(
                warehouse,
                database,
                catalogConfig,
                cdcSourceConfig,
                new SyncJobHandler(sourceType, cdcSourceConfig, database));
    }

    /**
     * Sets the merge-shards flag.
     *
     * @param mergeShards new flag value
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase mergeShards(boolean mergeShards) {
        this.mergeShards = mergeShards;
        return this;
    }

    /**
     * Sets the sink mode.
     *
     * @param mode new sink mode
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase withMode(MultiTablesSinkMode mode) {
        this.mode = mode;
        return this;
    }

    /**
     * Sets the table prefix; a {@code null} argument leaves the current value untouched.
     *
     * @param tablePrefix new prefix, or {@code null} to keep the current one
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase withTablePrefix(@Nullable String tablePrefix) {
        if (tablePrefix == null) {
            return this;
        }
        this.tablePrefix = tablePrefix;
        return this;
    }

    /**
     * Sets the table suffix; a {@code null} argument leaves the current value untouched.
     *
     * @param tableSuffix new suffix, or {@code null} to keep the current one
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
        if (tableSuffix == null) {
            return this;
        }
        this.tableSuffix = tableSuffix;
        return this;
    }

    /**
     * Sets the including-tables regular expression; a {@code null} argument leaves the current
     * value untouched.
     *
     * @param includingTables new expression, or {@code null} to keep the current one
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase includingTables(@Nullable String includingTables) {
        if (includingTables == null) {
            return this;
        }
        this.includingTables = includingTables;
        return this;
    }

    /**
     * Sets the excluding-tables regular expression. Unlike the other nullable setters, {@code null}
     * is stored as-is and clears any previous value.
     *
     * @param excludingTables new expression, may be {@code null}
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase excludingTables(@Nullable String excludingTables) {
        this.excludingTables = excludingTables;
        return this;
    }

    /**
     * Appends the given keys to the partition key list (earlier entries are kept).
     *
     * @param partitionKeys keys to append
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase withPartitionKeys(String... partitionKeys) {
        Collections.addAll(this.partitionKeys, partitionKeys);
        return this;
    }

    /**
     * Appends the given keys to the primary key list (earlier entries are kept).
     *
     * @param primaryKeys keys to append
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase withPrimaryKeys(String... primaryKeys) {
        Collections.addAll(this.primaryKeys, primaryKeys);
        return this;
    }

    /** Runs the catalog's case check on the database name, the table prefix and the table suffix. */
    @Override
    protected void validateCaseSensitivity() {
        Catalog.validateCaseInsensitive(allowUpperCase, "Database", database);
        Catalog.validateCaseInsensitive(allowUpperCase, "Table prefix", tablePrefix);
        Catalog.validateCaseInsensitive(allowUpperCase, "Table suffix", tableSuffix);
    }

    /**
     * Obtains the record parser from the job handler, passing an empty list as its first argument
     * together with the type mapping and metadata converters.
     *
     * @return the parser provided by the job handler
     */
    @Override
    protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
        return syncJobHandler.provideRecordParser(
                Collections.emptyList(), typeMapping, metadataConverters);
    }

    /**
     * Replaces the partition-key map; a {@code null} argument leaves the current map untouched. The
     * given map is stored by reference, not copied.
     *
     * @param partitionKeyMultiple new map, or {@code null} to keep the current one
     * @return this action, for chaining
     */
    public SyncDatabaseActionBase withPartitionKeyMultiple(
            Map<String, List<String>> partitionKeyMultiple) {
        if (partitionKeyMultiple == null) {
            return this;
        }
        this.partitionKeyMultiple = partitionKeyMultiple;
        return this;
    }

    /**
     * Builds a factory producing {@link RichCdcMultiplexRecordEventParser} instances.
     *
     * <p>The schema builder, the compiled patterns, the name converter and the snapshot of the
     * tables currently listed in the database are created once here and shared by every parser the
     * factory creates.
     *
     * @return the event parser factory
     * @throws RuntimeException wrapping {@link Catalog.DatabaseNotExistException} when the catalog
     *     reports that the database does not exist
     */
    @Override
    protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory() {
        NewTableSchemaBuilder newTableSchemas =
                new NewTableSchemaBuilder(
                        tableConfig,
                        allowUpperCase,
                        partitionKeys,
                        primaryKeys,
                        partitionKeyMultiple,
                        metadataConverters);
        Pattern included = Pattern.compile(includingTables);
        Pattern excluded = excludingTables != null ? Pattern.compile(excludingTables) : null;
        TableNameConverter nameConverter =
                new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix);
        HashSet<String> existing = existingTableNames();
        // Keep this a lambda over locals only, so the factory does not capture the action itself.
        return () ->
                new RichCdcMultiplexRecordEventParser(
                        newTableSchemas, included, excluded, nameConverter, existing);
    }

    /** Copies the catalog's table listing for the database into a fresh set. */
    private HashSet<String> existingTableNames() {
        try {
            return new HashSet<>(catalog.listTables(database));
        } catch (Catalog.DatabaseNotExistException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Configures a {@link FlinkCdcSyncDatabaseSinkBuilder} with the input stream, parser factory,
     * catalog loader, database, tables, mode and table options, then builds it.
     *
     * @param input stream of parsed records
     * @param parserFactory factory produced by {@link #buildEventParserFactory()}
     */
    @Override
    protected void buildSink(
            DataStream<RichCdcMultiplexRecord> input,
            EventParser.Factory<RichCdcMultiplexRecord> parserFactory) {
        new FlinkCdcSyncDatabaseSinkBuilder<RichCdcMultiplexRecord>()
                .withInput(input)
                .withParserFactory(parserFactory)
                .withCatalogLoader(catalogLoader())
                .withDatabase(database)
                .withTables(tables)
                .withMode(mode)
                .withTableOptions(tableConfig)
                .build();
    }
}

