/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sorter;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.sorter.SortUtils;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.PrimaryKeyTableUtils;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;

/**
 * A {@link TableSorter} that sorts the input stream by a key row projected out of each record.
 *
 * <p>The key consists of the columns named in the inherited {@code orderColNames} (presumably the
 * sort columns handed to the parent constructor; confirm against {@link TableSorter}). The actual
 * sorting is delegated to {@link SortUtils#sortStreamByKey}.
 */
public class OrderSorter
extends TableSorter {
    /** Sort settings: supplies the sort columns at construction and is forwarded to {@link SortUtils} in {@link #sort()}. */
    private final TableSortInfo tableSortInfo;

    /**
     * Creates a sorter for the given input stream and table.
     *
     * @param batchTEnv execution environment, forwarded to the parent class
     * @param origin input stream of rows to sort
     * @param table table whose row type and schema define the key projection
     * @param tableSortInfo sort settings; its sort columns are passed to the parent class
     */
    public OrderSorter(StreamExecutionEnvironment batchTEnv, DataStream<RowData> origin, FileStoreTable table, TableSortInfo tableSortInfo) {
        super(batchTEnv, origin, table, tableSortInfo.getSortColumns());
        this.tableSortInfo = tableSortInfo;
    }

    /**
     * Sorts the input stream by the projected key columns.
     *
     * @return the stream produced by {@link SortUtils#sortStreamByKey}
     */
    @Override
    public DataStream<RowData> sort() {
        final RowType valueRowType = table.rowType();
        // Positions of the order columns within the table schema.
        final int[] keyProjectionMap = table.schema().projection(orderColNames);
        // Row type of the sort key: the projected columns, with the key-name prefix applied.
        RowType sortKeyType =
                PrimaryKeyTableUtils.addKeyNamePrefix(
                        org.apache.paimon.utils.Projection.of(keyProjectionMap).project(valueRowType));

        return SortUtils.sortStreamByKey(
                origin,
                table,
                sortKeyType,
                InternalTypeInfo.fromRowType(sortKeyType),
                new KeyComparatorSupplier(sortKeyType),
                newKeyExtractor(valueRowType, keyProjectionMap),
                // The key is already an InternalRow, so it is passed through unchanged.
                key -> key,
                tableSortInfo);
    }

    /**
     * Builds the function that extracts the sort key from each incoming row.
     *
     * @param valueRowType row type of the full records
     * @param keyProjectionMap positions of the key columns within {@code valueRowType}
     * @return a key extractor that projects the key columns out of a row
     */
    private SortUtils.KeyAbstract<InternalRow> newKeyExtractor(final RowType valueRowType, final int[] keyProjectionMap) {
        return new SortUtils.KeyAbstract<InternalRow>() {
            // Transient: not serialized with this object; built in open() instead.
            private transient Projection projection;

            @Override
            public void open() {
                projection = CodeGenUtils.newProjection(valueRowType, keyProjectionMap);
            }

            @Override
            public InternalRow apply(RowData value) {
                InternalRow projected = projection.apply(new FlinkRowWrapper(value));
                // Return a copy rather than the projection's own result object.
                // NOTE(review): presumably the projection reuses its output row - confirm.
                return projected.copy();
            }
        };
    }
}

