/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.source;

import javax.annotation.Nullable;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.operators.source.InputConversionOperator;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.FlinkRuntimeException;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.Test;

/**
 * Tests for {@link InputConversionOperator}.
 *
 * <p>Every test builds a fresh operator over a row type with a single {@code INT} field named
 * {@code f} and invokes {@code processElement} / {@code processWatermark} on it directly. The
 * operator is never opened and no output is attached to it in this class.
 */
public class InputConversionOperatorTest {

    /**
     * Feeds two records that the operator is expected to reject: a row with no fields (the
     * converter was built for a one-field row) and a {@link RowKind#DELETE} row.
     */
    @Test
    public void testInvalidRecords() {
        final InputConversionOperator<Row> operator = createOperator(false);

        // arity 0 row against a one-field row type
        Assertions.assertThatThrownBy(
                        () -> operator.processElement(new StreamRecord<>(Row.ofKind(RowKind.INSERT))))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                FlinkRuntimeException.class,
                                "Error during input conversion from external DataStream API to internal Table API data structures"));

        // non-INSERT row kind
        Assertions.assertThatThrownBy(
                        () ->
                                operator.processElement(
                                        new StreamRecord<>(Row.ofKind(RowKind.DELETE, 12))))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                FlinkRuntimeException.class,
                                "Conversion expects insert-only records"));
    }

    /**
     * Enables the third constructor flag and sends a {@link StreamRecord} that was created
     * without a timestamp; the operator is expected to complain about the missing timestamp.
     */
    @Test
    public void testInvalidEventTime() {
        final InputConversionOperator<Row> operator = createOperator(true);

        Assertions.assertThatThrownBy(
                        () ->
                                operator.processElement(
                                        new StreamRecord<>(Row.ofKind(RowKind.INSERT, 12))))
                .satisfies(
                        FlinkAssertions.anyCauseMatches(
                                FlinkRuntimeException.class,
                                "Could not find timestamp in DataStream API record."));
    }

    /**
     * Passes an ordinary watermark to the operator. The test has no explicit assertion: it
     * succeeds as long as {@code processWatermark} returns without throwing.
     */
    @Test
    public void testWatermarkSuppression() throws Exception {
        final InputConversionOperator<Row> operator = createOperator(false);

        // NOTE(review): presumably the watermark is dropped before reaching the (absent) output,
        // which is why this does not fail like testReceiveMaxWatermark — confirm against the
        // operator implementation.
        operator.processWatermark(new Watermark(1000L));
    }

    /**
     * Passes {@link Watermark#MAX_WATERMARK} to the operator and expects a {@link
     * NullPointerException}.
     */
    @Test(expected = NullPointerException.class)
    public void testReceiveMaxWatermark() throws Exception {
        final InputConversionOperator<Row> operator = createOperator(false);

        // NOTE(review): presumably the NPE stems from the operator trying to forward the
        // watermark to an output that was never set up in this test — confirm against the
        // operator implementation.
        operator.processWatermark(Watermark.MAX_WATERMARK);
    }

    /**
     * Builds the operator shared by all tests: a converter for {@code ROW<f INT>} plus the flag
     * combination {@code (false, <flag>, false, true)}.
     *
     * @param produceRowtimeMetadata value for the third constructor argument, the only flag that
     *     differs between tests. NOTE(review): the parameter name is assumed from the Flink
     *     constructor signature, which is not visible in this file — verify.
     * @return a new, unopened operator instance
     */
    private static InputConversionOperator<Row> createOperator(boolean produceRowtimeMetadata) {
        return new InputConversionOperator<>(
                createConverter(DataTypes.ROW(DataTypes.FIELD("f", DataTypes.INT()))),
                false,
                produceRowtimeMetadata,
                false,
                true);
    }

    /**
     * Wraps the internal converter for {@code dataType} in the connector-facing {@link
     * DynamicTableSource.DataStructureConverter} interface.
     *
     * @param dataType type for which the underlying converter is looked up
     * @return an adapter whose {@code toInternal} delegates to {@code toInternalOrNull} and whose
     *     {@code open} does nothing
     */
    private static DynamicTableSource.DataStructureConverter createConverter(DataType dataType) {
        final DataStructureConverter<Object, Object> converter =
                DataStructureConverters.getConverter(dataType);
        return new DynamicTableSource.DataStructureConverter() {

            @Override
            @Nullable
            public Object toInternal(@Nullable Object externalStructure) {
                return converter.toInternalOrNull(externalStructure);
            }

            @Override
            public void open(RuntimeConverter.Context context) {
                // intentionally empty: this adapter never opens the wrapped converter
            }
        };
    }
}

