/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.schema;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SystemColumns;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeRoot;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

/**
 * Static validation helpers for {@link TableSchema}.
 *
 * <p>{@link #validateTableSchema(TableSchema)} checks:
 * <ul>
 *   <li>key/partition column types and bucket settings;</li>
 *   <li>default values and scan startup options;</li>
 *   <li>{@code fields.*} options, sequence fields and sequence groups;</li>
 *   <li>changelog-producer / merge-engine combinations;</li>
 *   <li>snapshot and changelog retention bounds;</li>
 *   <li>file-format compatibility, reserved field names, and deletion-vector constraints.</li>
 * </ul>
 */
public class SchemaValidation {
    /** Data type classes that are rejected for primary-key and partition columns. */
    public static final List<Class<? extends DataType>> PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES = Arrays.asList(MapType.class, ArrayType.class, RowType.class, MultisetType.class);

    /**
     * Validates a table schema together with the options stored in it.
     *
     * @param schema the schema to validate
     * @throws RuntimeException (e.g. {@link IllegalArgumentException},
     *     {@link UnsupportedOperationException}, or whatever the {@code Preconditions} helpers
     *     throw) if any check fails
     */
    public static void validateTableSchema(TableSchema schema) {
        validateOnlyContainPrimitiveType(schema.fields(), schema.primaryKeys(), "primary key");
        validateOnlyContainPrimitiveType(schema.fields(), schema.partitionKeys(), "partition");
        CoreOptions options = new CoreOptions(schema.options());
        validateBucket(schema, options);
        validateDefaultValues(schema);
        validateStartupMode(options);
        validateFieldsPrefix(schema, options);
        validateSequenceField(schema, options);
        validateSequenceGroup(schema, options);

        CoreOptions.ChangelogProducer changelogProducer = options.changelogProducer();
        if (schema.primaryKeys().isEmpty() && changelogProducer != CoreOptions.ChangelogProducer.NONE) {
            throw new UnsupportedOperationException(String.format("Can not set %s on table without primary keys, please define primary keys.", CoreOptions.CHANGELOG_PRODUCER.key()));
        }
        if (options.streamingReadOverwrite() && (changelogProducer == CoreOptions.ChangelogProducer.FULL_COMPACTION || changelogProducer == CoreOptions.ChangelogProducer.LOOKUP)) {
            throw new UnsupportedOperationException(String.format("Cannot set %s to true when changelog producer is %s or %s because it will read duplicated changes.", CoreOptions.STREAMING_READ_OVERWRITE.key(), CoreOptions.ChangelogProducer.FULL_COMPACTION, CoreOptions.ChangelogProducer.LOOKUP));
        }

        // Retention bounds: min must be positive and not exceed max.
        Preconditions.checkArgument(options.snapshotNumRetainMin() > 0, CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1");
        Preconditions.checkArgument(options.snapshotNumRetainMin() <= options.snapshotNumRetainMax(), CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key() + " should not be larger than " + CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key());
        Preconditions.checkArgument(options.changelogNumRetainMin() > 0, CoreOptions.CHANGELOG_NUM_RETAINED_MIN.key() + " should be at least 1");
        Preconditions.checkArgument(options.changelogNumRetainMin() <= options.changelogNumRetainMax(), CoreOptions.CHANGELOG_NUM_RETAINED_MIN.key() + " should not be larger than " + CoreOptions.CHANGELOG_NUM_RETAINED_MAX.key());

        // Let the configured file format reject field types it cannot store.
        FileFormat fileFormat = FileFormat.fromIdentifier(options.formatType(), new Options(schema.options()));
        fileFormat.validateDataFields(new RowType(schema.fields()));

        // User fields must not collide with system column names or the "_KEY_" prefix.
        schema.fieldNames().forEach(f -> {
            Preconditions.checkState(!SystemColumns.SYSTEM_FIELD_NAMES.contains(f), String.format("Field name[%s] in schema cannot be exist in %s", f, SystemColumns.SYSTEM_FIELD_NAMES));
            Preconditions.checkState(!f.startsWith("_KEY_"), String.format("Field name[%s] in schema cannot start with [%s]", f, "_KEY_"));
        });

        if (schema.primaryKeys().isEmpty() && options.streamingReadOverwrite()) {
            throw new RuntimeException("Doesn't support streaming read the changes from overwrite when the primary keys are not defined.");
        }
        if (schema.options().containsKey(CoreOptions.PARTITION_EXPIRATION_TIME.key()) && schema.partitionKeys().isEmpty()) {
            throw new IllegalArgumentException("Can not set 'partition.expiration-time' for non-partitioned table.");
        }
        if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW && options.changelogProducer() != CoreOptions.ChangelogProducer.LOOKUP && options.changelogProducer() != CoreOptions.ChangelogProducer.NONE) {
            throw new IllegalArgumentException("Only support 'none' and 'lookup' changelog-producer on FIRST_MERGE merge engine");
        }
        options.rowkindField().ifPresent(field -> Preconditions.checkArgument(schema.fieldNames().contains(field), "Rowkind field: '%s' can not be found in table schema.", field));
        if (options.deletionVectorsEnabled()) {
            validateForDeletionVectors(options);
        }
    }

