/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.core.memory.MemorySegmentSource;
import org.apache.flink.runtime.io.disk.SimpleCollectingOutputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.memory.AbstractPagedOutputView;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.data.writer.BinaryRowWriter;
import org.apache.flink.table.runtime.generated.NormalizedKeyComputer;
import org.apache.flink.table.runtime.generated.RecordComparator;
import org.apache.flink.table.runtime.operators.sort.BufferedKVExternalSorter;
import org.apache.flink.table.runtime.operators.sort.IntNormalizedKeyComputer;
import org.apache.flink.table.runtime.operators.sort.IntRecordComparator;
import org.apache.flink.table.runtime.operators.sort.TestMemorySegmentPool;
import org.apache.flink.table.runtime.typeutils.BinaryRowDataSerializer;
import org.apache.flink.table.runtime.util.MemorySegmentPool;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class BufferedKVExternalSorterTest {
    private static final int PAGE_SIZE = 32768;
    private IOManager ioManager = new IOManagerAsync();
    private BinaryRowDataSerializer keySerializer;
    private BinaryRowDataSerializer valueSerializer;
    private NormalizedKeyComputer computer;
    private RecordComparator comparator;
    private int spillNumber;
    private int recordNumberPerFile;
    private Configuration conf = new Configuration();

    public BufferedKVExternalSorterTest(int spillNumber, int recordNumberPerFile, boolean spillCompress) {
        this.conf.setInteger(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES, 5);
        if (!spillCompress) {
            this.conf.setBoolean(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED, false);
        }
        this.spillNumber = spillNumber;
        this.recordNumberPerFile = recordNumberPerFile;
    }

    @Parameterized.Parameters
    public static List<Object[]> getDataSize() {
        ArrayList<Object[]> paras = new ArrayList<Object[]>();
        paras.add(new Object[]{3, 1000, true});
        paras.add(new Object[]{3, 1000, false});
        paras.add(new Object[]{10, 1000, true});
        paras.add(new Object[]{10, 1000, false});
        paras.add(new Object[]{10, 10000, true});
        paras.add(new Object[]{10, 10000, false});
        return paras;
    }

    @Before
    public void beforeTest() throws InstantiationException, IllegalAccessException {
        this.ioManager = new IOManagerAsync();
        this.keySerializer = new BinaryRowDataSerializer(2);
        this.valueSerializer = new BinaryRowDataSerializer(2);
        this.computer = IntNormalizedKeyComputer.INSTANCE;
        this.comparator = IntRecordComparator.INSTANCE;
    }

    @After
    public void afterTest() throws Exception {
        this.ioManager.close();
    }

    @Test
    public void test() throws Exception {
        BufferedKVExternalSorter sorter = new BufferedKVExternalSorter(this.ioManager, this.keySerializer, this.valueSerializer, this.computer, this.comparator, 32768, ((Integer)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SORT_MAX_NUM_FILE_HANDLES)).intValue(), ((Boolean)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_ENABLED)).booleanValue(), (int)((MemorySize)this.conf.get(ExecutionConfigOptions.TABLE_EXEC_SPILL_COMPRESSION_BLOCK_SIZE)).getBytes());
        TestMemorySegmentPool pool = new TestMemorySegmentPool(32768);
        ArrayList<Integer> expected = new ArrayList<Integer>();
        for (int i = 0; i < this.spillNumber; ++i) {
            ArrayList segments = new ArrayList();
            SimpleCollectingOutputView out = new SimpleCollectingOutputView(segments, (MemorySegmentSource)pool, 32768);
            this.writeKVToBuffer(this.keySerializer, this.valueSerializer, out, expected, this.recordNumberPerFile);
            sorter.sortAndSpill(segments, (long)this.recordNumberPerFile, (MemorySegmentPool)pool);
        }
        Collections.sort(expected);
        MutableObjectIterator iterator = sorter.getKVIterator();
        Tuple2 kv = new Tuple2((Object)this.keySerializer.createInstance(), (Object)this.valueSerializer.createInstance());
        int count = 0;
        while ((kv = (Tuple2)iterator.next((Object)kv)) != null) {
            Assertions.assertThat((int)((BinaryRowData)kv.f0).getInt(0)).isEqualTo(((Integer)expected.get(count)).intValue());
            Assertions.assertThat((int)((BinaryRowData)kv.f1).getInt(0)).isEqualTo((Integer)expected.get(count) * -3 + 177);
            ++count;
        }
        Assertions.assertThat((int)count).isEqualTo(expected.size());
        sorter.close();
    }

    private void writeKVToBuffer(BinaryRowDataSerializer keySerializer, BinaryRowDataSerializer valueSerializer, SimpleCollectingOutputView out, List<Integer> expecteds, int length) throws IOException {
        Random random = new Random();
        int stringLength = 30;
        for (int i = 0; i < length; ++i) {
            BinaryRowData key = BufferedKVExternalSorterTest.randomRow(random, stringLength);
            BinaryRowData val = key.copy();
            val.setInt(0, val.getInt(0) * -3 + 177);
            expecteds.add(key.getInt(0));
            keySerializer.serializeToPages(key, (AbstractPagedOutputView)out);
            valueSerializer.serializeToPages(val, (AbstractPagedOutputView)out);
        }
    }

    public static BinaryRowData randomRow(Random random, int stringLength) {
        BinaryRowData row = new BinaryRowData(2);
        BinaryRowWriter writer = new BinaryRowWriter(row);
        writer.writeInt(0, random.nextInt());
        writer.writeString(1, StringData.fromString((String)RandomStringUtils.random((int)stringLength)));
        writer.complete();
        return row;
    }
}

