/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.catalog;

import java.util.Arrays;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.DefaultSchemaResolver;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.catalog.WatermarkSpec;
import org.apache.flink.table.expressions.CallExpression;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.expressions.utils.ResolvedExpressionMock;
import org.apache.flink.table.functions.BuiltInFunctionDefinition;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.types.AbstractDataType;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampKind;
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
import org.apache.flink.table.types.utils.DataTypeFactoryMock;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.table.utils.ExpressionResolverMocks;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Condition;
import org.assertj.core.api.HamcrestCondition;
import org.hamcrest.Matcher;
import org.junit.jupiter.api.Test;

class SchemaResolutionTest {
    private static final String COMPUTED_SQL = "orig_ts - INTERVAL '60' MINUTE";
    private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED = new ResolvedExpressionMock(DataTypes.TIMESTAMP((int)3), () -> "orig_ts - INTERVAL '60' MINUTE");
    private static final String WATERMARK_SQL = "ts - INTERVAL '5' SECOND";
    private static final ResolvedExpression WATERMARK_RESOLVED = new ResolvedExpressionMock(DataTypes.TIMESTAMP((int)3), () -> "ts - INTERVAL '5' SECOND");
    private static final String INVALID_WATERMARK_SQL = "CAST(ts AS TIMESTAMP_LTZ(3)) - INTERVAL '5' SECOND";
    private static final ResolvedExpression INVALID_WATERMARK_RESOLVED = new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ((int)3), () -> "CAST(ts AS TIMESTAMP_LTZ(3)) - INTERVAL '5' SECOND");
    private static final String PROCTIME_SQL = "PROCTIME()";
    private static final ResolvedExpression PROCTIME_RESOLVED = new ResolvedExpressionMock(TypeConversions.fromLogicalToDataType((LogicalType)new LocalZonedTimestampType(false, TimestampKind.PROCTIME, 3)), () -> "PROCTIME()");
    private static final Schema SCHEMA = Schema.newBuilder().primaryKeyNamed("primary_constraint", new String[]{"id"}).column("id", DataTypes.INT().notNull()).withComment("people id").column("counter", DataTypes.INT().notNull()).column("payload", "ROW<name STRING, age INT, flag BOOLEAN>").columnByMetadata("topic", (AbstractDataType)DataTypes.STRING(), true).withComment("kafka topic").columnByExpression("ts", (Expression)Expressions.callSql((String)"orig_ts - INTERVAL '60' MINUTE")).withComment("rowtime").columnByMetadata("orig_ts", (AbstractDataType)DataTypes.TIMESTAMP((int)3), "timestamp").withComment("the 'origin' timestamp").watermark("ts", "ts - INTERVAL '5' SECOND").columnByExpression("proctime", "PROCTIME()").build();
    private static final String COMPUTED_SQL_WITH_TS_LTZ = "ts_ltz - INTERVAL '60' MINUTE";
    private static final ResolvedExpression COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ = new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ((int)3), () -> "ts_ltz - INTERVAL '60' MINUTE");
    private static final String WATERMARK_SQL_WITH_TS_LTZ = "ts1 - INTERVAL '5' SECOND";
    private static final ResolvedExpression WATERMARK_RESOLVED_WITH_TS_LTZ = new ResolvedExpressionMock(DataTypes.TIMESTAMP_LTZ((int)3), () -> "ts1 - INTERVAL '5' SECOND");
    private static final Schema SCHEMA_WITH_TS_LTZ = Schema.newBuilder().column("id", DataTypes.INT().notNull()).columnByExpression("ts1", (Expression)Expressions.callSql((String)"ts_ltz - INTERVAL '60' MINUTE")).columnByMetadata("ts_ltz", (AbstractDataType)DataTypes.TIMESTAMP_LTZ((int)3), "timestamp").watermark("ts1", "ts1 - INTERVAL '5' SECOND").build();

    SchemaResolutionTest() {
    }

    @Test
    void testSchemaResolution() {
        ResolvedSchema expectedSchema = new ResolvedSchema(Arrays.asList(Column.physical((String)"id", (DataType)((DataType)DataTypes.INT().notNull())).withComment("people id"), Column.physical((String)"counter", (DataType)((DataType)DataTypes.INT().notNull())), Column.physical((String)"payload", (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"age", (DataType)DataTypes.INT()), DataTypes.FIELD((String)"flag", (DataType)DataTypes.BOOLEAN())})), Column.metadata((String)"topic", (DataType)DataTypes.STRING(), null, (boolean)true).withComment("kafka topic"), Column.computed((String)"ts", (ResolvedExpression)COMPUTED_COLUMN_RESOLVED).withComment("rowtime"), Column.metadata((String)"orig_ts", (DataType)DataTypes.TIMESTAMP((int)3), (String)"timestamp", (boolean)false).withComment("the 'origin' timestamp"), Column.computed((String)"proctime", (ResolvedExpression)PROCTIME_RESOLVED)), Collections.singletonList(WatermarkSpec.of((String)"ts", (ResolvedExpression)WATERMARK_RESOLVED)), UniqueConstraint.primaryKey((String)"primary_constraint", Collections.singletonList("id")));
        ResolvedSchema actualStreamSchema = SchemaResolutionTest.resolveSchema(SCHEMA, true);
        Assertions.assertThat((Object)actualStreamSchema).isEqualTo((Object)expectedSchema);
        Assertions.assertThat((boolean)LogicalTypeChecks.isRowtimeAttribute((LogicalType)SchemaResolutionTest.getType(actualStreamSchema, "ts"))).isTrue();
        Assertions.assertThat((boolean)LogicalTypeChecks.isProctimeAttribute((LogicalType)SchemaResolutionTest.getType(actualStreamSchema, "proctime"))).isTrue();
        ResolvedSchema actualBatchSchema = SchemaResolutionTest.resolveSchema(SCHEMA, false);
        Assertions.assertThat((Object)actualBatchSchema).isEqualTo((Object)expectedSchema);
        Assertions.assertThat((boolean)LogicalTypeChecks.isRowtimeAttribute((LogicalType)SchemaResolutionTest.getType(actualBatchSchema, "ts"))).isFalse();
        Assertions.assertThat((boolean)LogicalTypeChecks.isProctimeAttribute((LogicalType)SchemaResolutionTest.getType(actualBatchSchema, "proctime"))).isTrue();
    }

    @Test
    void testSchemaResolutionWithTimestampLtzRowtime() {
        ResolvedSchema expectedSchema = new ResolvedSchema(Arrays.asList(Column.physical((String)"id", (DataType)((DataType)DataTypes.INT().notNull())), Column.computed((String)"ts1", (ResolvedExpression)COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ), Column.metadata((String)"ts_ltz", (DataType)DataTypes.TIMESTAMP_LTZ((int)3), (String)"timestamp", (boolean)false)), Collections.singletonList(WatermarkSpec.of((String)"ts1", (ResolvedExpression)WATERMARK_RESOLVED_WITH_TS_LTZ)), null);
        ResolvedSchema actualStreamSchema = SchemaResolutionTest.resolveSchema(SCHEMA_WITH_TS_LTZ, true);
        Assertions.assertThat((Object)actualStreamSchema).isEqualTo((Object)expectedSchema);
        Assertions.assertThat((boolean)LogicalTypeChecks.isRowtimeAttribute((LogicalType)SchemaResolutionTest.getType(actualStreamSchema, "ts1"))).isTrue();
        ResolvedSchema actualBatchSchema = SchemaResolutionTest.resolveSchema(SCHEMA_WITH_TS_LTZ, false);
        Assertions.assertThat((Object)actualBatchSchema).isEqualTo((Object)expectedSchema);
        Assertions.assertThat((boolean)LogicalTypeChecks.isRowtimeAttribute((LogicalType)SchemaResolutionTest.getType(actualBatchSchema, "ts1"))).isFalse();
    }

    @Test
    void testSchemaResolutionWithSourceWatermark() {
        ResolvedSchema expectedSchema = new ResolvedSchema(Collections.singletonList(Column.physical((String)"ts_ltz", (DataType)DataTypes.TIMESTAMP_LTZ((int)1))), Collections.singletonList(WatermarkSpec.of((String)"ts_ltz", (ResolvedExpression)CallExpression.permanent((BuiltInFunctionDefinition)BuiltInFunctionDefinitions.SOURCE_WATERMARK, Collections.emptyList(), (DataType)DataTypes.TIMESTAMP_LTZ((int)1)))), null);
        ResolvedSchema resolvedSchema = SchemaResolutionTest.resolveSchema(Schema.newBuilder().column("ts_ltz", (AbstractDataType)DataTypes.TIMESTAMP_LTZ((int)1)).watermark("ts_ltz", (Expression)Expressions.sourceWatermark()).build());
        Assertions.assertThat((Object)resolvedSchema).isEqualTo((Object)expectedSchema);
    }

    @Test
    void testSchemaResolutionErrors() {
        SchemaResolutionTest.testError(Schema.newBuilder().fromSchema(SCHEMA).column("id", (AbstractDataType)DataTypes.STRING()).build(), "Schema must not contain duplicate column names.");
        SchemaResolutionTest.testError(Schema.newBuilder().columnByExpression("invalid", (Expression)Expressions.callSql((String)"INVALID")).build(), "Invalid expression for computed column 'invalid'.");
        SchemaResolutionTest.testError(Schema.newBuilder().columnByMetadata("metadata", (AbstractDataType)DataTypes.INT()).columnByMetadata("from_metadata", (AbstractDataType)DataTypes.BIGINT(), "metadata", false).build(), "The column `metadata` and `from_metadata` in the table are both from the same metadata key 'metadata'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.");
        SchemaResolutionTest.testError(Schema.newBuilder().columnByMetadata("from_metadata", (AbstractDataType)DataTypes.BIGINT(), "metadata", false).columnByMetadata("from_metadata2", (AbstractDataType)DataTypes.STRING(), "metadata", true).build(), "The column `from_metadata` and `from_metadata2` in the table are both from the same metadata key 'metadata'. Please specify one of the columns as the metadata column and use the computed column syntax to specify the others.");
        SchemaResolutionTest.testError(Schema.newBuilder().column("ts", (AbstractDataType)DataTypes.BOOLEAN()).watermark("ts", (Expression)Expressions.callSql((String)WATERMARK_SQL)).build(), "Invalid data type of time field for watermark definition. The field must be of type TIMESTAMP(p) or TIMESTAMP_LTZ(p), the supported precision 'p' is from 0 to 3, but the time field type is BOOLEAN");
        SchemaResolutionTest.testError(Schema.newBuilder().column("ts", (AbstractDataType)DataTypes.TIMESTAMP((int)3)).watermark("ts", (Expression)Expressions.callSql((String)"INVALID")).build(), "Invalid expression for watermark 'WATERMARK FOR `ts` AS [INVALID]'.");
        SchemaResolutionTest.testError(Schema.newBuilder().column("ts", (AbstractDataType)DataTypes.TIMESTAMP((int)3)).watermark("ts", (Expression)Expressions.callSql((String)INVALID_WATERMARK_SQL)).build(), "The watermark declaration's output data type 'TIMESTAMP_LTZ(3)' is different from the time field's data type 'TIMESTAMP(3)'.");
        SchemaResolutionTest.testError(Schema.newBuilder().column("ts", (AbstractDataType)DataTypes.TIMESTAMP((int)3)).watermark("other_ts", (Expression)Expressions.callSql((String)WATERMARK_SQL)).build(), "Invalid column name 'other_ts' for rowtime attribute in watermark declaration. Available columns are: [ts]");
        SchemaResolutionTest.testError(Schema.newBuilder().fromSchema(SCHEMA).watermark("orig_ts", WATERMARK_SQL).build(), "Multiple watermark definitions are not supported yet.");
        SchemaResolutionTest.testError(Schema.newBuilder().columnByExpression("ts", PROCTIME_SQL).watermark("ts", WATERMARK_SQL).build(), "A watermark can not be defined for a processing-time attribute.");
        SchemaResolutionTest.testError(Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).primaryKey(new String[]{"INVALID"}).build(), "Column 'INVALID' does not exist.");
        SchemaResolutionTest.testError(Schema.newBuilder().column("nullable_col", (AbstractDataType)DataTypes.INT()).primaryKey(new String[]{"nullable_col"}).build(), "Column 'nullable_col' is nullable.");
        SchemaResolutionTest.testError(Schema.newBuilder().column("orig_ts", (AbstractDataType)DataTypes.TIMESTAMP((int)3)).columnByExpression("ts", COMPUTED_SQL).primaryKey(new String[]{"ts"}).build(), "Column 'ts' is not a physical column.");
        SchemaResolutionTest.testError(Schema.newBuilder().column("id", (AbstractDataType)DataTypes.INT()).primaryKey(new String[]{"id", "id"}).build(), "Invalid primary key 'PK_id_id'. A primary key must not contain duplicate columns. Found: [id]");
    }

    @Test
    void testUnresolvedSchemaString() {
        Assertions.assertThat((String)SCHEMA.toString()).isEqualTo("(\n  `id` INT NOT NULL COMMENT 'people id',\n  `counter` INT NOT NULL,\n  `payload` [ROW<name STRING, age INT, flag BOOLEAN>],\n  `topic` METADATA VIRTUAL COMMENT 'kafka topic',\n  `ts` AS [orig_ts - INTERVAL '60' MINUTE] COMMENT 'rowtime',\n  `orig_ts` METADATA FROM 'timestamp' COMMENT 'the ''origin'' timestamp',\n  `proctime` AS [PROCTIME()],\n  WATERMARK FOR `ts` AS [ts - INTERVAL '5' SECOND],\n  CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n)");
    }

    @Test
    void testResolvedSchemaString() {
        ResolvedSchema resolvedSchema = SchemaResolutionTest.resolveSchema(SCHEMA);
        Assertions.assertThat((String)resolvedSchema.toString()).isEqualTo("(\n  `id` INT NOT NULL COMMENT 'people id',\n  `counter` INT NOT NULL,\n  `payload` ROW<`name` STRING, `age` INT, `flag` BOOLEAN>,\n  `topic` STRING METADATA VIRTUAL COMMENT 'kafka topic',\n  `ts` TIMESTAMP(3) *ROWTIME* AS orig_ts - INTERVAL '60' MINUTE COMMENT 'rowtime',\n  `orig_ts` TIMESTAMP(3) METADATA FROM 'timestamp' COMMENT 'the ''origin'' timestamp',\n  `proctime` TIMESTAMP_LTZ(3) NOT NULL *PROCTIME* AS PROCTIME(),\n  WATERMARK FOR `ts`: TIMESTAMP(3) AS ts - INTERVAL '5' SECOND,\n  CONSTRAINT `primary_constraint` PRIMARY KEY (`id`) NOT ENFORCED\n)");
    }

    @Test
    void testGeneratedConstraintName() {
        Schema schema = Schema.newBuilder().column("a", (AbstractDataType)DataTypes.INT()).column("b", (AbstractDataType)DataTypes.STRING()).column("c", (AbstractDataType)DataTypes.STRING()).primaryKey(new String[]{"b", "a"}).build();
        Assertions.assertThat((String)((Schema.UnresolvedPrimaryKey)schema.getPrimaryKey().orElseThrow(IllegalStateException::new)).getConstraintName()).isEqualTo("PK_b_a");
    }

    @Test
    void testSinkRowDataType() {
        ResolvedSchema resolvedSchema = SchemaResolutionTest.resolveSchema(SCHEMA);
        DataType expectedDataType = (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)((DataType)DataTypes.INT().notNull())), DataTypes.FIELD((String)"counter", (DataType)((DataType)DataTypes.INT().notNull())), DataTypes.FIELD((String)"payload", (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"age", (DataType)DataTypes.INT()), DataTypes.FIELD((String)"flag", (DataType)DataTypes.BOOLEAN())})), DataTypes.FIELD((String)"orig_ts", (DataType)DataTypes.TIMESTAMP((int)3))}).notNull();
        Assertions.assertThat((Object)resolvedSchema.toSinkRowDataType()).isEqualTo((Object)expectedDataType);
    }

    @Test
    void testPhysicalRowDataType() {
        ResolvedSchema resolvedSchema1 = SchemaResolutionTest.resolveSchema(SCHEMA);
        DataType expectedDataType = (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)((DataType)DataTypes.INT().notNull())), DataTypes.FIELD((String)"counter", (DataType)((DataType)DataTypes.INT().notNull())), DataTypes.FIELD((String)"payload", (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"age", (DataType)DataTypes.INT()), DataTypes.FIELD((String)"flag", (DataType)DataTypes.BOOLEAN())}))}).notNull();
        DataType physicalDataType1 = resolvedSchema1.toPhysicalRowDataType();
        Assertions.assertThat((Object)physicalDataType1).isEqualTo((Object)expectedDataType);
        ResolvedSchema resolvedSchema2 = SchemaResolutionTest.resolveSchema(Schema.newBuilder().fromRowDataType(physicalDataType1).build());
        Assertions.assertThat((Object)resolvedSchema2.toPhysicalRowDataType()).isEqualTo((Object)physicalDataType1);
    }

    @Test
    void testSourceRowDataType() {
        ResolvedSchema resolvedSchema = SchemaResolutionTest.resolveSchema(SCHEMA);
        DataType expectedDataType = (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"id", (DataType)((DataType)DataTypes.INT().notNull())), DataTypes.FIELD((String)"counter", (DataType)((DataType)DataTypes.INT().notNull())), DataTypes.FIELD((String)"payload", (DataType)DataTypes.ROW((DataTypes.Field[])new DataTypes.Field[]{DataTypes.FIELD((String)"name", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"age", (DataType)DataTypes.INT()), DataTypes.FIELD((String)"flag", (DataType)DataTypes.BOOLEAN())})), DataTypes.FIELD((String)"topic", (DataType)DataTypes.STRING()), DataTypes.FIELD((String)"ts", (DataType)DataTypes.TIMESTAMP((int)3)), DataTypes.FIELD((String)"orig_ts", (DataType)DataTypes.TIMESTAMP((int)3)), DataTypes.FIELD((String)"proctime", (DataType)((DataType)DataTypes.TIMESTAMP_LTZ((int)3).notNull()))}).notNull();
        DataType sourceRowDataType = resolvedSchema.toSourceRowDataType();
        Assertions.assertThat((Object)sourceRowDataType).isEqualTo((Object)expectedDataType);
        Assertions.assertThat((boolean)LogicalTypeChecks.isTimeAttribute((LogicalType)((DataType)sourceRowDataType.getChildren().get(4)).getLogicalType())).isFalse();
        Assertions.assertThat((boolean)LogicalTypeChecks.isTimeAttribute((LogicalType)((DataType)sourceRowDataType.getChildren().get(6)).getLogicalType())).isFalse();
    }

    private static void testError(Schema schema, String errorMessage) {
        SchemaResolutionTest.testError(schema, errorMessage, true);
    }

    private static void testError(Schema schema, String errorMessage, boolean isStreaming) {
        try {
            SchemaResolutionTest.resolveSchema(schema, isStreaming);
            Assertions.fail((String)("Error message expected: " + errorMessage));
        }
        catch (Throwable t) {
            Assertions.assertThat((Throwable)t).satisfies((Condition)HamcrestCondition.matching((Matcher)FlinkMatchers.containsMessage((String)errorMessage)));
        }
    }

    private static ResolvedSchema resolveSchema(Schema schema) {
        return SchemaResolutionTest.resolveSchema(schema, true);
    }

    private static ResolvedSchema resolveSchema(Schema schema, boolean isStreamingMode) {
        DefaultSchemaResolver resolver = new DefaultSchemaResolver(isStreamingMode, (DataTypeFactory)new DataTypeFactoryMock(), ExpressionResolverMocks.forSqlExpression(SchemaResolutionTest::resolveSqlExpression));
        return resolver.resolve(schema);
    }

    private static ResolvedExpression resolveSqlExpression(String sqlExpression, RowType inputRowType, @Nullable LogicalType outputType) {
        switch (sqlExpression) {
            case "orig_ts - INTERVAL '60' MINUTE": {
                Assertions.assertThat((Object)SchemaResolutionTest.getType(inputRowType, "orig_ts")).isEqualTo((Object)DataTypes.TIMESTAMP((int)3).getLogicalType());
                return COMPUTED_COLUMN_RESOLVED;
            }
            case "ts_ltz - INTERVAL '60' MINUTE": {
                Assertions.assertThat((Object)SchemaResolutionTest.getType(inputRowType, "ts_ltz")).isEqualTo((Object)DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType());
                return COMPUTED_COLUMN_RESOLVED_WITH_TS_LTZ;
            }
            case "ts - INTERVAL '5' SECOND": {
                Assertions.assertThat((Object)SchemaResolutionTest.getType(inputRowType, "ts")).isEqualTo((Object)DataTypes.TIMESTAMP((int)3).getLogicalType());
                return WATERMARK_RESOLVED;
            }
            case "ts1 - INTERVAL '5' SECOND": {
                Assertions.assertThat((Object)SchemaResolutionTest.getType(inputRowType, "ts1")).isEqualTo((Object)DataTypes.TIMESTAMP_LTZ((int)3).getLogicalType());
                return WATERMARK_RESOLVED_WITH_TS_LTZ;
            }
            case "PROCTIME()": {
                return PROCTIME_RESOLVED;
            }
            case "CAST(ts AS TIMESTAMP_LTZ(3)) - INTERVAL '5' SECOND": {
                return INVALID_WATERMARK_RESOLVED;
            }
        }
        throw new UnsupportedOperationException("Unknown SQL expression.");
    }

    private static LogicalType getType(ResolvedSchema resolvedSchema, String column) {
        return ((Column)resolvedSchema.getColumn(column).orElseThrow(IllegalStateException::new)).getDataType().getLogicalType();
    }

    private static LogicalType getType(RowType inputRowType, String field) {
        int pos = inputRowType.getFieldIndex(field);
        return inputRowType.getTypeAt(pos);
    }
}