    /**
     * Checks the scan fallback branch option.
     *
     * <p>If the option is set to a non-blank value, the named branch must have a latest schema.
     *
     * @param schemaManager schema manager used to look up the fallback branch
     * @param schema the schema whose options name the fallback branch
     */
    public static void validateFallbackBranch(SchemaManager schemaManager, TableSchema schema) {
        String fallbackBranch = schema.options().get(CoreOptions.SCAN_FALLBACK_BRANCH.key());
        if (!StringUtils.isNullOrWhitespaceOnly(fallbackBranch)) {
            Preconditions.checkArgument(schemaManager.copyWithBranch(fallbackBranch).latest().isPresent(), "Cannot set '%s' = '%s' because the branch '%s' isn't existed.", CoreOptions.SCAN_FALLBACK_BRANCH.key(), fallbackBranch, fallbackBranch);
        }
    }

    /**
     * Ensures every named column exists in {@code fields} and is not of a type listed in
     * {@link #PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES}.
     *
     * @param fields all schema fields
     * @param fieldNames names of the key columns to check
     * @param errorMessageIntro column role used in error messages (e.g. "primary key")
     * @throws IllegalArgumentException if a named column is not among {@code fields}
     * @throws UnsupportedOperationException if a named column has an unsupported type
     */
    private static void validateOnlyContainPrimitiveType(List<DataField> fields, List<String> fieldNames, String errorMessageIntro) {
        if (fieldNames.isEmpty()) {
            return;
        }
        Map<String, DataField> rowFields = new HashMap<>();
        for (DataField rowField : fields) {
            rowFields.put(rowField.name(), rowField);
        }
        for (String fieldName : fieldNames) {
            DataField rowField = rowFields.get(fieldName);
            if (rowField == null) {
                // Previously this dereferenced null and surfaced as a NullPointerException.
                throw new IllegalArgumentException(String.format("The %s field %s can not be found in table schema.", errorMessageIntro, fieldName));
            }
            DataType dataType = rowField.type();
            if (PRIMARY_KEY_UNSUPPORTED_LOGICAL_TYPES.stream().anyMatch(c -> c.isInstance(dataType))) {
                throw new UnsupportedOperationException(String.format("The type %s in %s field %s is unsupported", dataType.getClass().getSimpleName(), errorMessageIntro, fieldName));
            }
        }
    }

