package com.starrocks.connector.flink.row.source;

import com.starrocks.connector.flink.table.source.struct.ColumnRichInfo;
import com.starrocks.connector.flink.table.source.struct.SelectColumn;
import com.starrocks.shade.org.apache.arrow.memory.RootAllocator;
import com.starrocks.shade.org.apache.arrow.vector.FieldVector;
import com.starrocks.shade.org.apache.arrow.vector.VectorSchemaRoot;
import com.starrocks.shade.org.apache.arrow.vector.ipc.ArrowStreamReader;
import com.starrocks.shade.org.apache.arrow.vector.types.pojo.Schema;
import com.starrocks.thrift.TScanBatchResult;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.table.data.GenericRowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/starrocks/connector/flink/row/source/StarRocksSourceFlinkRows.class */
/**
 * Decodes one Arrow-encoded scan batch result into Flink {@link GenericRowData} rows and
 * exposes them through a simple {@link #hasNext()} / {@link #next()} cursor.
 *
 * <p>Usage: construct, call {@link #init(List)} once to materialize all rows, then drain with
 * {@code hasNext()} / {@code next()}. The Arrow reader and allocator are released when
 * {@code hasNext()} first reports exhaustion, or when {@code init} fails.
 *
 * <p>No synchronization is performed by this class.
 */
public class StarRocksSourceFlinkRows {
    private static final Logger LOG = LoggerFactory.getLogger(StarRocksSourceFlinkRows.class);

    /** Upper bound, in bytes, handed to the Arrow root allocator. */
    private static final long ALLOCATOR_LIMIT_BYTES = Integer.MAX_VALUE;

    /** Number of rows fully converted so far by {@link #init(List)}. */
    private int flinkRowsCount;
    private final ArrowStreamReader arrowStreamReader;
    private VectorSchemaRoot root;
    private final List<ColumnRichInfo> columnRichInfos;
    private final SelectColumn[] selectedColumns;
    private final List<GenericRowData> sourceFlinkRows = new ArrayList<>();
    private final RootAllocator rootAllocator = new RootAllocator(ALLOCATOR_LIMIT_BYTES);
    /** Index of the next row that {@link #next()} will return. */
    private int offsetOfBatchForRead = 0;
    /** Guards {@link #close()} so the Arrow resources are released at most once. */
    private boolean closed = false;

    /**
     * Creates a row holder over the Arrow stream bytes carried by {@code tScanBatchResult}.
     *
     * @param tScanBatchResult scan result whose {@code getRows()} bytes are read as an Arrow stream
     * @param list column descriptions, indexed by each selected column's index in the Flink table
     * @param selectColumnArr selected columns; its length is the arity of every produced row
     */
    public StarRocksSourceFlinkRows(TScanBatchResult tScanBatchResult, List<ColumnRichInfo> list, SelectColumn[] selectColumnArr) {
        this.columnRichInfos = list;
        this.selectedColumns = selectColumnArr;
        this.arrowStreamReader = new ArrowStreamReader(new ByteArrayInputStream(tScanBatchResult.getRows()), this.rootAllocator);
    }

