/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.sorter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.util.StreamRecordCollector;
import org.apache.flink.util.Collector;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.shuffle.RangeShuffle;
import org.apache.paimon.flink.sorter.SortOperator;
import org.apache.paimon.flink.sorter.TableSortInfo;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyProjectedRow;
import org.apache.paimon.utils.SerializableSupplier;

public class SortUtils {
    /**
     * Range-shuffles {@code inputStream} by a key computed from each row and returns a stream of
     * the original rows at the sink parallelism.
     *
     * <p>Pipeline built by this method:
     *
     * <ol>
     *   <li>Each input row is paired with its key ({@code Tuple2<KEY, RowData>}) using {@code
     *       shuffleKeyAbstract}, at the parallelism of {@code inputStream}.
     *   <li>The pairs are handed to {@link RangeShuffle#rangeShuffleByKey}.
     *   <li>If {@code tableSortInfo.isSortInCluster()} is true, the key is converted to an {@link
     *       InternalRow} and joined in front of the row, the joined rows go through a {@link
     *       SortOperator} registered as {@code "LOCAL SORT"}, and the key columns are projected
     *       away again before converting back to {@link RowData}.
     *   <li>Otherwise the key is simply dropped by a {@code "REMOVE KEY"} operator.
     * </ol>
     *
     * @param inputStream rows to process; its parallelism is used for the key-extraction step and
     *     its type information is reused for the returned stream
     * @param table provides the value row type and the {@link CoreOptions} read by this method
     * @param sortKeyType row type of the key columns; its fields are placed before the table's
     *     fields in the joined row type
     * @param keyTypeInformation Flink type information for {@code KEY}
     * @param shuffleKeyComparator comparator supplier forwarded to the range shuffle
     * @param shuffleKeyAbstract computes the {@code KEY} of a row; its {@code open()} is invoked
     *     when the key-extraction function is opened
     * @param convertor converts a {@code KEY} into the {@link InternalRow} joined in front of the
     *     row (only used when sorting in cluster)
     * @param tableSortInfo supplies sink parallelism, sample sizes, range number and the
     *     sort-in-cluster flag
     * @param <KEY> type of the shuffle key
     * @return the resulting stream of rows, with parallelism set to the sink parallelism
     */
    public static <KEY> DataStream<RowData> sortStreamByKey(
            DataStream<RowData> inputStream,
            FileStoreTable table,
            RowType sortKeyType,
            TypeInformation<KEY> keyTypeInformation,
            SerializableSupplier<Comparator<KEY>> shuffleKeyComparator,
            final KeyAbstract<KEY> shuffleKeyAbstract,
            ShuffleKeyConvertor<KEY> convertor,
            TableSortInfo tableSortInfo) {
        RowType valueRowType = table.rowType();
        CoreOptions options = table.coreOptions();
        int sinkParallelism = tableSortInfo.getSinkParallelism();
        int localSampleSize = tableSortInfo.getLocalSampleSize();
        int globalSampleSize = tableSortInfo.getGlobalSampleSize();
        int rangeNum = tableSortInfo.getRangeNumber();
        int keyFieldCount = sortKeyType.getFieldCount();
        int valueFieldCount = valueRowType.getFieldCount();

        // In the joined row the key columns come first, so value column i sits at
        // position i + keyFieldCount.
        final int[] valueProjectionMap = new int[valueFieldCount];
        for (int i = 0; i < valueFieldCount; ++i) {
            valueProjectionMap[i] = i + keyFieldCount;
        }

        // Row type of the joined row: key fields followed by the table's fields.
        List<DataField> fields = new ArrayList<>(keyFieldCount + valueFieldCount);
        fields.addAll(sortKeyType.getFields());
        fields.addAll(valueRowType.getFields());
        RowType longRowType = new RowType(fields);
        InternalTypeInfo<InternalRow> internalRowType = InternalTypeInfo.fromRowType(longRowType);

        // Pair every row with its key.
        DataStream<Tuple2<KEY, RowData>> inputWithKey =
                inputStream
                        .map(
                                new RichMapFunction<RowData, Tuple2<KEY, RowData>>() {

                                    // NOTE(review): neither open(...) overload is annotated
                                    // with @Override, presumably so the class works against
                                    // Flink versions that declare only one of them - confirm
                                    // before adding the annotation.
                                    public void open(OpenContext openContext) throws Exception {
                                        this.open(new Configuration());
                                    }

                                    public void open(Configuration parameters) throws Exception {
                                        shuffleKeyAbstract.open();
                                    }

                                    @Override
                                    public Tuple2<KEY, RowData> map(RowData value) {
                                        return Tuple2.of(shuffleKeyAbstract.apply(value), value);
                                    }
                                },
                                new TupleTypeInfo<>(keyTypeInformation, inputStream.getType()))
                        .setParallelism(inputStream.getParallelism());

        // Range shuffle by key.
        DataStream<Tuple2<KEY, RowData>> rangeShuffleResult =
                RangeShuffle.rangeShuffleByKey(
                        inputWithKey,
                        shuffleKeyComparator,
                        keyTypeInformation,
                        localSampleSize,
                        globalSampleSize,
                        rangeNum,
                        sinkParallelism,
                        valueRowType,
                        options.sortBySize());

        if (tableSortInfo.isSortInCluster()) {
            return rangeShuffleResult
                    // Put the converted key in front of the row.
                    .map(
                            a -> new JoinedRow(convertor.apply(a.f0), new FlinkRowWrapper(a.f1)),
                            internalRowType)
                    .setParallelism(sinkParallelism)
                    .transform(
                            "LOCAL SORT",
                            internalRowType,
                            new SortOperator(
                                    sortKeyType,
                                    longRowType,
                                    options.writeBufferSize(),
                                    options.pageSize(),
                                    options.localSortMaxNumFileHandles(),
                                    options.spillCompressOptions(),
                                    sinkParallelism,
                                    options.writeBufferSpillDiskSize(),
                                    options.sequenceFieldSortOrderIsAscending()))
                    .setParallelism(sinkParallelism)
                    // Project the key columns away again.
                    .map(
                            new RichMapFunction<InternalRow, InternalRow>() {
                                private transient KeyProjectedRow keyProjectedRow;

                                // NOTE(review): see the note on the open(...) overloads of
                                // the key-extraction function above; same pattern here.
                                public void open(OpenContext openContext) {
                                    this.open(new Configuration());
                                }

                                public void open(Configuration parameters) {
                                    this.keyProjectedRow = new KeyProjectedRow(valueProjectionMap);
                                }

                                @Override
                                public InternalRow map(InternalRow value) {
                                    return this.keyProjectedRow.replaceRow(value);
                                }
                            },
                            InternalTypeInfo.fromRowType(valueRowType))
                    .setParallelism(sinkParallelism)
                    .map(FlinkRowData::new, inputStream.getType())
                    .setParallelism(sinkParallelism);
        }

        // No local sort requested: just drop the key from each pair.
        return rangeShuffleResult
                .transform("REMOVE KEY", inputStream.getType(), new RemoveKeyOperator<>())
                .setParallelism(sinkParallelism);
    }

    private static class RemoveKeyOperator<T>
    extends TableStreamOperator<RowData>
    implements OneInputStreamOperator<Tuple2<T, RowData>, RowData> {
        private static final long serialVersionUID = 1L;
        private transient Collector<RowData> collector;

        private RemoveKeyOperator() {
        }

        public void open() throws Exception {
            super.open();
            this.collector = new StreamRecordCollector(this.output);
        }

        public void processElement(StreamRecord<Tuple2<T, RowData>> streamRecord) {
            this.collector.collect(((Tuple2)streamRecord.getValue()).f1);
        }
    }

    static interface ShuffleKeyConvertor<KEY>
    extends Function<KEY, InternalRow>,
    Serializable {
    }

    static interface KeyAbstract<KEY>
    extends Serializable {
        default public void open() {
        }

        public KEY apply(RowData var1);
    }
}