    /**
     * Checks that the scan options match the configured startup mode.
     *
     * <p>Each mode requires its own option(s) and forbids the options that belong to other
     * modes. Modes without dedicated options forbid all of them.
     */
    private static void validateStartupMode(CoreOptions options) {
        CoreOptions.StartupMode mode = options.startupMode();
        if (mode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
            checkExactOneOptionExistInMode(options, mode, CoreOptions.SCAN_TIMESTAMP_MILLIS, CoreOptions.SCAN_TIMESTAMP);
            checkOptionsConflict(options, Arrays.asList(CoreOptions.SCAN_SNAPSHOT_ID, CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, CoreOptions.SCAN_TAG_NAME, CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, CoreOptions.INCREMENTAL_BETWEEN), Arrays.asList(CoreOptions.SCAN_TIMESTAMP_MILLIS, CoreOptions.SCAN_TIMESTAMP));
        } else if (mode == CoreOptions.StartupMode.FROM_SNAPSHOT) {
            checkExactOneOptionExistInMode(options, mode, CoreOptions.SCAN_SNAPSHOT_ID, CoreOptions.SCAN_TAG_NAME, CoreOptions.SCAN_WATERMARK);
            checkOptionsConflict(options, Arrays.asList(CoreOptions.SCAN_TIMESTAMP_MILLIS, CoreOptions.SCAN_TIMESTAMP, CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, CoreOptions.INCREMENTAL_BETWEEN), Arrays.asList(CoreOptions.SCAN_SNAPSHOT_ID, CoreOptions.SCAN_TAG_NAME));
        } else if (mode == CoreOptions.StartupMode.INCREMENTAL) {
            checkExactOneOptionExistInMode(options, mode, CoreOptions.INCREMENTAL_BETWEEN, CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP);
            checkOptionsConflict(options, Arrays.asList(CoreOptions.SCAN_SNAPSHOT_ID, CoreOptions.SCAN_TIMESTAMP_MILLIS, CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, CoreOptions.SCAN_TIMESTAMP, CoreOptions.SCAN_TAG_NAME), Arrays.asList(CoreOptions.INCREMENTAL_BETWEEN, CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP));
        } else if (mode == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
            checkOptionExistInMode(options, CoreOptions.SCAN_SNAPSHOT_ID, mode);
            checkOptionsConflict(options, Arrays.asList(CoreOptions.SCAN_TIMESTAMP_MILLIS, CoreOptions.SCAN_TIMESTAMP, CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, CoreOptions.SCAN_TAG_NAME, CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, CoreOptions.INCREMENTAL_BETWEEN), Collections.singletonList(CoreOptions.SCAN_SNAPSHOT_ID));
        } else if (mode == CoreOptions.StartupMode.FROM_FILE_CREATION_TIME) {
            checkOptionExistInMode(options, CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, mode);
            checkOptionsConflict(options, Arrays.asList(CoreOptions.SCAN_SNAPSHOT_ID, CoreOptions.SCAN_TIMESTAMP_MILLIS, CoreOptions.SCAN_TAG_NAME, CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, CoreOptions.INCREMENTAL_BETWEEN), Collections.singletonList(CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS));
        } else {
            checkOptionNotExistInMode(options, CoreOptions.SCAN_TIMESTAMP_MILLIS, mode);
            checkOptionNotExistInMode(options, CoreOptions.SCAN_FILE_CREATION_TIME_MILLIS, mode);
            checkOptionNotExistInMode(options, CoreOptions.SCAN_TIMESTAMP, mode);
            checkOptionNotExistInMode(options, CoreOptions.SCAN_SNAPSHOT_ID, mode);
            checkOptionNotExistInMode(options, CoreOptions.SCAN_TAG_NAME, mode);
            checkOptionNotExistInMode(options, CoreOptions.INCREMENTAL_BETWEEN_TIMESTAMP, mode);
            checkOptionNotExistInMode(options, CoreOptions.INCREMENTAL_BETWEEN, mode);
        }
    }

    /** Requires {@code option} to be set when {@code startupMode} is used. */
    private static void checkOptionExistInMode(CoreOptions options, ConfigOption<?> option, CoreOptions.StartupMode startupMode) {
        Preconditions.checkArgument(options.toConfiguration().contains(option), String.format("%s can not be null when you use %s for %s", option.key(), startupMode, CoreOptions.SCAN_MODE.key()));
    }

