/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.format.parquet;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.parquet.ParquetInputFile;
import org.apache.paimon.format.parquet.ParquetSchemaConverter;
import org.apache.paimon.format.parquet.reader.ColumnReader;
import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
import org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil;
import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.shade.org.apache.parquet.ParquetReadOptions;
import org.apache.paimon.shade.org.apache.parquet.column.ColumnDescriptor;
import org.apache.paimon.shade.org.apache.parquet.column.page.PageReadStore;
import org.apache.paimon.shade.org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.paimon.shade.org.apache.parquet.schema.GroupType;
import org.apache.paimon.shade.org.apache.parquet.schema.MessageType;
import org.apache.paimon.shade.org.apache.parquet.schema.Type;
import org.apache.paimon.shade.org.apache.parquet.schema.Types;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FormatReaderFactory} that creates vectorized readers over Parquet files.
 *
 * <p>The factory is configured once with the projected field names/types and a batch size; each
 * call to {@link #createReader(FormatReaderFactory.Context)} opens one file and derives all
 * file-specific state (requested schema, indices of projected fields absent from the file) for
 * that reader only, so the factory itself holds no per-file mutable state.
 */
public class ParquetReaderFactory
implements FormatReaderFactory {
    private static final Logger LOG = LoggerFactory.getLogger(ParquetReaderFactory.class);
    // NOTE(review): the previous per-factory "unknownFieldsIndices" field was removed; if instances
    // are Java-serialized, dropping a field is a compatible change, so the UID is kept as is.
    private static final long serialVersionUID = 1L;
    private static final String ALLOCATION_SIZE = "parquet.read.allocation.size";
    /** Default for {@link #ALLOCATION_SIZE}: 8 MiB. */
    private static final int DEFAULT_ALLOCATION_SIZE = 8 * 1024 * 1024;
    private final Options conf;
    private final String[] projectedFields;
    private final DataType[] projectedTypes;
    private final int batchSize;

    /**
     * @param conf options consulted for the {@code parquet.*} read settings
     * @param projectedType row type whose field names and types define the projection
     * @param batchSize maximum number of rows placed in one returned batch
     */
    public ParquetReaderFactory(Options conf, RowType projectedType, int batchSize) {
        this.conf = conf;
        this.projectedFields = projectedType.getFieldNames().toArray(new String[0]);
        this.projectedTypes = projectedType.getFieldTypes().toArray(new DataType[0]);
        this.batchSize = batchSize;
    }

    /**
     * Opens the file described by {@code context} and returns a reader over the projected fields.
     *
     * <p>If anything fails after the underlying {@link ParquetFileReader} has been opened, the
     * file reader is closed before the failure is propagated.
     *
     * @param context supplies the file IO, the file path and the file size
     * @return a reader positioned at the first row of the file
     * @throws IOException if the file cannot be opened or a required column is missing
     * @throws UnsupportedOperationException if a projected column exists in the file with a
     *     different column descriptor
     */
    @Override
    public ParquetReader createReader(FormatReaderFactory.Context context) throws IOException {
        ParquetReadOptions.Builder builder = ParquetReadOptions.builder().withRange(0L, context.fileSize());
        setReadOptions(builder);
        ParquetFileReader reader = new ParquetFileReader(ParquetInputFile.fromPath(context.fileIO(), context.filePath()), builder.build());
        try {
            MessageType fileSchema = reader.getFileMetaData().getSchema();
            // Collected per file: sharing this set across readers would make a field that is
            // missing in one file be treated as missing in every later file as well.
            Set<Integer> unknownFieldsIndices = new HashSet<>();
            MessageType requestedSchema = clipParquetSchema(fileSchema, unknownFieldsIndices);
            reader.setRequestedSchema(requestedSchema);
            checkSchema(fileSchema, requestedSchema);
            Pool<ParquetReaderBatch> poolOfBatches = createPoolOfBatches(context.filePath(), requestedSchema);
            return new ParquetReader(reader, requestedSchema, reader.getRecordCount(), poolOfBatches, unknownFieldsIndices);
        } catch (IOException | RuntimeException e) {
            // Do not leak the open file when schema handling or batch allocation fails.
            try {
                reader.close();
            } catch (IOException | RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /** Copies the {@code parquet.*} read settings from {@link #conf} onto {@code builder}. */
    private void setReadOptions(ParquetReadOptions.Builder builder) {
        builder.useSignedStringMinMax(conf.getBoolean("parquet.strings.signed-min-max.enabled", false));
        builder.useDictionaryFilter(conf.getBoolean("parquet.filter.dictionary.enabled", true));
        builder.useStatsFilter(conf.getBoolean("parquet.filter.stats.enabled", true));
        builder.useRecordFilter(conf.getBoolean("parquet.filter.record-level.enabled", true));
        builder.useColumnIndexFilter(conf.getBoolean("parquet.filter.columnindex.enabled", true));
        builder.usePageChecksumVerification(conf.getBoolean("parquet.page.verify-checksum.enabled", false));
        builder.useBloomFilter(conf.getBoolean("parquet.filter.bloom.enabled", true));
        builder.withMaxAllocationInBytes(conf.getInteger(ALLOCATION_SIZE, DEFAULT_ALLOCATION_SIZE));
        String badRecordThresh = conf.getString("parquet.read.bad.record.threshold", null);
        if (badRecordThresh != null) {
            builder.set("parquet.read.bad.record.threshold", badRecordThresh);
        }
    }

    /**
     * Builds the schema to request from the file: one entry per projected field, in projection
     * order.
     *
     * <p>Fields present in {@code parquetSchema} reuse the file's type. Fields absent from it get
     * a type converted from the projected {@link DataType}, and their projection index is added to
     * {@code unknownFieldsIndices} so the reader can fill them with nulls.
     *
     * @param parquetSchema schema of the file being read
     * @param unknownFieldsIndices output set receiving the indices of projected fields that do not
     *     exist in {@code parquetSchema}
     * @return the message type named {@code paimon-parquet} containing the selected types
     */
    private MessageType clipParquetSchema(GroupType parquetSchema, Set<Integer> unknownFieldsIndices) {
        Type[] types = new Type[projectedFields.length];
        for (int i = 0; i < projectedFields.length; ++i) {
            String fieldName = projectedFields[i];
            if (parquetSchema.containsField(fieldName)) {
                types[i] = parquetSchema.getType(fieldName);
            } else {
                LOG.warn("{} does not exist in {}, will fill the field with null.", fieldName, parquetSchema);
                types[i] = ParquetSchemaConverter.convertToParquetType(fieldName, projectedTypes[i]);
                unknownFieldsIndices.add(i);
            }
        }
        return Types.buildMessage().addFields(types).named("paimon-parquet");
    }

    /**
     * Validates {@code requestedSchema} against the schema stored in the file.
     *
     * @throws RuntimeException if the number of requested fields differs from the projection
     * @throws UnsupportedOperationException if a requested column exists in the file with a
     *     different column descriptor
     * @throws IOException if a requested column is absent from the file and its maximum definition
     *     level is 0
     */
    private void checkSchema(MessageType fileSchema, MessageType requestedSchema) throws IOException, UnsupportedOperationException {
        if (projectedFields.length != requestedSchema.getFieldCount()) {
            throw new RuntimeException("The quantity of field type is incompatible with the request schema!");
        }
        List<String[]> paths = requestedSchema.getPaths();
        List<ColumnDescriptor> columns = requestedSchema.getColumns();
        for (int i = 0; i < requestedSchema.getFieldCount(); ++i) {
            String[] colPath = paths.get(i);
            if (fileSchema.containsPath(colPath)) {
                ColumnDescriptor fd = fileSchema.getColumnDescription(colPath);
                if (!fd.equals(columns.get(i))) {
                    throw new UnsupportedOperationException("Schema evolution not supported.");
                }
            } else if (columns.get(i).getMaxDefinitionLevel() == 0) {
                throw new IOException("Required column is missing in data file. Col: " + Arrays.toString(colPath));
            }
        }
    }

    /** Creates a pool of capacity one and seeds it with a single freshly allocated batch. */
    private Pool<ParquetReaderBatch> createPoolOfBatches(Path filePath, MessageType requestedSchema) {
        Pool<ParquetReaderBatch> pool = new Pool<>(1);
        pool.add(createReaderBatch(filePath, requestedSchema, pool.recycler()));
        return pool;
    }

    /** Allocates the writable vectors and the column batch view, and wraps them in a reader batch. */
    private ParquetReaderBatch createReaderBatch(Path filePath, MessageType requestedSchema, Pool.Recycler<ParquetReaderBatch> recycler) {
        WritableColumnVector[] writableVectors = createWritableVectors(requestedSchema);
        VectorizedColumnBatch columnarBatch = createVectorizedColumnBatch(writableVectors);
        return createReaderBatch(filePath, writableVectors, columnarBatch, recycler);
    }

    /** Allocates one writable vector of {@link #batchSize} per projected field. */
    private WritableColumnVector[] createWritableVectors(MessageType requestedSchema) {
        WritableColumnVector[] columns = new WritableColumnVector[projectedTypes.length];
        List<Type> types = requestedSchema.getFields();
        List<ColumnDescriptor> descriptors = requestedSchema.getColumns();
        for (int i = 0; i < projectedTypes.length; ++i) {
            columns[i] = ParquetSplitReaderUtil.createWritableColumnVector(batchSize, projectedTypes[i], types.get(i), descriptors, 0);
        }
        return columns;
    }

    /**
     * Builds the read-side batch over {@code writableVectors}: decimal and timestamp columns are
     * wrapped in their Parquet-specific vector adapters, every other column is exposed directly.
     */
    private VectorizedColumnBatch createVectorizedColumnBatch(WritableColumnVector[] writableVectors) {
        ColumnVector[] vectors = new ColumnVector[writableVectors.length];
        for (int i = 0; i < writableVectors.length; ++i) {
            switch (projectedTypes[i].getTypeRoot()) {
                case DECIMAL:
                    vectors[i] = new ParquetDecimalVector(writableVectors[i]);
                    break;
                case TIMESTAMP_WITHOUT_TIME_ZONE:
                case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                    vectors[i] = new ParquetTimestampVector(writableVectors[i]);
                    break;
                default:
                    vectors[i] = writableVectors[i];
                    break;
            }
        }
        return new VectorizedColumnBatch(vectors);
    }

    private ParquetReaderBatch createReaderBatch(Path filePath, WritableColumnVector[] writableVectors, VectorizedColumnBatch columnarBatch, Pool.Recycler<ParquetReaderBatch> recycler) {
        return new ParquetReaderBatch(filePath, writableVectors, columnarBatch, recycler);
    }

    /**
     * A reusable batch: the writable vectors the column readers fill, the column batch view over
     * them, and the row iterator handed to callers. Releasing the iterator recycles the batch.
     */
    private static class ParquetReaderBatch {
        private final WritableColumnVector[] writableVectors;
        protected final VectorizedColumnBatch columnarBatch;
        private final Pool.Recycler<ParquetReaderBatch> recycler;
        private final ColumnarRowIterator result;

        protected ParquetReaderBatch(Path filePath, WritableColumnVector[] writableVectors, VectorizedColumnBatch columnarBatch, Pool.Recycler<ParquetReaderBatch> recycler) {
            this.writableVectors = writableVectors;
            this.columnarBatch = columnarBatch;
            this.recycler = recycler;
            this.result = new ColumnarRowIterator(filePath, new ColumnarRow(columnarBatch), this::recycle);
        }

        /** Hands this batch back to the recycler it was created with. */
        public void recycle() {
            recycler.recycle(this);
        }

        /**
         * Resets the batch's single row iterator with {@code rowNumber} and returns it.
         *
         * @param rowNumber value passed to the iterator's reset; callers pass the file position of
         *     the first row of the batch
         */
        public RecordReader.RecordIterator<InternalRow> convertAndGetIterator(long rowNumber) {
            result.reset(rowNumber);
            return result;
        }
    }

    /** Reads one Parquet file row group by row group, returning rows in batches. */
    private class ParquetReader
    implements RecordReader<InternalRow> {
        private ParquetFileReader reader;
        private final MessageType requestedSchema;
        private final long totalRowCount;
        private final Pool<ParquetReaderBatch> pool;
        /** Projection indices that have no column in this file and are filled with nulls. */
        private final Set<Integer> unknownFieldsIndices;
        /** Rows handed out so far. */
        private long rowsReturned;
        /** Rows contained in all row groups loaded so far. */
        private long totalCountLoadedSoFar;
        /** Position of the next row to be returned. */
        private long currentRowPosition;
        /** Readers for the current row group; {@code null} entries mark unknown fields. */
        private ColumnReader[] columnReaders;

        private ParquetReader(ParquetFileReader reader, MessageType requestedSchema, long totalRowCount, Pool<ParquetReaderBatch> pool, Set<Integer> unknownFieldsIndices) {
            this.reader = reader;
            this.requestedSchema = requestedSchema;
            this.totalRowCount = totalRowCount;
            this.pool = pool;
            this.unknownFieldsIndices = unknownFieldsIndices;
            this.rowsReturned = 0L;
            this.totalCountLoadedSoFar = 0L;
            this.currentRowPosition = 0L;
        }

        /**
         * Reads the next batch of rows.
         *
         * @return an iterator over the batch, or {@code null} once all rows have been returned
         * @throws IOException if reading fails or the wait for a free batch is interrupted
         */
        @Override
        @Nullable
        public RecordReader.RecordIterator<InternalRow> readBatch() throws IOException {
            ParquetReaderBatch batch = getCachedEntry();
            long rowNumber = currentRowPosition;
            boolean hasRows;
            try {
                hasRows = nextBatch(batch);
            } catch (IOException | RuntimeException e) {
                // Return the batch to the pool; otherwise the only pooled entry is lost and a
                // later readBatch() would have nothing to poll.
                batch.recycle();
                throw e;
            }
            if (!hasRows) {
                batch.recycle();
                return null;
            }
            return batch.convertAndGetIterator(rowNumber);
        }

        /**
         * Resets {@code batch} and fills it with up to {@code batchSize} rows from the current row
         * group, loading the next row group first when the current one is exhausted.
         *
         * @return {@code false} if every row of the file has already been returned
         */
        private boolean nextBatch(ParquetReaderBatch batch) throws IOException {
            for (WritableColumnVector v : batch.writableVectors) {
                v.reset();
            }
            batch.columnarBatch.setNumRows(0);
            if (rowsReturned >= totalRowCount) {
                return false;
            }
            if (rowsReturned == totalCountLoadedSoFar) {
                readNextRowGroup();
            }
            // A batch never spans row groups: cap it at what is left in the loaded one.
            int num = (int) Math.min((long) batchSize, totalCountLoadedSoFar - rowsReturned);
            for (int i = 0; i < columnReaders.length; ++i) {
                if (columnReaders[i] == null) {
                    batch.writableVectors[i].fillWithNulls();
                } else {
                    columnReaders[i].readToVector(num, batch.writableVectors[i]);
                }
            }
            rowsReturned += num;
            currentRowPosition += num;
            batch.columnarBatch.setNumRows(num);
            return true;
        }

        /**
         * Loads the next row group and creates a column reader for every projected field that
         * exists in the file.
         *
         * @throws IOException if the file has no further row group although rows are still expected
         */
        private void readNextRowGroup() throws IOException {
            PageReadStore pages = reader.readNextRowGroup();
            if (pages == null) {
                throw new IOException("expecting more rows but reached last block. Read " + rowsReturned + " out of " + totalRowCount);
            }
            List<Type> types = requestedSchema.getFields();
            List<ColumnDescriptor> descriptors = requestedSchema.getColumns();
            columnReaders = new ColumnReader[types.size()];
            for (int i = 0; i < types.size(); ++i) {
                if (unknownFieldsIndices.contains(i)) {
                    // No data for this field in the file; nextBatch() fills it with nulls.
                    continue;
                }
                columnReaders[i] = ParquetSplitReaderUtil.createColumnReader(projectedTypes[i], types.get(i), descriptors, pages, 0);
            }
            totalCountLoadedSoFar += pages.getRowCount();
        }

        /**
         * Takes a batch from the pool.
         *
         * @throws IOException if the thread is interrupted while polling; the interrupt flag is
         *     restored and the {@link InterruptedException} is attached as the cause
         */
        private ParquetReaderBatch getCachedEntry() throws IOException {
            try {
                return pool.pollEntry();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted", e);
            }
        }

        /** Closes the underlying file reader; calling it again is a no-op. */
        @Override
        public void close() throws IOException {
            if (reader != null) {
                reader.close();
                reader = null;
            }
        }
    }
}

