/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.paimon.sink;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEventType;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
import org.apache.flink.cdc.common.event.visitor.SchemaChangeEventVisitor;
import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.sink.MetadataApplier;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.utils.DataTypeUtils;
import org.apache.flink.cdc.common.utils.Preconditions;
import org.apache.flink.cdc.connectors.paimon.sink.SchemaChangeProvider;
import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PaimonMetadataApplier
implements MetadataApplier {
    /** Logger shared by all instances of this applier. */
    private static final Logger LOG = LoggerFactory.getLogger(PaimonMetadataApplier.class);
    /** Paimon catalog handle; marked transient and created on first use in {@code applySchemaChange} from {@link #catalogOptions}. */
    private transient Catalog catalog;
    /** Options passed to the Paimon schema builder for every table created by this applier. */
    private final Map<String, String> tableOptions;
    /** Options used to create the Paimon catalog. */
    private final Options catalogOptions;
    /** Per-table partition keys; when a table id is present here, its entry is used instead of the partition keys of the incoming schema. */
    private final Map<TableId, List<String>> partitionMaps;
    /** Schema change types reported as accepted; starts as the supported set and can be replaced via {@code setAcceptedSchemaEvolutionTypes}. */
    private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;

    /**
     * Creates an applier with no extra table options and no partition-key overrides.
     *
     * @param catalogOptions options used to create the Paimon catalog
     */
    public PaimonMetadataApplier(Options catalogOptions) {
        // Same defaults as before: two fresh, empty, mutable maps.
        this(catalogOptions, new HashMap<String, String>(), new HashMap<TableId, List<String>>());
    }

    /**
     * Creates an applier with explicit table options and partition-key overrides.
     *
     * <p>Both maps are stored by reference, not copied. All supported schema change types are
     * enabled initially.
     *
     * @param catalogOptions options used to create the Paimon catalog
     * @param tableOptions options applied to every table created by this applier
     * @param partitionMaps partition keys per table id, taking precedence over the schema's own
     */
    public PaimonMetadataApplier(Options catalogOptions, Map<String, String> tableOptions, Map<TableId, List<String>> partitionMaps) {
        this.partitionMaps = partitionMaps;
        this.tableOptions = tableOptions;
        this.catalogOptions = catalogOptions;
        // Kept last so every other field is already assigned when the supported set is queried.
        this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
    }

    /**
     * Replaces the set of schema change types this applier reports as accepted.
     *
     * @param schemaEvolutionTypes the new accepted set; stored by reference
     * @return this applier, to allow call chaining
     */
    public MetadataApplier setAcceptedSchemaEvolutionTypes(Set<SchemaChangeEventType> schemaEvolutionTypes) {
        enabledSchemaEvolutionTypes = schemaEvolutionTypes;
        return this;
    }

    /**
     * Tells whether the given schema change type is in the currently enabled set.
     *
     * @param schemaChangeEventType the type to look up
     * @return {@code true} if the enabled set contains the type
     */
    public boolean acceptsSchemaEvolutionType(SchemaChangeEventType schemaChangeEventType) {
        final Set<SchemaChangeEventType> enabled = enabledSchemaEvolutionTypes;
        return enabled.contains(schemaChangeEventType);
    }

    /**
     * Lists the schema change types this applier declares as supported: create table, add
     * column, drop column, rename column and alter column type.
     *
     * @return a new mutable set containing the supported types
     */
    public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
        // Pass the constants as plain varargs so the element type is inferred as
        // SchemaChangeEventType rather than Object (which an Object[] cast would force).
        return Sets.newHashSet(
                SchemaChangeEventType.CREATE_TABLE,
                SchemaChangeEventType.ADD_COLUMN,
                SchemaChangeEventType.DROP_COLUMN,
                SchemaChangeEventType.RENAME_COLUMN,
                SchemaChangeEventType.ALTER_COLUMN_TYPE);
    }

    /**
     * Applies a schema change event to the Paimon catalog, creating the catalog on first use.
     *
     * <p>The event is dispatched through {@code SchemaChangeEventVisitor}; the handlers are
     * positional and are passed in this order: add column, alter column type, create table,
     * drop column, drop table, rename column, truncate table.
     *
     * @param schemaChangeEvent the event to apply
     * @throws SchemaEvolveException if the matching handler fails
     */
    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) throws SchemaEvolveException {
        if (catalog == null) {
            catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
        }
        SchemaChangeEventVisitor.visit(
                schemaChangeEvent,
                add -> {
                    applyAddColumn(add);
                    return null;
                },
                alterType -> {
                    applyAlterColumnType(alterType);
                    return null;
                },
                create -> {
                    applyCreateTable(create);
                    return null;
                },
                dropColumn -> {
                    applyDropColumn(dropColumn);
                    return null;
                },
                dropTable -> {
                    applyDropTable(dropTable);
                    return null;
                },
                rename -> {
                    applyRenameColumn(rename);
                    return null;
                },
                truncate -> {
                    applyTruncateTable(truncate);
                    return null;
                });
    }

    /**
     * Closes the Paimon catalog if one has been created; otherwise does nothing.
     *
     * @throws Exception if closing the catalog fails
     */
    public void close() throws Exception {
        if (catalog == null) {
            return;
        }
        catalog.close();
    }

    /**
     * Creates the Paimon table described by a create-table event, creating its database first
     * when the catalog reports that it does not exist.
     *
     * <p>Partition keys come from {@code partitionMaps} when it has an entry for the table,
     * otherwise from the event's schema. Every partition key that is not already a primary key
     * is appended to the primary keys. The table is created with "ignore if exists" set.
     *
     * @param event the create-table event
     * @throws SchemaEvolveException if the catalog rejects the database or table creation
     */
    private void applyCreateTable(CreateTableEvent event) throws SchemaEvolveException {
        try {
            String databaseName = event.tableId().getSchemaName();
            if (!catalog.databaseExists(databaseName)) {
                catalog.createDatabase(databaseName, true);
            }

            // Two different Schema types meet here, so both are spelled out in full.
            org.apache.flink.cdc.common.schema.Schema schema = event.getSchema();
            org.apache.paimon.schema.Schema.Builder builder = new org.apache.paimon.schema.Schema.Builder();
            schema.getColumns().forEach(column -> builder.column(
                    column.getName(),
                    LogicalTypeConversion.toDataType(DataTypeUtils.toFlinkDataType(column.getType()).getLogicalType())));

            List<String> partitionKeys = new ArrayList<>();
            if (partitionMaps.containsKey(event.tableId())) {
                partitionKeys.addAll(partitionMaps.get(event.tableId()));
            } else if (schema.partitionKeys() != null && !schema.partitionKeys().isEmpty()) {
                partitionKeys.addAll(schema.partitionKeys());
            }

            // Work on a copy: appending partition columns must not modify the list owned by
            // the event's schema (which may also be unmodifiable).
            List<String> schemaPrimaryKeys = schema.primaryKeys();
            List<String> primaryKeys = schemaPrimaryKeys == null
                    ? new ArrayList<>()
                    : new ArrayList<>(schemaPrimaryKeys);
            for (String partitionColumn : partitionKeys) {
                if (!primaryKeys.contains(partitionColumn)) {
                    primaryKeys.add(partitionColumn);
                }
            }

            // Schema options are passed after the applier-wide table options; presumably the
            // later call wins on key conflicts — confirm against the Paimon builder.
            builder.partitionKeys(partitionKeys)
                    .primaryKey(primaryKeys)
                    .comment(schema.comment())
                    .options(tableOptions)
                    .options(schema.options());
            catalog.createTable(tableIdToIdentifier(event), builder.build(), true);
        } catch (Catalog.DatabaseAlreadyExistException | Catalog.DatabaseNotExistException | Catalog.TableAlreadyExistException e) {
            throw new SchemaEvolveException(event, e.getMessage(), e);
        }
    }

    /**
     * Adds the columns described by an add-column event to the target table.
     *
     * <p>If the catalog reports that a column already exists, the event is logged and skipped
     * instead of failing, so a replayed event does not break the pipeline.
     *
     * @param event the add-column event
     * @throws SchemaEvolveException if the table or a referenced column does not exist
     */
    private void applyAddColumn(AddColumnEvent event) throws SchemaEvolveException {
        try {
            List<SchemaChange> tableChangeList = applyAddColumnEventWithPosition(event);
            catalog.alterTable(tableIdToIdentifier(event), tableChangeList, true);
        } catch (Catalog.ColumnAlreadyExistException e) {
            // Honor the "skip it" promise of the log message: do not rethrow.
            // NOTE(review): if an event adds several columns and only some already exist, the
            // whole alter call may have been rejected — confirm against Paimon's alterTable.
            LOG.warn("{}, skip it.", e.getMessage());
        } catch (Catalog.ColumnNotExistException | Catalog.TableNotExistException e) {
            throw new SchemaEvolveException(event, e.getMessage(), e);
        }
    }

    /**
     * Translates every added column of the event into Paimon schema changes, honoring the
     * requested position (FIRST, LAST, BEFORE or AFTER).
     *
     * @param event the add-column event
     * @return the schema changes for all added columns, in event order
     * @throws SchemaEvolveException if a position is not one of the four handled values, or if
     *     the table cannot be found while resolving a BEFORE position
     */
    private List<SchemaChange> applyAddColumnEventWithPosition(AddColumnEvent event) throws SchemaEvolveException {
        List<SchemaChange> changes = new ArrayList<>();
        try {
            for (AddColumnEvent.ColumnWithPosition added : event.getAddedColumns()) {
                switch (added.getPosition()) {
                    case FIRST:
                        changes.addAll(SchemaChangeProvider.add(added, SchemaChange.Move.first(added.getAddColumn().getName())));
                        break;
                    case LAST:
                        changes.addAll(SchemaChangeProvider.add(added));
                        break;
                    case BEFORE:
                        // Needs the current column order, so the table is looked up.
                        changes.addAll(applyAddColumnWithBeforePosition(
                                event.tableId().getSchemaName(), event.tableId().getTableName(), added));
                        break;
                    case AFTER:
                        Preconditions.checkNotNull(added.getExistedColumnName(), "Existing column name must be provided for AFTER position");
                        SchemaChange.Move moveAfter = SchemaChange.Move.after(added.getAddColumn().getName(), added.getExistedColumnName());
                        changes.addAll(SchemaChangeProvider.add(added, moveAfter));
                        break;
                    default:
                        throw new SchemaEvolveException(event, "Unknown column position: " + added.getPosition());
                }
            }
        } catch (Catalog.TableNotExistException e) {
            throw new SchemaEvolveException(event, e.getMessage(), e);
        }
        return changes;
    }

    /**
     * Builds the schema changes for adding a column BEFORE an existing column.
     *
     * <p>The new column is placed after the predecessor of the reference column. When the
     * reference column is the first column of the table (or no reference column is given),
     * there is no predecessor, so the new column is moved to the first position instead.
     *
     * @param schemaName database name of the target table
     * @param tableName name of the target table
     * @param columnWithPosition the column to add together with its reference column
     * @return the schema changes for this column
     * @throws Catalog.TableNotExistException if the target table does not exist
     */
    private List<SchemaChange> applyAddColumnWithBeforePosition(String schemaName, String tableName, AddColumnEvent.ColumnWithPosition columnWithPosition) throws Catalog.TableNotExistException {
        String existedColumnName = columnWithPosition.getExistedColumnName();
        Table table = catalog.getTable(new Identifier(schemaName, tableName));
        List<String> columnNames = table.rowType().getFieldNames();
        int index = checkColumnPosition(existedColumnName, columnNames);
        String newColumnName = columnWithPosition.getAddColumn().getName();
        // Index 0 has no preceding column; columnNames.get(index - 1) would be out of bounds.
        SchemaChange.Move move = index <= 0
                ? SchemaChange.Move.first(newColumnName)
                : SchemaChange.Move.after(newColumnName, columnNames.get(index - 1));
        return SchemaChangeProvider.add(columnWithPosition, move);
    }

    /**
     * Finds the index of a column name in the given column list.
     *
     * @param existedColumnName the column to locate; may be {@code null}
     * @param columnNames the column names to search
     * @return {@code 0} when {@code existedColumnName} is {@code null}, otherwise its index
     */
    private int checkColumnPosition(String existedColumnName, List<String> columnNames) {
        if (existedColumnName != null) {
            int position = columnNames.indexOf(existedColumnName);
            // Fails the argument check when the column is not in the list.
            Preconditions.checkArgument(position != -1, "Column %s not found", existedColumnName);
            return position;
        }
        return 0;
    }

    /**
     * Drops the columns named by a drop-column event from the target table.
     *
     * @param event the drop-column event
     * @throws SchemaEvolveException if the catalog rejects the alteration
     */
    private void applyDropColumn(DropColumnEvent event) throws SchemaEvolveException {
        try {
            List<SchemaChange> changes = new ArrayList<>();
            for (String droppedColumn : event.getDroppedColumnNames()) {
                changes.addAll(SchemaChangeProvider.drop(droppedColumn));
            }
            catalog.alterTable(tableIdToIdentifier(event), changes, true);
        } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException | Catalog.TableNotExistException e) {
            throw new SchemaEvolveException(event, e.getMessage(), e);
        }
    }

    /**
     * Renames columns of the target table according to the event's old-name to new-name mapping.
     *
     * <p>The table's current options are read first and handed to the rename change provider.
     *
     * @param event the rename-column event
     * @throws SchemaEvolveException if the catalog rejects the lookup or the alteration
     */
    private void applyRenameColumn(RenameColumnEvent event) throws SchemaEvolveException {
        try {
            Identifier identifier = tableIdToIdentifier(event);
            Map<String, String> options = catalog.getTable(identifier).options();
            List<SchemaChange> changes = new ArrayList<>();
            for (Map.Entry<String, String> renaming : event.getNameMapping().entrySet()) {
                changes.addAll(SchemaChangeProvider.rename(renaming.getKey(), renaming.getValue(), options));
            }
            catalog.alterTable(identifier, changes, true);
        } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException | Catalog.TableNotExistException e) {
            throw new SchemaEvolveException(event, e.getMessage(), e);
        }
    }

    /**
     * Changes column types of the target table according to the event's column-name to
     * new-type mapping.
     *
     * @param event the alter-column-type event
     * @throws SchemaEvolveException if the catalog rejects the alteration
     */
    private void applyAlterColumnType(AlterColumnTypeEvent event) throws SchemaEvolveException {
        try {
            List<SchemaChange> changes = new ArrayList<>();
            for (Map.Entry<String, DataType> retyped : event.getTypeMapping().entrySet()) {
                changes.add(SchemaChangeProvider.updateColumnType(retyped.getKey(), retyped.getValue()));
            }
            catalog.alterTable(tableIdToIdentifier(event), changes, true);
        } catch (Catalog.ColumnAlreadyExistException | Catalog.ColumnNotExistException | Catalog.TableNotExistException e) {
            throw new SchemaEvolveException(event, e.getMessage(), e);
        }
    }

    /**
     * Truncates the target table through a Paimon batch commit.
     *
     * <p>Tables whose {@code deletion-vectors.enabled} option is set to true are refused with an
     * {@link UnsupportedSchemaChangeEventException}. A table that does not carry the option at
     * all is treated as not having deletion vectors enabled.
     *
     * @param event the truncate-table event
     * @throws SchemaEvolveException if the table has deletion vectors enabled, or if the lookup
     *     or the truncation fails
     */
    private void applyTruncateTable(TruncateTableEvent event) throws SchemaEvolveException {
        try {
            Table table = catalog.getTable(tableIdToIdentifier(event));
            // The option can be missing from the map; parseBoolean maps null to false instead
            // of failing with a NullPointerException.
            String deletionVectorsEnabled = table.options().get("deletion-vectors.enabled");
            if (Boolean.parseBoolean(deletionVectorsEnabled)) {
                throw new UnsupportedSchemaChangeEventException(event, "Unable to truncate a table with deletion vectors enabled.", null);
            }
            try (BatchTableCommit batchTableCommit = table.newBatchWriteBuilder().newCommit()) {
                batchTableCommit.truncateTable();
            }
        } catch (SchemaEvolveException e) {
            // Keep the specific "unsupported" signal intact rather than re-wrapping it below.
            throw e;
        } catch (Exception e) {
            throw new SchemaEvolveException(event, "Failed to apply truncate table event", e);
        }
    }

    /**
     * Drops the table named by a drop-table event, passing {@code true} for the catalog's
     * ignore flag.
     *
     * @param event the drop-table event
     * @throws SchemaEvolveException if the catalog reports that the table does not exist
     */
    private void applyDropTable(DropTableEvent event) throws SchemaEvolveException {
        Identifier identifier = tableIdToIdentifier(event);
        try {
            catalog.dropTable(identifier, true);
        } catch (Catalog.TableNotExistException e) {
            throw new SchemaEvolveException(event, "Failed to apply drop table event", e);
        }
    }

    /**
     * Builds the Paimon identifier for the table an event refers to, using the table id's
     * schema name as the database part and its table name as the object part.
     *
     * @param event the schema change event carrying the table id
     * @return a new identifier for that table
     */
    private static Identifier tableIdToIdentifier(SchemaChangeEvent event) {
        String databaseName = event.tableId().getSchemaName();
        String tableName = event.tableId().getTableName();
        return new Identifier(databaseName, tableName);
    }
}