    /** Requires {@code option} to be unset when {@code startupMode} is used. */
    private static void checkOptionNotExistInMode(CoreOptions options, ConfigOption<?> option, CoreOptions.StartupMode startupMode) {
        Preconditions.checkArgument(!options.toConfiguration().contains(option), String.format("%s must be null when you use %s for %s", option.key(), startupMode, CoreOptions.SCAN_MODE.key()));
    }

    /** Requires exactly one of {@code configOptions} to be set when {@code startupMode} is used. */
    private static void checkExactOneOptionExistInMode(CoreOptions options, CoreOptions.StartupMode startupMode, ConfigOption<?>... configOptions) {
        long presentCount = Arrays.stream(configOptions).filter(op -> options.toConfiguration().contains(op)).count();
        Preconditions.checkArgument(presentCount == 1L, String.format("must set only one key in [%s] when you use %s for %s", concatConfigKeys(Arrays.asList(configOptions)), startupMode, CoreOptions.SCAN_MODE.key()));
    }

    /** Requires every option in {@code illegalOptions} to be unset; {@code legalOptions} only feeds the message. */
    private static void checkOptionsConflict(CoreOptions options, List<ConfigOption<?>> illegalOptions, List<ConfigOption<?>> legalOptions) {
        for (ConfigOption<?> illegalOption : illegalOptions) {
            Preconditions.checkArgument(!options.toConfiguration().contains(illegalOption), "[%s] must be null when you set [%s]", illegalOption.key(), concatConfigKeys(legalOptions));
        }
    }

    /** Joins the option keys with commas (no spaces). */
    private static String concatConfigKeys(List<ConfigOption<?>> configOptions) {
        return configOptions.stream().map(ConfigOption::key).collect(Collectors.joining(","));
    }

    /**
     * Checks the field names referenced by option keys that start with {@code "fields"}.
     *
     * <p>The second dot-separated segment of such a key is a comma-separated list of names.
     * Each name must be a schema field or the literal {@code default-aggregate-function}.
     *
     * @throws IllegalArgumentException (via {@code Preconditions}) if the key has no second
     *     segment or names an unknown field
     */
    private static void validateFieldsPrefix(TableSchema schema, CoreOptions options) {
        List<String> fieldNames = schema.fieldNames();
        for (String key : options.toMap().keySet()) {
            if (!key.startsWith("fields")) {
                continue;
            }
            String[] segments = key.split("\\.");
            // Guard against keys like "fields" or "fields-x": indexing [1] used to throw
            // ArrayIndexOutOfBoundsException with no useful message.
            Preconditions.checkArgument(segments.length > 1, "Option key '%s' does not specify any field name.", key);
            for (String field : segments[1].split(",")) {
                Preconditions.checkArgument("default-aggregate-function".equals(field) || fieldNames.contains(field), "Field %s can not be found in table schema.", field);
            }
        }
    }

