/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.fileindex.bsi;

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;
import org.apache.paimon.data.Decimal;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.fileindex.FileIndexReader;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.FileIndexWriter;
import org.apache.paimon.fileindex.FileIndexer;
import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeDefaultVisitor;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.SmallIntType;
import org.apache.paimon.types.TimeType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.TinyIntType;
import org.apache.paimon.utils.BitSliceIndexRoaringBitmap;
import org.apache.paimon.utils.RoaringBitmap32;

public class BitSliceIndexBitmapFileIndex
implements FileIndexer {
    public static final int VERSION_1 = 1;
    private final DataType dataType;

    /**
     * Creates a bit-slice index (BSI) file indexer for a column of the given type.
     *
     * @param dataType type of the indexed column; the writer and reader created by this indexer
     *     pass it to {@link #getValueMapper(DataType)}
     * @param options indexer options; not read by this constructor
     */
    public BitSliceIndexBitmapFileIndex(DataType dataType, Options options) {
        this.dataType = dataType;
    }

    /**
     * Creates a writer that collects column values and serializes them as a BSI index.
     *
     * @return a new writer bound to this indexer's data type
     */
    @Override
    public FileIndexWriter createWriter() {
        return new Writer(dataType);
    }

    /**
     * Opens a reader over a serialized BSI index.
     *
     * <p>Seeks {@code inputStream} to {@code start} and reads, in order: a version byte, the row
     * count as an int, a boolean flag followed (when {@code true}) by the bitmap of non-negative
     * values, and a boolean flag followed (when {@code true}) by the bitmap of negative values.
     * A missing bitmap is replaced by {@link BitSliceIndexRoaringBitmap#EMPTY}.
     *
     * @param inputStream stream containing the serialized index
     * @param start offset of the index inside {@code inputStream}
     * @param length length of the serialized index; not used by this implementation
     * @return a reader answering predicates from the deserialized bitmaps
     * @throws RuntimeException if the version byte is greater than {@link #VERSION_1}, or wrapping
     *     any checked exception raised while reading
     */
    @Override
    public FileIndexReader createReader(SeekableInputStream inputStream, int start, int length) {
        try {
            inputStream.seek(start);
            // NOTE(review): the wrapper is never closed, presumably because closing it would also
            // close the caller's inputStream - confirm the caller owns the stream.
            DataInputStream input = new DataInputStream(inputStream);
            byte version = input.readByte();
            if (version > VERSION_1) {
                throw new RuntimeException(String.format("read bsi index file fail, your plugin version is lower than %d", version));
            }
            int rowNumber = input.readInt();
            boolean hasPositive = input.readBoolean();
            BitSliceIndexRoaringBitmap positive = hasPositive ? BitSliceIndexRoaringBitmap.map(input) : BitSliceIndexRoaringBitmap.EMPTY;
            boolean hasNegative = input.readBoolean();
            BitSliceIndexRoaringBitmap negative = hasNegative ? BitSliceIndexRoaringBitmap.map(input) : BitSliceIndexRoaringBitmap.EMPTY;
            return new Reader(this.dataType, rowNumber, positive, negative);
        }
        catch (RuntimeException e) {
            // Already unchecked (e.g. the version check above): rethrow untouched so its message
            // is not hidden behind a second RuntimeException wrapper.
            throw e;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the function that converts a column value of {@code dataType} into the {@code Long}
     * stored in the bit-slice index.
     *
     * <p>Every returned function maps {@code null} to {@code null}. Non-null values are cast to the
     * Java class used below for each type and widened to {@code long}:
     *
     * <ul>
     *   <li>DECIMAL: {@code Decimal.toUnscaledLong()}
     *   <li>TINYINT / SMALLINT / INT / BIGINT: {@code Byte} / {@code Short} / {@code Integer} /
     *       {@code Long}
     *   <li>DATE and TIME: {@code Integer}
     *   <li>TIMESTAMP and TIMESTAMP WITH LOCAL TIME ZONE: {@code Timestamp.getMillisecond()} when
     *       the precision is at most 3, otherwise {@code Timestamp.toMicros()}
     * </ul>
     *
     * @param dataType type of the indexed column
     * @return the value-to-long mapper for {@code dataType}
     * @throws UnsupportedOperationException if {@code dataType} is none of the types listed above
     */
    public static Function<Object, Long> getValueMapper(DataType dataType) {
        return dataType.accept(new DataTypeDefaultVisitor<Function<Object, Long>>() {

            @Override
            public Function<Object, Long> visit(DecimalType decimalType) {
                return nullSafe(raw -> ((Decimal) raw).toUnscaledLong());
            }

            @Override
            public Function<Object, Long> visit(TinyIntType tinyIntType) {
                return nullSafe(raw -> ((Byte) raw).longValue());
            }

            @Override
            public Function<Object, Long> visit(SmallIntType smallIntType) {
                return nullSafe(raw -> ((Short) raw).longValue());
            }

            @Override
            public Function<Object, Long> visit(IntType intType) {
                return nullSafe(raw -> ((Integer) raw).longValue());
            }

            @Override
            public Function<Object, Long> visit(BigIntType bigIntType) {
                return nullSafe(raw -> (Long) raw);
            }

            @Override
            public Function<Object, Long> visit(DateType dateType) {
                return nullSafe(raw -> ((Integer) raw).longValue());
            }

            @Override
            public Function<Object, Long> visit(TimeType timeType) {
                return nullSafe(raw -> ((Integer) raw).longValue());
            }

            @Override
            public Function<Object, Long> visit(TimestampType timestampType) {
                return timestampMapper(timestampType.getPrecision());
            }

            @Override
            public Function<Object, Long> visit(LocalZonedTimestampType localZonedTimestampType) {
                return timestampMapper(localZonedTimestampType.getPrecision());
            }

            @Override
            protected Function<Object, Long> defaultMethod(DataType dataType) {
                throw new UnsupportedOperationException(dataType.asSQLString() + " type is not support to build bsi index yet.");
            }

            /** Picks the millisecond or microsecond representation based on the precision. */
            private Function<Object, Long> timestampMapper(int precision) {
                if (precision <= 3) {
                    return nullSafe(raw -> ((Timestamp) raw).getMillisecond());
                }
                return nullSafe(raw -> ((Timestamp) raw).toMicros());
            }

            /** Wraps a converter so that a null input yields null instead of being converted. */
            private Function<Object, Long> nullSafe(Function<Object, Long> converter) {
                return raw -> raw == null ? null : converter.apply(raw);
            }
        });
    }

    /**
     * Evaluates predicates against a deserialized BSI index.
     *
     * <p>Non-negative values are looked up in {@code positive}; negative values are looked up in
     * {@code negative} by their absolute value, so comparisons on the negative side are mirrored
     * (a smaller value has a larger magnitude). Every result is wrapped in a
     * {@link BitmapIndexResult} built from a supplier, so the bitmap work happens inside that
     * supplier rather than in the visit method itself.
     *
     * <p>NOTE(review): the value mapper returns {@code null} for a {@code null} literal and the
     * unboxing in the comparison/equality visitors then throws {@link NullPointerException} -
     * confirm callers never pass null literals. {@code Math.abs(Long.MIN_VALUE)} is still negative,
     * so that literal is presumably unsupported as well.
     */
    private static class Reader extends FileIndexReader {
        private final int rowNumber;
        private final BitSliceIndexRoaringBitmap positive;
        private final BitSliceIndexRoaringBitmap negative;
        private final Function<Object, Long> valueMapper;

        /**
         * @param dataType type of the indexed column, used to pick the literal-to-long mapper
         * @param rowNumber number of rows covered by the index
         * @param positive bitmap of the non-negative values
         * @param negative bitmap of the negative values, stored by absolute value
         */
        public Reader(DataType dataType, int rowNumber, BitSliceIndexRoaringBitmap positive, BitSliceIndexRoaringBitmap negative) {
            this.rowNumber = rowNumber;
            this.positive = positive;
            this.negative = negative;
            this.valueMapper = BitSliceIndexBitmapFileIndex.getValueMapper(dataType);
        }

        /** Rows with a null value: the complement of the non-null rows within [0, rowNumber). */
        @Override
        public FileIndexResult visitIsNull(FieldRef fieldRef) {
            return new BitmapIndexResult(() -> {
                RoaringBitmap32 nullRows = nonNullRows();
                nullRows.flip(0L, rowNumber);
                return nullRows;
            });
        }

        /** Rows present in either the positive or the negative bitmap. */
        @Override
        public FileIndexResult visitIsNotNull(FieldRef fieldRef) {
            return new BitmapIndexResult(this::nonNullRows);
        }

        /** Equality is evaluated as an IN over a single literal. */
        @Override
        public FileIndexResult visitEqual(FieldRef fieldRef, Object literal) {
            return visitIn(fieldRef, Collections.singletonList(literal));
        }

        /** Inequality is evaluated as a NOT IN over a single literal. */
        @Override
        public FileIndexResult visitNotEqual(FieldRef fieldRef, Object literal) {
            return visitNotIn(fieldRef, Collections.singletonList(literal));
        }

        /** Rows whose value equals at least one of the literals. */
        @Override
        public FileIndexResult visitIn(FieldRef fieldRef, List<Object> literals) {
            return new BitmapIndexResult(() -> rowsEqualToAny(literals));
        }

        /** Non-null rows whose value equals none of the literals. */
        @Override
        public FileIndexResult visitNotIn(FieldRef fieldRef, List<Object> literals) {
            return new BitmapIndexResult(() -> {
                RoaringBitmap32 candidates = nonNullRows();
                RoaringBitmap32 excluded = rowsEqualToAny(literals);
                return RoaringBitmap32.andNot(candidates, excluded);
            });
        }

        /** Rows with a value strictly below the literal. */
        @Override
        public FileIndexResult visitLessThan(FieldRef fieldRef, Object literal) {
            return new BitmapIndexResult(() -> {
                long bound = valueMapper.apply(literal);
                // Negative bound: only negatives with a larger magnitude qualify.
                // Non-negative bound: smaller non-negatives plus every negative row qualify.
                return bound < 0L
                        ? negative.gt(Math.abs(bound))
                        : RoaringBitmap32.or(positive.lt(bound), negative.isNotNull());
            });
        }

        /** Rows with a value below or equal to the literal. */
        @Override
        public FileIndexResult visitLessOrEqual(FieldRef fieldRef, Object literal) {
            return new BitmapIndexResult(() -> {
                long bound = valueMapper.apply(literal);
                return bound < 0L
                        ? negative.gte(Math.abs(bound))
                        : RoaringBitmap32.or(positive.lte(bound), negative.isNotNull());
            });
        }

        /** Rows with a value strictly above the literal. */
        @Override
        public FileIndexResult visitGreaterThan(FieldRef fieldRef, Object literal) {
            return new BitmapIndexResult(() -> {
                long bound = valueMapper.apply(literal);
                // Negative bound: every non-negative row plus negatives with a smaller magnitude.
                return bound < 0L
                        ? RoaringBitmap32.or(positive.isNotNull(), negative.lt(Math.abs(bound)))
                        : positive.gt(bound);
            });
        }

        /** Rows with a value above or equal to the literal. */
        @Override
        public FileIndexResult visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
            return new BitmapIndexResult(() -> {
                long bound = valueMapper.apply(literal);
                return bound < 0L
                        ? RoaringBitmap32.or(positive.isNotNull(), negative.lte(Math.abs(bound)))
                        : positive.gte(bound);
            });
        }

        /** Union of the rows recorded in the positive and the negative bitmap. */
        private RoaringBitmap32 nonNullRows() {
            return RoaringBitmap32.or(positive.isNotNull(), negative.isNotNull());
        }

        /** Rows equal to {@code value}, looked up on the side matching its sign. */
        private RoaringBitmap32 rowsEqualTo(long value) {
            return value < 0L ? negative.eq(Math.abs(value)) : positive.eq(value);
        }

        /** Union of the rows equal to each literal; an empty bitmap when there are no literals. */
        private RoaringBitmap32 rowsEqualToAny(List<Object> literals) {
            RoaringBitmap32 matched = new RoaringBitmap32();
            for (Object literal : literals) {
                long value = valueMapper.apply(literal);
                matched = RoaringBitmap32.or(matched, rowsEqualTo(value));
            }
            return matched;
        }
    }

    /**
     * Buffers every written value (mapped to {@code Long}) and serializes them as a BSI index.
     *
     * <p>Non-negative values go to one bitmap; negative values go to a second bitmap under their
     * absolute value. The row id of a value is its position in the write order, nulls included.
     */
    private static class Writer extends FileIndexWriter {
        private final Function<Object, Long> valueMapper;
        private final StatsCollectList collector = new StatsCollectList();

        /**
         * @param dataType type of the indexed column, used to pick the value-to-long mapper
         */
        public Writer(DataType dataType) {
            this.valueMapper = BitSliceIndexBitmapFileIndex.getValueMapper(dataType);
        }

        /** Maps {@code key} to a {@code Long} (null stays null) and buffers it. */
        @Override
        public void write(Object key) {
            collector.add(valueMapper.apply(key));
        }

        /**
         * Serializes the buffered values.
         *
         * <p>Layout: version byte {@code 1}, row count as an int, a boolean flag followed (when
         * {@code true}) by the non-negative bitmap, then a boolean flag followed (when
         * {@code true}) by the negative bitmap.
         *
         * @return the serialized index
         * @throws RuntimeException wrapping any exception raised while building or writing
         */
        @Override
        public byte[] serializedBytes() {
            try {
                ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(buffer);
                BitSliceIndexRoaringBitmap.Appender positive = new BitSliceIndexRoaringBitmap.Appender(collector.positiveMin, collector.positiveMax);
                BitSliceIndexRoaringBitmap.Appender negative = new BitSliceIndexRoaringBitmap.Appender(collector.negativeMin, collector.negativeMax);

                int rowId = 0;
                for (Long value : collector.values) {
                    // Null rows are skipped but still consume a row id.
                    if (value != null) {
                        long v = value;
                        if (v < 0L) {
                            negative.append(rowId, Math.abs(v));
                        } else {
                            positive.append(rowId, v);
                        }
                    }
                    rowId++;
                }

                out.writeByte(1);
                out.writeInt(collector.values.size());
                writeIfNotEmpty(out, positive);
                writeIfNotEmpty(out, negative);
                return buffer.toByteArray();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Writes a presence flag and, when the appender holds data, its serialized bitmap. */
        private static void writeIfNotEmpty(DataOutputStream out, BitSliceIndexRoaringBitmap.Appender appender) throws Exception {
            boolean present = appender.isNotEmpty();
            out.writeBoolean(present);
            if (present) {
                appender.serialize(out);
            }
        }

        /**
         * Keeps every value in write order together with magnitude bounds per sign.
         *
         * <p>NOTE(review): the {@code *Min} fields start at 0 and are only updated through
         * {@code Math.min} against non-negative magnitudes, so they appear to stay 0 - confirm
         * whether that is intended.
         */
        private static class StatsCollectList {
            private long positiveMin;
            private long positiveMax;
            private long negativeMin;
            private long negativeMax;
            private final List<Long> values = new ArrayList<>();

            /** Buffers {@code value} (nulls included) and updates the bounds for non-null values. */
            public void add(Long value) {
                values.add(value);
                if (value == null) {
                    return;
                }
                collect(value);
            }

            /** Folds the magnitude of {@code value} into the bounds of the matching sign. */
            private void collect(long value) {
                if (value >= 0L) {
                    positiveMin = Math.min(positiveMin, value);
                    positiveMax = Math.max(positiveMax, value);
                    return;
                }
                long magnitude = Math.abs(value);
                negativeMin = Math.min(negativeMin, magnitude);
                negativeMax = Math.max(negativeMax, magnitude);
            }
        }
    }
}