    /**
     * Reads every Arrow batch from the stream and converts it column by column into rows.
     *
     * <p>If {@code list} is empty it is populated in place with one converter per schema field,
     * so a caller can pass the same list to later instances and reuse the converters.
     *
     * <p>On any failure the Arrow reader and allocator are released before the exception is
     * rethrown; a failure while releasing is attached as a suppressed exception.
     *
     * @param list converters, one per Arrow field vector, in field order; may be empty
     * @throws IOException if reading the Arrow stream fails
     * @throws RuntimeException if a value cannot be converted
     */
    public void init(List<ArrowFieldConverter> list) throws IOException {
        try {
            this.root = this.arrowStreamReader.getVectorSchemaRoot();
            initFieldConverters(list);
            while (this.arrowStreamReader.loadNextBatch()) {
                List<FieldVector> fieldVectors = this.root.getFieldVectors();
                int rowCount = this.root.getRowCount();
                if (fieldVectors.isEmpty() || rowCount == 0) {
                    continue;
                }
                // Pre-create the rows of this batch so they can be filled one column at a time.
                for (int row = 0; row < rowCount; row++) {
                    this.sourceFlinkRows.add(new GenericRowData(this.selectedColumns.length));
                }
                int batchStart = this.flinkRowsCount;
                for (int column = 0; column < fieldVectors.size(); column++) {
                    FieldVector fieldVector = fieldVectors.get(column);
                    ArrowFieldConverter converter = list.get(column);
                    for (int row = 0; row < rowCount; row++) {
                        try {
                            this.sourceFlinkRows.get(batchStart + row).setField(column, converter.convert(fieldVector, row));
                        } catch (Exception e) {
                            throw new RuntimeException("Failed to convert arrow data for field " + fieldVector.getField().getName(), e);
                        }
                    }
                }
                // Only count the batch once every column has been converted.
                this.flinkRowsCount += rowCount;
            }
        } catch (IOException | RuntimeException e) {
            // Nothing else releases the Arrow resources when initialization fails.
            closeAfterFailure(e);
            throw e;
        }
    }

    /** Fills {@code list} with one converter per schema field when the caller supplied none. */
    private void initFieldConverters(List<ArrowFieldConverter> list) {
        if (!list.isEmpty()) {
            return;
        }
        Schema schema = this.root.getSchema();
        for (int i = 0; i < schema.getFields().size(); i++) {
            list.add(ArrowFieldConverter.createConverter(this.columnRichInfos.get(this.selectedColumns[i].getColumnIndexInFlinkTable()).getDataType().getLogicalType(), schema.getFields().get(i)));
        }
    }

    /**
     * Reports whether an unread row remains. Once the rows are exhausted the Arrow reader and
     * allocator are released.
     *
     * @return {@code true} if {@link #next()} can return another row
     * @throws RuntimeException if releasing the Arrow resources fails
     */
    public boolean hasNext() {
        if (this.offsetOfBatchForRead < this.flinkRowsCount) {
            return true;
        }
        close();
        return false;
    }

    /**
     * Returns the next unread row and advances the cursor.
     *
     * @return the next row
     * @throws RuntimeException if no unread row remains
     */
    public GenericRowData next() {
        if (!hasNext()) {
            LOG.error("offset larger than flinksRowsCount");
            throw new RuntimeException("read offset larger than flinksRowsCount");
        }
        return this.sourceFlinkRows.get(this.offsetOfBatchForRead++);
    }

    /**
     * Returns the number of rows materialized by {@link #init(List)}; this is the total row
     * count, not the number of rows already consumed through {@link #next()}.
     */
    public int getReadRowCount() {
        return this.flinkRowsCount;
    }

    /** Releases resources after {@code cause}, recording any close failure as suppressed. */
    private void closeAfterFailure(Exception cause) {
        try {
            close();
        } catch (RuntimeException closeFailure) {
            cause.addSuppressed(closeFailure);
        }
    }

    /**
     * Closes the Arrow reader and then the allocator, at most once. The allocator is closed even
     * if closing the reader fails; the first failure is thrown and a later one is suppressed.
     */
    private void close() {
        if (this.closed) {
            return;
        }
        this.closed = true;
        RuntimeException failure = null;
        try {
            this.arrowStreamReader.close();
        } catch (IOException e) {
            failure = new RuntimeException("Failed to close StarRocksSourceFlinkRows:" + e.getMessage(), e);
        } catch (RuntimeException e) {
            failure = e;
        }
        try {
            this.rootAllocator.close();
        } catch (RuntimeException e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }
        if (failure != null) {
            LOG.error("Failed to close StarRocksSourceFlinkRows:" + failure.getMessage(), failure);
            throw failure;
        }
    }
}