    /**
     * Validates sequence-group options.
     *
     * <p>These are keys that start with {@code "fields"} and end with {@code "sequence-group"}.
     * The text between the prefix and suffix (one separator character stripped on each side) is
     * a comma-separated list of sequence fields. The value is a comma-separated list of fields
     * governed by them.
     *
     * <p>Every referenced field must exist. A field may belong to at most one group. No grouped
     * sequence field may have an aggregation function.
     */
    private static void validateSequenceGroup(TableSchema schema, CoreOptions options) {
        List<String> fieldNames = schema.fieldNames();
        Map<String, Set<String>> fields2Group = new HashMap<>();
        for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
            String k = entry.getKey();
            String v = entry.getValue();
            if (!k.startsWith("fields") || !k.endsWith("sequence-group")) {
                continue;
            }
            String[] sequenceFieldNames = k.substring("fields".length() + 1, k.length() - "sequence-group".length() - 1).split(",");
            for (String field : v.split(",")) {
                if (!fieldNames.contains(field)) {
                    throw new IllegalArgumentException(String.format("Field %s can not be found in table schema.", field));
                }
                List<String> sequenceFieldsList = new ArrayList<>();
                for (String sequenceFieldName : sequenceFieldNames) {
                    if (!fieldNames.contains(sequenceFieldName)) {
                        throw new IllegalArgumentException(String.format("The sequence field group: %s can not be found in table schema.", sequenceFieldName));
                    }
                    sequenceFieldsList.add(sequenceFieldName);
                }
                Set<String> existingGroup = fields2Group.get(field);
                if (existingGroup != null) {
                    List<List<String>> sequenceGroups = new ArrayList<>();
                    sequenceGroups.add(new ArrayList<>(existingGroup));
                    sequenceGroups.add(sequenceFieldsList);
                    throw new IllegalArgumentException(String.format("Field %s is defined repeatedly by multiple groups: %s.", field, sequenceGroups));
                }
                fields2Group.put(field, new HashSet<>(sequenceFieldsList));
            }
        }
        Set<String> illegalGroup = fields2Group.values().stream().flatMap(Collection::stream).filter(g -> options.fieldAggFunc(g) != null).collect(Collectors.toSet());
        if (!illegalGroup.isEmpty()) {
            throw new IllegalArgumentException("Should not defined aggregation function on sequence group: " + illegalGroup);
        }
    }

    /**
     * Validates configured column default values.
     *
     * <p>Partition and primary-key columns may not have defaults. Each default string must be
     * castable from STRING to the column's type.
     */
    private static void validateDefaultValues(TableSchema schema) {
        CoreOptions coreOptions = new CoreOptions(schema.options());
        Map<String, String> defaultValues = coreOptions.getFieldDefaultValues();
        if (defaultValues.isEmpty()) {
            return;
        }
        for (String partitionKey : schema.partitionKeys()) {
            if (defaultValues.containsKey(partitionKey)) {
                throw new IllegalArgumentException(String.format("Partition key %s should not be assign default column.", partitionKey));
            }
        }
        for (String primaryKey : schema.primaryKeys()) {
            if (defaultValues.containsKey(primaryKey)) {
                throw new IllegalArgumentException(String.format("Primary key %s should not be assign default column.", primaryKey));
            }
        }
        for (DataField field : schema.fields()) {
            String defaultValueStr = defaultValues.get(field.name());
            if (defaultValueStr == null) {
                continue;
            }
            CastExecutor<?, ?> resolve = CastExecutors.resolve(VarCharType.STRING_TYPE, field.type());
            if (resolve == null) {
                throw new IllegalArgumentException(String.format("The column %s with datatype %s is currently not supported for default value.", field.name(), field.type().asSQLString()));
            }
            try {
                // Trial cast only: the result is discarded, we just want to know it succeeds.
                resolve.cast(BinaryString.fromString(defaultValueStr));
            } catch (Exception e) {
                throw new IllegalArgumentException(String.format("The default value %s of the column %s can not be cast to datatype: %s", defaultValueStr, field.name(), field.type()), e);
            }
        }
    }

    /**
     * Checks the constraints that apply when deletion vectors are enabled.
     *
     * <p>Only the NONE, INPUT and LOOKUP changelog producers are allowed, and the FIRST_ROW
     * merge engine is rejected.
     */
    private static void validateForDeletionVectors(CoreOptions options) {
        Preconditions.checkArgument(options.changelogProducer() == CoreOptions.ChangelogProducer.NONE || options.changelogProducer() == CoreOptions.ChangelogProducer.INPUT || options.changelogProducer() == CoreOptions.ChangelogProducer.LOOKUP, "Deletion vectors mode is only supported for NONE/INPUT/LOOKUP changelog producer now.");
        Preconditions.checkArgument(!options.mergeEngine().equals(CoreOptions.MergeEngine.FIRST_ROW), "First row merge engine does not need deletion vectors because there is no deletion of old data in this merge engine.");
    }

    /**
     * Validates the configured sequence fields.
     *
     * <p>Each must exist, have no aggregation function, and appear only once. Sequence fields
     * are rejected with the FIRST_ROW merge engine and with cross-partition updates.
     */
    private static void validateSequenceField(TableSchema schema, CoreOptions options) {
        List<String> sequenceField = options.sequenceField();
        if (sequenceField.isEmpty()) {
            return;
        }
        Map<String, Integer> fieldCount = sequenceField.stream().collect(Collectors.toMap(field -> field, field -> 1, Integer::sum));
        List<String> fieldNames = schema.fieldNames();
        for (String field : sequenceField) {
            Preconditions.checkArgument(fieldNames.contains(field), "Sequence field: '%s' can not be found in table schema.", field);
            Preconditions.checkArgument(options.fieldAggFunc(field) == null, "Should not define aggregation on sequence field: '%s'.", field);
            Preconditions.checkArgument(fieldCount.get(field) == 1, "Sequence field '%s' is defined repeatedly.", field);
        }
        if (options.mergeEngine() == CoreOptions.MergeEngine.FIRST_ROW) {
            throw new IllegalArgumentException("Do not support use sequence field on FIRST_MERGE merge engine.");
        }
        if (schema.crossPartitionUpdate()) {
            throw new IllegalArgumentException(String.format("You can not use sequence.field in cross partition update case (Primary key constraint '%s' not include all partition fields '%s').", schema.primaryKeys(), schema.partitionKeys()));
        }
    }

    /**
     * Validates the bucket configuration.
     *
     * <p>When {@code bucket == -1}, no bucket key may be set, and append tables (no primary
     * keys) may not set full-compaction delta commits. Otherwise:
     * <ul>
     *   <li>the bucket count must be at least 1;</li>
     *   <li>cross-partition updates are rejected;</li>
     *   <li>append tables need a bucket key;</li>
     *   <li>bucket keys may not be nested types.</li>
     * </ul>
     */
    private static void validateBucket(TableSchema schema, CoreOptions options) {
        int bucket = options.bucket();
        if (bucket == -1) {
            if (options.toMap().get(CoreOptions.BUCKET_KEY.key()) != null) {
                throw new RuntimeException("Cannot define 'bucket-key' with bucket -1, please specify a bucket number.");
            }
            if (schema.primaryKeys().isEmpty() && options.toMap().get(CoreOptions.FULL_COMPACTION_DELTA_COMMITS.key()) != null) {
                throw new RuntimeException("AppendOnlyTable of unware or dynamic bucket does not support 'full-compaction.delta-commits'");
            }
            return;
        }
        if (bucket < 1) {
            throw new RuntimeException("The number of buckets needs to be greater than 0.");
        }
        if (schema.crossPartitionUpdate()) {
            throw new IllegalArgumentException(String.format("You should use dynamic bucket (bucket = -1) mode in cross partition update case (Primary key constraint %s not include all partition fields %s).", schema.primaryKeys(), schema.partitionKeys()));
        }
        if (schema.primaryKeys().isEmpty() && schema.bucketKeys().isEmpty()) {
            throw new RuntimeException("You should define a 'bucket-key' for bucketed append mode.");
        }
        List<String> bucketKeys = schema.bucketKeys();
        if (!bucketKeys.isEmpty()) {
            List<String> nestedFields = schema.fields().stream()
                    .filter(dataField -> bucketKeys.contains(dataField.name()) && isNestedType(dataField.type()))
                    .map(DataField::name)
                    .collect(Collectors.toList());
            if (!nestedFields.isEmpty()) {
                throw new RuntimeException("nested type can not in bucket-key, in your table these key are " + nestedFields);
            }
        }
    }

    /** Returns true for ARRAY, MULTISET, MAP and ROW type roots. */
    private static boolean isNestedType(DataType type) {
        DataTypeRoot root = type.getTypeRoot();
        return root == DataTypeRoot.ARRAY || root == DataTypeRoot.MULTISET || root == DataTypeRoot.MAP || root == DataTypeRoot.ROW;
    }
}

