/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.AlgorithmOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
import org.apache.flink.runtime.memory.MemoryAllocationException;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.sort.CircularElement;
import org.apache.flink.runtime.operators.sort.CircularQueues;
import org.apache.flink.runtime.operators.sort.CombiningSpillingBehaviour;
import org.apache.flink.runtime.operators.sort.DefaultInMemorySorterFactory;
import org.apache.flink.runtime.operators.sort.DefaultSpillingBehaviour;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.ExternalSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.InMemorySorterFactory;
import org.apache.flink.runtime.operators.sort.LargeRecordHandler;
import org.apache.flink.runtime.operators.sort.PushSorter;
import org.apache.flink.runtime.operators.sort.ReadingThread;
import org.apache.flink.runtime.operators.sort.SorterInputGateway;
import org.apache.flink.runtime.operators.sort.SortingThread;
import org.apache.flink.runtime.operators.sort.SpillChannelManager;
import org.apache.flink.runtime.operators.sort.SpillingThread;
import org.apache.flink.runtime.operators.sort.StageRunner;
import org.apache.flink.util.MutableObjectIterator;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ExternalSorterBuilder<T> {
    // Logger; note that it is registered under ExternalSorter.class, not under this builder class.
    private static final Logger LOG = LoggerFactory.getLogger(ExternalSorter.class);
    // Threshold value the default in-memory sorter factory is created with (see the constructor).
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    // Lower bound on the number of write buffers; corresponds to the per-consumer buffer count used
    // in the merge-memory calculation of doBuild.
    private static final int MIN_NUM_WRITE_BUFFERS = 2;
    // Upper bound on the number of write buffers chosen by doBuild.
    private static final int MAX_NUM_WRITE_BUFFERS = 4;
    // Together with MIN_NUM_WRITE_BUFFERS this corresponds to the 12-page minimum enforced by doBuild.
    private static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
    // Allocates pages when no explicit segments were supplied; also handed on to the created components.
    private final MemoryManager memoryManager;
    // Owner passed to the memory manager on allocation and to the large record handler.
    private final TaskInvokable parentTask;
    private final TypeSerializer<T> serializer;
    private final TypeComparator<T> comparator;
    // Creates one in-memory sorter per sort buffer; replaceable via sorterFactory(...).
    private InMemorySorterFactory<T> inMemorySorterFactory;
    // Maximal merge fan-in; doBuild may lower this value when memory is scarce.
    private int maxNumFileHandles = AlgorithmOptions.SPILLING_MAX_FAN.defaultValue();
    // Forwarded to the spilling behaviour created in doBuild.
    private boolean objectReuseEnabled = false;
    // When true, doBuild reserves segments for a LargeRecordHandler.
    private boolean handleLargeRecords = false;
    // Fraction handed to MemoryManager.computeNumberOfPages when no segments were supplied.
    private double memoryFraction = 1.0;
    // Number of sort buffers; any value below 1 makes doBuild pick 1 or 2 based on sort memory size.
    private int numSortBuffers = -1;
    // Multiplied with the sort memory size (in bytes) to obtain the spilling threshold for the reading stage.
    private double startSpillingFraction = AlgorithmOptions.SORT_SPILLING_THRESHOLD.defaultValue().floatValue();
    // Only set by enableSpilling(...); stays null otherwise.
    private IOManager ioManager;
    // True until enableSpilling(...) is called; while true no write buffers are reserved.
    private boolean noSpillingMemory = true;
    // Optional combiner; when non-null doBuild uses a CombiningSpillingBehaviour.
    private GroupCombineFunction<T, T> combineFunction;
    // Configuration handed to the combining spilling behaviour together with combineFunction.
    private Configuration udfConfig;
    // Caller-supplied memory; null means doBuild allocates pages from the memory manager.
    private List<MemorySegment> memorySegments = null;
    // Passed to the LargeRecordHandler.
    private final ExecutionConfig executionConfig;

    /**
     * Creates a builder with default settings: no spilling, no large-record handling, no combiner,
     * and a {@link DefaultInMemorySorterFactory} for the in-memory sort buffers.
     *
     * @param memoryManager source of memory pages when no explicit segments are supplied
     * @param parentTask owner of the allocated memory
     * @param serializer serializer for the sorted records
     * @param comparator comparator defining the sort order
     * @param executionConfig execution config forwarded to the large record handler
     */
    ExternalSorterBuilder(MemoryManager memoryManager, TaskInvokable parentTask, TypeSerializer<T> serializer, TypeComparator<T> comparator, ExecutionConfig executionConfig) {
        this.memoryManager = memoryManager;
        this.parentTask = parentTask;
        this.executionConfig = executionConfig;
        this.serializer = serializer;
        this.comparator = comparator;
        // Use the named constant instead of repeating its value as a magic number.
        this.inMemorySorterFactory = new DefaultInMemorySorterFactory<T>(serializer, comparator, THRESHOLD_FOR_IN_PLACE_SORTING);
    }

    /**
     * Sets the maximal merge fan-in (number of file handles used at once while merging).
     *
     * @param maxNumFileHandles the fan-in, must be at least two
     * @return this builder
     * @throws IllegalArgumentException if the value is smaller than two
     */
    public ExternalSorterBuilder<T> maxNumFileHandles(int maxNumFileHandles) {
        if (maxNumFileHandles >= 2) {
            this.maxNumFileHandles = maxNumFileHandles;
            return this;
        }
        throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
    }

    /**
     * Sets the object-reuse flag that is forwarded to the spilling behaviour.
     *
     * @param enabled whether object reuse is enabled
     * @return this builder
     */
    public ExternalSorterBuilder<T> objectReuse(boolean enabled) {
        objectReuseEnabled = enabled;
        return this;
    }

    /**
     * Turns large-record handling on or off. When on, the built sorter reserves memory segments for
     * a {@link LargeRecordHandler}.
     *
     * @param enabled whether large records are handled
     * @return this builder
     */
    public ExternalSorterBuilder<T> largeRecords(boolean enabled) {
        handleLargeRecords = enabled;
        return this;
    }

    /**
     * Enables spilling using the given I/O manager.
     *
     * <p>The spilling flag is switched before the argument is validated, so a {@code null} argument
     * leaves the builder with spilling enabled but no I/O manager.
     *
     * @param ioManager the I/O manager, must not be null
     * @return this builder
     */
    public ExternalSorterBuilder<T> enableSpilling(IOManager ioManager) {
        noSpillingMemory = false;
        final IOManager checkedIoManager = Preconditions.checkNotNull(ioManager);
        this.ioManager = checkedIoManager;
        return this;
    }

    /**
     * Enables spilling and overrides the fraction of sort memory at which spilling starts.
     *
     * @param ioManager the I/O manager, must not be null
     * @param startSpillingFraction fraction of the sort memory (in bytes) used as spilling threshold
     * @return this builder
     */
    public ExternalSorterBuilder<T> enableSpilling(IOManager ioManager, double startSpillingFraction) {
        // The fraction is stored first; the delegate then validates and stores the I/O manager.
        this.startSpillingFraction = startSpillingFraction;
        enableSpilling(ioManager);
        return this;
    }

    /**
     * Sets the memory fraction handed to the memory manager when pages have to be allocated
     * (i.e. when no explicit segments were supplied via {@code memory(...)}).
     *
     * @param fraction the memory fraction
     * @return this builder
     */
    public ExternalSorterBuilder<T> memoryFraction(double fraction) {
        memoryFraction = fraction;
        return this;
    }

    /**
     * Supplies the memory segments to use instead of allocating pages from the memory manager.
     *
     * @param memorySegments the segments, must not be null
     * @return this builder
     */
    public ExternalSorterBuilder<T> memory(List<MemorySegment> memorySegments) {
        final List<MemorySegment> checkedSegments = Preconditions.checkNotNull(memorySegments);
        this.memorySegments = checkedSegments;
        return this;
    }

    /**
     * Sets the number of in-memory sort buffers. A value below one lets the build step choose the
     * number based on the amount of sort memory.
     *
     * @param numSortBuffers the number of sort buffers
     * @return this builder
     */
    public ExternalSorterBuilder<T> sortBuffers(int numSortBuffers) {
        final int requested = numSortBuffers;
        this.numSortBuffers = requested;
        return this;
    }

    /**
     * Registers a combiner together with its configuration. With a combiner set, the built sorter
     * uses a combining spilling behaviour.
     *
     * @param combineFunction the combiner, must not be null
     * @param udfConfig the configuration for the combiner, must not be null
     * @return this builder
     */
    public ExternalSorterBuilder<T> withCombiner(GroupCombineFunction<T, T> combineFunction, Configuration udfConfig) {
        // The combiner is validated and stored before the configuration is checked.
        final GroupCombineFunction<T, T> checkedCombiner = Preconditions.checkNotNull(combineFunction);
        this.combineFunction = checkedCombiner;
        final Configuration checkedConfig = Preconditions.checkNotNull(udfConfig);
        this.udfConfig = checkedConfig;
        return this;
    }

    /**
     * Registers a combiner with a fresh, empty {@link Configuration}.
     *
     * @param combineFunction the combiner, must not be null
     * @return this builder
     */
    public ExternalSorterBuilder<T> withCombiner(GroupCombineFunction<T, T> combineFunction) {
        final GroupCombineFunction<T, T> checkedCombiner = Preconditions.checkNotNull(combineFunction);
        this.combineFunction = checkedCombiner;
        udfConfig = new Configuration();
        return this;
    }

    /**
     * Replaces the factory used to create the in-memory sorters.
     *
     * @param sorterFactory the factory, must not be null
     * @return this builder
     */
    public ExternalSorterBuilder<T> sorterFactory(InMemorySorterFactory<T> sorterFactory) {
        final InMemorySorterFactory<T> checkedFactory = Preconditions.checkNotNull(sorterFactory);
        inMemorySorterFactory = checkedFactory;
        return this;
    }

    /**
     * Builds a sorter whose reading stage is a {@link ReadingThread} that consumes the given input.
     *
     * @param input the records to sort
     * @return the assembled sorter
     * @throws MemoryAllocationException if the memory pages cannot be allocated
     */
    public ExternalSorter<T> build(MutableObjectIterator<T> input) throws MemoryAllocationException {
        final ReadingStageFactory<T> pullingStageFactory =
                (exceptionHandler, dispatcher, largeRecordHandler, startSpillingBytes) -> {
                    // The instance is created lazily, at the moment the reading stage is set up.
                    final T instance = this.serializer.createInstance();
                    return new ReadingThread<T>(exceptionHandler, input, dispatcher, largeRecordHandler, instance, startSpillingBytes);
                };
        return doBuild(pullingStageFactory);
    }

    /**
     * Builds a push-based sorter: instead of a reading stage pulling from an iterator, records are
     * written through the returned {@link PushSorter}, which forwards them to a
     * {@link SorterInputGateway}.
     *
     * @return a push sorter backed by a freshly built {@link ExternalSorter}
     * @throws MemoryAllocationException if the memory pages cannot be allocated
     */
    public PushSorter<T> build() throws MemoryAllocationException {
        // Fully parameterized types avoid the unchecked conversions raw types would introduce.
        final PushFactory<T> pushFactory = new PushFactory<T>();
        final ExternalSorter<T> externalSorter = this.doBuild(pushFactory);
        return new PushSorter<T>() {
            // The gateway is created by the factory while doBuild runs, so it is available here.
            private final SorterInputGateway<T> recordProducer = pushFactory.sorterInputGateway;

            @Override
            public void writeRecord(T record) throws IOException, InterruptedException {
                this.recordProducer.writeRecord(record);
            }

            @Override
            public void finishReading() {
                this.recordProducer.finishReading();
            }

            @Override
            public MutableObjectIterator<T> getIterator() throws InterruptedException {
                return externalSorter.getIterator();
            }

            @Override
            public void close() {
                externalSorter.close();
            }
        };
    }

    /**
     * Assembles the sorter: obtains the memory, splits it into write buffers, large-record buffers
     * and sort buffers, and wires up the reading, sorting and spilling stages.
     *
     * <p>Side effects on this builder: {@code maxNumFileHandles} may be lowered when memory is
     * scarce, and {@code numSortBuffers} is fixed to 1 or 2 when it was below one. The memory list
     * (including a caller-supplied one) is modified in place, because the auxiliary buffers are
     * removed from its tail.
     *
     * @param readingStageFactory creates the reading stage; may return {@code null}
     * @return the assembled sorter
     * @throws MemoryAllocationException if the memory pages cannot be allocated
     * @throws IllegalArgumentException if fewer than
     *     {@code MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS} pages are available
     */
    private ExternalSorter<T> doBuild(ReadingStageFactory<T> readingStageFactory) throws MemoryAllocationException {
        final List<MemorySegment> memory;
        if (this.memorySegments != null) {
            memory = this.memorySegments;
        } else {
            memory = this.memoryManager.allocatePages(this.parentTask, this.memoryManager.computeNumberOfPages(this.memoryFraction));
        }

        final int numPagesTotal = memory.size();
        final int minPagesRequired = MIN_NUM_WRITE_BUFFERS + MIN_NUM_SORT_MEM_SEGMENTS;
        if (numPagesTotal < minPagesRequired) {
            // NOTE(review): pages allocated above are not released on this path.
            throw new IllegalArgumentException("Too little memory provided to sorter to perform task. Required are at least " + minPagesRequired + " pages. Current page size is " + this.memoryManager.getPageSize() + " bytes.");
        }

        // Decide how many pages are set aside for writing spilled data and for large records.
        final int numWriteBuffers;
        final int numLargeRecordBuffers;
        if (this.noSpillingMemory && !this.handleLargeRecords) {
            numWriteBuffers = 0;
            numLargeRecordBuffers = 0;
        } else {
            // Spilling counts as one consumer, large-record handling as two.
            final int numConsumers = (this.noSpillingMemory ? 0 : 1) + (this.handleLargeRecords ? 2 : 0);
            final int minBuffersForMerging = this.maxNumFileHandles + numConsumers * MIN_NUM_WRITE_BUFFERS;
            if (minBuffersForMerging > numPagesTotal) {
                // Not enough pages for a merge with full fan-in: use minimal buffers and reduce the fan-in.
                numWriteBuffers = this.noSpillingMemory ? 0 : MIN_NUM_WRITE_BUFFERS;
                numLargeRecordBuffers = this.handleLargeRecords ? 2 * MIN_NUM_WRITE_BUFFERS : 0;
                this.maxNumFileHandles = numPagesTotal - numConsumers * MIN_NUM_WRITE_BUFFERS;
                LOG.debug("Reducing maximal merge fan-in to {} due to limited memory availability during merge", this.maxNumFileHandles);
            } else {
                // Enough pages: spend roughly one percent per consumer, bounded by the min/max constants.
                final int fractionalAuxBuffers = numPagesTotal / (numConsumers * 100);
                if (fractionalAuxBuffers >= MAX_NUM_WRITE_BUFFERS) {
                    numWriteBuffers = this.noSpillingMemory ? 0 : MAX_NUM_WRITE_BUFFERS;
                    numLargeRecordBuffers = this.handleLargeRecords ? 2 * MAX_NUM_WRITE_BUFFERS : 0;
                } else {
                    numWriteBuffers = this.noSpillingMemory ? 0 : Math.max(MIN_NUM_WRITE_BUFFERS, fractionalAuxBuffers);
                    numLargeRecordBuffers = this.handleLargeRecords ? 2 * MIN_NUM_WRITE_BUFFERS : 0;
                }
            }
        }

        final int sortMemPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers;
        final long sortMemory = (long) sortMemPages * (long) this.memoryManager.getPageSize();
        if (this.numSortBuffers < 1) {
            // Above 100 MiB of sort memory two buffers are used, otherwise one.
            final long twoBufferThresholdBytes = 100L * 1024 * 1024;
            this.numSortBuffers = sortMemory > twoBufferThresholdBytes ? 2 : 1;
        }
        final int numSegmentsPerSortBuffer = sortMemPages / this.numSortBuffers;

        // Guarded so the message is only formatted when it is actually logged.
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Instantiating sorter with %d pages of sorting memory (=%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d buffers for writing sorted results and merging maximally %d streams at once. Using %d memory segments for large record spilling.", sortMemPages, sortMemory, this.numSortBuffers, numSegmentsPerSortBuffer, numWriteBuffers, this.maxNumFileHandles, numLargeRecordBuffers));
        }

        // Write buffers are taken from the tail first, then the large-record buffers.
        final List<MemorySegment> writeMemory = takeFromEnd(memory, numWriteBuffers);
        final LargeRecordHandler<T> largeRecordHandler;
        if (numLargeRecordBuffers > 0) {
            final List<MemorySegment> largeRecordMemory = takeFromEnd(memory, numLargeRecordBuffers);
            largeRecordHandler = new LargeRecordHandler<T>(this.serializer, this.comparator.duplicate(), this.ioManager, this.memoryManager, largeRecordMemory, this.parentTask, this.maxNumFileHandles, this.executionConfig);
        } else {
            largeRecordHandler = null;
        }

        // Distribute the remaining segments over the sort buffers and queue each buffer for reading.
        final CircularQueues<T> circularQueues = new CircularQueues<T>();
        final List<InMemorySorter<T>> inMemorySorters = new ArrayList<InMemorySorter<T>>(this.numSortBuffers);
        final Iterator<MemorySegment> segments = memory.iterator();
        for (int i = 0; i < this.numSortBuffers; ++i) {
            final List<MemorySegment> sortSegments = new ArrayList<MemorySegment>(numSegmentsPerSortBuffer);
            // The last buffer additionally receives whatever is left over from the integer division.
            final boolean lastBuffer = i == this.numSortBuffers - 1;
            int remaining = lastBuffer ? Integer.MAX_VALUE : numSegmentsPerSortBuffer;
            while (remaining > 0 && segments.hasNext()) {
                sortSegments.add(segments.next());
                --remaining;
            }
            final InMemorySorter<T> inMemorySorter = this.inMemorySorterFactory.create(sortSegments);
            inMemorySorters.add(inMemorySorter);
            circularQueues.send(StageRunner.SortStage.READ, new CircularElement<T>(i, inMemorySorter, sortSegments));
        }

        // Any stage failure completes the iterator future exceptionally.
        final ExceptionHandler<IOException> exceptionHandler = exception -> circularQueues.getIteratorFuture().completeExceptionally(exception);
        final SpillChannelManager spillChannelManager = new SpillChannelManager();
        final long startSpillingBytes = (long) (this.startSpillingFraction * (double) sortMemory);
        final StageRunner readingThread = readingStageFactory.createReadingStage(exceptionHandler, circularQueues, largeRecordHandler, startSpillingBytes);
        final StageRunner sortingStage = new SortingThread<T>(exceptionHandler, circularQueues);
        final SpillingThread.SpillingBehaviour<T> spillingBehaviour;
        if (this.combineFunction != null) {
            spillingBehaviour = new CombiningSpillingBehaviour<T>(this.combineFunction, this.serializer, this.comparator, this.objectReuseEnabled, this.udfConfig);
        } else {
            spillingBehaviour = new DefaultSpillingBehaviour<T>(this.objectReuseEnabled, this.serializer);
        }
        final StageRunner spillingStage = new SpillingThread<T>(exceptionHandler, circularQueues, this.memoryManager, this.ioManager, this.serializer, this.comparator, memory, writeMemory, this.maxNumFileHandles, spillChannelManager, largeRecordHandler, spillingBehaviour, MIN_NUM_WRITE_BUFFERS, MAX_NUM_WRITE_BUFFERS);
        return new ExternalSorter<T>(readingThread, sortingStage, spillingStage, memory, writeMemory, this.memoryManager, largeRecordHandler, spillChannelManager, inMemorySorters, circularQueues);
    }

    /** Removes {@code count} segments from the tail of {@code source} and returns them in removal order. */
    private static List<MemorySegment> takeFromEnd(List<MemorySegment> source, int count) {
        final List<MemorySegment> taken = new ArrayList<MemorySegment>(count);
        for (int i = 0; i < count; ++i) {
            taken.add(source.remove(source.size() - 1));
        }
        return taken;
    }

    /**
     * Reading stage factory for the push-based sorter. It does not create a stage runner; instead
     * it creates a {@link SorterInputGateway} and keeps it so the enclosing builder can hand it to
     * the push sorter.
     */
    private static final class PushFactory<E> implements ReadingStageFactory<E> {
        /** Set by {@link #createReadingStage}; null before that call. */
        private SorterInputGateway<E> sorterInputGateway;

        private PushFactory() {}

        /**
         * Creates and stores the input gateway.
         *
         * @return always {@code null}; the exception handler argument is not used
         */
        @Override
        public StageRunner createReadingStage(ExceptionHandler<IOException> exceptionHandler, StageRunner.StageMessageDispatcher<E> dispatcher, LargeRecordHandler<E> largeRecordHandler, long startSpillingBytes) {
            final SorterInputGateway<E> gateway = new SorterInputGateway<E>(dispatcher, largeRecordHandler, startSpillingBytes);
            sorterInputGateway = gateway;
            return null;
        }
    }

    @FunctionalInterface
    private static interface ReadingStageFactory<E> {
        @Nullable
        public StageRunner createReadingStage(ExceptionHandler<IOException> var1, StageRunner.StageMessageDispatcher<E> var2, LargeRecordHandler<E> var3, long var4);
    }
}

