/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.Random;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.Value;
import org.apache.flink.util.MutableObjectIterator;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class ExternalSortLargeRecordsITCase {
    private static final int MEMORY_SIZE = 0x4E00000;
    private final AbstractInvokable parentTask = new DummyInvokable();
    private IOManager ioManager;
    private MemoryManager memoryManager;
    private boolean testSuccess;

    ExternalSortLargeRecordsITCase() {
    }

    @BeforeEach
    void beforeTest() {
        this.memoryManager = MemoryManagerBuilder.newBuilder().setMemorySize(0x4E00000L).build();
        this.ioManager = new IOManagerAsync();
    }

    @AfterEach
    void afterTest() throws Exception {
        this.ioManager.close();
        if (this.memoryManager != null && this.testSuccess) {
            ((AbstractBooleanAssert)Assertions.assertThat((boolean)this.memoryManager.verifyEmpty()).withFailMessage("Memory leak: not all segments have been returned to the memory manager.", new Object[0])).isTrue();
            this.memoryManager.shutdown();
            this.memoryManager = null;
        }
    }

    @Test
    void testSortWithLongRecordsOnly() {
        try {
            int NUM_RECORDS = 10;
            TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, new ValueTypeInfo(SomeMaybeLongValue.class)};
            TupleTypeInfo typeInfo = new TupleTypeInfo(types);
            TupleSerializer serializer = typeInfo.createSerializer((SerializerConfig)new SerializerConfigImpl());
            TypeComparator comparator = typeInfo.createComparator(new int[]{0}, new boolean[]{false}, 0, new ExecutionConfig());
            MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>(){
                private final Random rnd = new Random(457821643089756298L);
                private int num = 0;

                public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
                    return this.next();
                }

                public Tuple2<Long, SomeMaybeLongValue> next() {
                    if (this.num++ < 10) {
                        long val = this.rnd.nextLong();
                        return new Tuple2((Object)val, (Object)new SomeMaybeLongValue((int)val));
                    }
                    return null;
                }
            };
            ExternalSorter sorter = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)serializer, (TypeComparator)comparator).maxNumFileHandles(128).sortBuffers(1).enableSpilling(this.ioManager, (double)0.7f).memoryFraction(1.0).objectReuse(false).largeRecords(true).build((MutableObjectIterator)source);
            MutableObjectIterator iterator = sorter.getIterator();
            Tuple2 val = (Tuple2)serializer.createInstance();
            long prevKey = Long.MAX_VALUE;
            for (int i = 0; i < 10; ++i) {
                val = (Tuple2)iterator.next((Object)val);
                Assertions.assertThat((Long)((Long)val.f0)).isLessThanOrEqualTo(prevKey);
                Assertions.assertThat((int)((Long)val.f0).intValue()).isEqualTo(((SomeMaybeLongValue)val.f1).val());
            }
            Assertions.assertThat((Object)((Tuple2)iterator.next((Object)val))).isNull();
            sorter.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @Test
    void testSortWithLongAndShortRecordsMixed() {
        try {
            int NUM_RECORDS = 1000000;
            int LARGE_REC_INTERVAL = 100000;
            TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, new ValueTypeInfo(SomeMaybeLongValue.class)};
            TupleTypeInfo typeInfo = new TupleTypeInfo(types);
            TupleSerializer serializer = typeInfo.createSerializer((SerializerConfig)new SerializerConfigImpl());
            TypeComparator comparator = typeInfo.createComparator(new int[]{0}, new boolean[]{false}, 0, new ExecutionConfig());
            MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>> source = new MutableObjectIterator<Tuple2<Long, SomeMaybeLongValue>>(){
                private final Random rnd = new Random(145610843608763871L);
                private int num = -1;

                public Tuple2<Long, SomeMaybeLongValue> next(Tuple2<Long, SomeMaybeLongValue> reuse) {
                    return this.next();
                }

                public Tuple2<Long, SomeMaybeLongValue> next() {
                    if (++this.num < 1000000) {
                        long val = this.rnd.nextLong();
                        return new Tuple2((Object)val, (Object)new SomeMaybeLongValue((int)val, this.num % 100000 == 0));
                    }
                    return null;
                }
            };
            ExternalSorter sorter = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)serializer, (TypeComparator)comparator).maxNumFileHandles(128).sortBuffers(1).enableSpilling(this.ioManager, (double)0.7f).memoryFraction(1.0).objectReuse(true).largeRecords(true).build((MutableObjectIterator)source);
            MutableObjectIterator iterator = sorter.getIterator();
            Tuple2 val = (Tuple2)serializer.createInstance();
            long prevKey = Long.MAX_VALUE;
            for (int i = 0; i < 1000000; ++i) {
                val = (Tuple2)iterator.next((Object)val);
                ((AbstractLongAssert)Assertions.assertThat((Long)((Long)val.f0)).withFailMessage("Sort order violated", new Object[0])).isLessThanOrEqualTo(prevKey);
                ((AbstractIntegerAssert)Assertions.assertThat((int)((Long)val.f0).intValue()).withFailMessage("Serialization of test data type incorrect", new Object[0])).isEqualTo(((SomeMaybeLongValue)val.f1).val());
            }
            Assertions.assertThat((Object)((Tuple2)iterator.next((Object)val))).isNull();
            sorter.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @Test
    void testSortWithShortMediumAndLargeRecords() {
        try {
            int NUM_RECORDS = 50000;
            int LARGE_REC_INTERVAL = 10000;
            int MEDIUM_REC_INTERVAL = 500;
            TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, new ValueTypeInfo(SmallOrMediumOrLargeValue.class)};
            TupleTypeInfo typeInfo = new TupleTypeInfo(types);
            TupleSerializer serializer = typeInfo.createSerializer((SerializerConfig)new SerializerConfigImpl());
            TypeComparator comparator = typeInfo.createComparator(new int[]{0}, new boolean[]{false}, 0, new ExecutionConfig());
            MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>(){
                private final Random rnd = new Random(1456108743687167086L);
                private int num = -1;

                public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
                    return this.next();
                }

                public Tuple2<Long, SmallOrMediumOrLargeValue> next() {
                    if (++this.num < 50000) {
                        int size = this.num % 10000 == 0 ? 0x6400000 : (this.num % 500 == 0 ? 0xC00000 : 0);
                        long val = this.rnd.nextLong();
                        return new Tuple2((Object)val, (Object)new SmallOrMediumOrLargeValue((int)val, size));
                    }
                    return null;
                }
            };
            ExternalSorter sorter = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)serializer, (TypeComparator)comparator).maxNumFileHandles(128).sortBuffers(1).enableSpilling(this.ioManager, (double)0.7f).memoryFraction(1.0).objectReuse(false).largeRecords(true).build((MutableObjectIterator)source);
            MutableObjectIterator iterator = sorter.getIterator();
            Tuple2 val = (Tuple2)serializer.createInstance();
            for (int i = 0; i < 50000; ++i) {
                val = (Tuple2)iterator.next((Object)val);
                Assertions.assertThat((int)((Long)val.f0).intValue()).isEqualTo(((SmallOrMediumOrLargeValue)val.f1).val());
            }
            Assertions.assertThat((Object)((Tuple2)iterator.next((Object)val))).isNull();
            sorter.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    @Test
    void testSortWithMediumRecordsOnly() {
        try {
            int NUM_RECORDS = 70;
            TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.LONG_TYPE_INFO, new ValueTypeInfo(SmallOrMediumOrLargeValue.class)};
            TupleTypeInfo typeInfo = new TupleTypeInfo(types);
            TupleSerializer serializer = typeInfo.createSerializer((SerializerConfig)new SerializerConfigImpl());
            TypeComparator comparator = typeInfo.createComparator(new int[]{0}, new boolean[]{false}, 0, new ExecutionConfig());
            MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>> source = new MutableObjectIterator<Tuple2<Long, SmallOrMediumOrLargeValue>>(){
                private final Random rnd = new Random(62360187263087678L);
                private int num = -1;

                public Tuple2<Long, SmallOrMediumOrLargeValue> next(Tuple2<Long, SmallOrMediumOrLargeValue> reuse) {
                    return this.next();
                }

                public Tuple2<Long, SmallOrMediumOrLargeValue> next() {
                    if (++this.num < 70) {
                        long val = this.rnd.nextLong();
                        return new Tuple2((Object)val, (Object)new SmallOrMediumOrLargeValue((int)val, 0xC00000));
                    }
                    return null;
                }
            };
            ExternalSorter sorter = ExternalSorter.newBuilder((MemoryManager)this.memoryManager, (AbstractInvokable)this.parentTask, (TypeSerializer)serializer, (TypeComparator)comparator).maxNumFileHandles(128).sortBuffers(1).enableSpilling(this.ioManager, (double)0.7f).memoryFraction(1.0).objectReuse(true).largeRecords(true).build((MutableObjectIterator)source);
            MutableObjectIterator iterator = sorter.getIterator();
            Tuple2 val = (Tuple2)serializer.createInstance();
            long prevKey = Long.MAX_VALUE;
            for (int i = 0; i < 70; ++i) {
                val = (Tuple2)iterator.next((Object)val);
                Assertions.assertThat((Long)((Long)val.f0)).isLessThanOrEqualTo(prevKey);
                Assertions.assertThat((int)((Long)val.f0).intValue()).isEqualTo(((SmallOrMediumOrLargeValue)val.f1).val());
            }
            Assertions.assertThat((Object)((Tuple2)iterator.next((Object)val))).isNull();
            sorter.close();
            this.testSuccess = true;
        }
        catch (Exception e) {
            e.printStackTrace();
            Assertions.fail((String)e.getMessage());
        }
    }

    public static final class SmallOrMediumOrLargeValue
    implements Value {
        private static final long serialVersionUID = 1L;
        public static final int SMALL_SIZE = 0;
        public static final int MEDIUM_SIZE = 0xC00000;
        public static final int LARGE_SIZE = 0x6400000;
        private int val;
        private int size;

        public SmallOrMediumOrLargeValue() {
            this.size = 0;
        }

        public SmallOrMediumOrLargeValue(int val) {
            this.val = val;
            this.size = 0;
        }

        public SmallOrMediumOrLargeValue(int val, int size) {
            this.val = val;
            this.size = size;
        }

        public int val() {
            return this.val;
        }

        public int getSize() {
            return this.size;
        }

        public void read(DataInputView in) throws IOException {
            this.val = in.readInt();
            this.size = in.readInt();
            for (int i = 0; i < this.size; ++i) {
                byte b = in.readByte();
                Assertions.assertThat((byte)b).isEqualTo((byte)i);
            }
        }

        public void write(DataOutputView out) throws IOException {
            out.writeInt(this.val);
            out.writeInt(this.size);
            for (int i = 0; i < this.size; ++i) {
                out.write((int)((byte)i));
            }
        }

        public int hashCode() {
            return this.val;
        }

        public boolean equals(Object obj) {
            if (obj instanceof SmallOrMediumOrLargeValue) {
                SmallOrMediumOrLargeValue other = (SmallOrMediumOrLargeValue)obj;
                return other.val == this.val && other.size == this.size;
            }
            return false;
        }

        public String toString() {
            return String.format("Value %d (%d bytes)", this.val, this.size);
        }
    }

    public static final class SomeMaybeLongValue
    implements Value {
        private static final long serialVersionUID = 1L;
        private static final byte[] BUFFER = new byte[0x6400000];
        private int val;
        private boolean isLong;

        public SomeMaybeLongValue() {
            this.isLong = true;
        }

        public SomeMaybeLongValue(int val) {
            this.val = val;
            this.isLong = true;
        }

        public SomeMaybeLongValue(int val, boolean isLong) {
            this.val = val;
            this.isLong = isLong;
        }

        public int val() {
            return this.val;
        }

        public boolean isLong() {
            return this.isLong;
        }

        public void read(DataInputView in) throws IOException {
            this.val = in.readInt();
            this.isLong = in.readBoolean();
            if (this.isLong) {
                for (int i = 0; i < BUFFER.length; ++i) {
                    byte b = in.readByte();
                    Assertions.assertThat((byte)b).isEqualTo(BUFFER[i]);
                }
            }
        }

        public void write(DataOutputView out) throws IOException {
            out.writeInt(this.val);
            out.writeBoolean(this.isLong);
            if (this.isLong) {
                out.write(BUFFER);
            }
        }

        public int hashCode() {
            return this.val;
        }

        public boolean equals(Object obj) {
            return obj instanceof SomeMaybeLongValue && ((SomeMaybeLongValue)obj).val == this.val;
        }

        public String toString() {
            return this.isLong ? "Large Value" : "Small Value";
        }

        static {
            for (int i = 0; i < BUFFER.length; ++i) {
                SomeMaybeLongValue.BUFFER[i] = (byte)i;
            }
        }
    }
}

