/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.io;

import java.time.Duration;
import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.KeyValue;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Preconditions;

/**
 * Filters out {@link KeyValue} records whose time field is older than the configured
 * record-level expire time.
 *
 * <p>All time arithmetic is carried out in {@code long} epoch seconds so that large retention
 * durations or far-future timestamps cannot wrap around a 32-bit integer and make live records
 * look expired.
 */
public class RecordLevelExpire {
    /**
     * BIGINT values at or above this threshold are divided by 1000 before use (i.e. they are
     * handled as epoch milliseconds); smaller values are used as-is.
     */
    private static final long MILLIS_THRESHOLD = 1000000000000L;

    /** Retention period in seconds. */
    private final long expireTime;

    /** Extracts the record time, in epoch seconds, from a value row. */
    private final Function<InternalRow, Long> fieldGetter;

    /**
     * Builds a {@code RecordLevelExpire} from the table options.
     *
     * @param options table options providing the expire time and the time field name
     * @param rowType row type of the value rows; used to locate the time field
     * @return the expire filter, or {@code null} when no record-level expire time is configured
     * @throws IllegalArgumentException if the time field is not set, cannot be found in
     *     {@code rowType}, or has an unsupported type
     */
    @Nullable
    public static RecordLevelExpire create(CoreOptions options, RowType rowType) {
        Duration expireTime = options.recordLevelExpireTime();
        if (expireTime == null) {
            return null;
        }
        String timeFieldName = options.recordLevelTimeField();
        if (timeFieldName == null) {
            throw new IllegalArgumentException("You should set time field for record-level expire.");
        }
        int fieldIndex = rowType.getFieldIndex(timeFieldName);
        if (fieldIndex == -1) {
            throw new IllegalArgumentException(String.format("Can not find time field %s for record level expire.", timeFieldName));
        }
        DataType dataType = rowType.getField(timeFieldName).type();
        Function<InternalRow, Long> fieldGetter = RecordLevelExpire.createFieldGetter(dataType, fieldIndex);
        // Keep the full long value: narrowing to int would silently truncate long durations.
        return new RecordLevelExpire(expireTime.getSeconds(), fieldGetter);
    }

    private RecordLevelExpire(long expireTime, Function<InternalRow, Long> fieldGetter) {
        this.expireTime = expireTime;
        this.fieldGetter = fieldGetter;
    }

    /**
     * Wraps a reader factory so that every reader it creates drops expired records.
     *
     * @param readerFactory the factory to decorate
     * @return a factory whose readers only return records that have not expired
     */
    public FileReaderFactory<KeyValue> wrap(FileReaderFactory<KeyValue> readerFactory) {
        return file -> this.wrap(readerFactory.createRecordReader(file));
    }

    /**
     * Applies the expire filter to a single reader. The current time is sampled once, when the
     * reader is wrapped, and reused for every record of that reader.
     */
    private RecordReader<KeyValue> wrap(RecordReader<KeyValue> reader) {
        long currentTime = System.currentTimeMillis() / 1000L;
        // A record is kept while "now" has not passed (record time + retention).
        return reader.filter(kv -> currentTime <= RecordLevelExpire.saturatedAdd(this.fieldGetter.apply(kv.value()), this.expireTime));
    }

    /** Adds two longs, clamping to {@code Long.MIN_VALUE}/{@code Long.MAX_VALUE} on overflow. */
    private static long saturatedAdd(long a, long b) {
        long sum = a + b;
        // Overflow occurred iff both operands have a sign different from the sum's sign.
        if (((a ^ sum) & (b ^ sum)) < 0L) {
            return a < 0L ? Long.MIN_VALUE : Long.MAX_VALUE;
        }
        return sum;
    }

    /**
     * Creates the function that reads the time field at {@code fieldIndex} as epoch seconds.
     *
     * <p>Supported types: INT (used as-is), BIGINT (divided by 1000 when at or above
     * {@link #MILLIS_THRESHOLD}), and TIMESTAMP / TIMESTAMP WITH LOCAL TIME ZONE (millisecond
     * value divided by 1000). The returned function rejects rows whose time field is null.
     *
     * @throws IllegalArgumentException if {@code dataType} is not one of the supported types
     */
    private static Function<InternalRow, Long> createFieldGetter(DataType dataType, int fieldIndex) {
        final Function<InternalRow, Long> fieldGetter;
        if (dataType instanceof IntType) {
            fieldGetter = row -> (long) row.getInt(fieldIndex);
        } else if (dataType instanceof BigIntType) {
            fieldGetter = row -> {
                long value = row.getLong(fieldIndex);
                return value >= MILLIS_THRESHOLD ? value / 1000L : value;
            };
        } else if (dataType instanceof TimestampType || dataType instanceof LocalZonedTimestampType) {
            int precision = DataTypeChecks.getPrecision(dataType);
            fieldGetter = row -> row.getTimestamp(fieldIndex, precision).getMillisecond() / 1000L;
        } else {
            throw new IllegalArgumentException(String.format("The record level time field type should be one of INT, BIGINT, or TIMESTAMP, but field type is %s.", dataType));
        }
        return row -> {
            Preconditions.checkArgument(!row.isNullAt(fieldIndex), "Time field for record-level expire should not be null.");
            return fieldGetter.apply(row);
        };
    }
}

