/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.serialization;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UTFDataFormatException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.util.Random;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataInputDeserializer;
import org.apache.flink.util.StringUtils;

public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWritable>
implements RecordDeserializer<T> {
    private static final String BROKEN_SERIALIZATION_ERROR_MESSAGE = "Serializer consumed more bytes than the record had. This indicates broken serialization. If you are using custom serialization types (Value or Writable), check their serialization methods. If you are using a Kryo-serialized type, check the corresponding Kryo serializer.";
    private static final int THRESHOLD_FOR_SPILLING = 0x500000;
    private final NonSpanningWrapper nonSpanningWrapper;
    private final SpanningWrapper spanningWrapper;
    private Buffer currentBuffer;
    private AccumulatorRegistry.Reporter reporter;

    public SpillingAdaptiveSpanningRecordDeserializer() {
        String tempDirString = GlobalConfiguration.getString((String)"taskmanager.tmp.dirs", (String)ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH);
        String[] directories = tempDirString.split(",|" + File.pathSeparator);
        this.nonSpanningWrapper = new NonSpanningWrapper();
        this.spanningWrapper = new SpanningWrapper(directories);
    }

    @Override
    public void setNextBuffer(Buffer buffer) throws IOException {
        this.currentBuffer = buffer;
        MemorySegment segment = buffer.getMemorySegment();
        int numBytes = buffer.getSize();
        this.setNextMemorySegment(segment, numBytes);
    }

    @Override
    public Buffer getCurrentBuffer() {
        Buffer tmp = this.currentBuffer;
        this.currentBuffer = null;
        return tmp;
    }

    @Override
    public void setNextMemorySegment(MemorySegment segment, int numBytes) throws IOException {
        if (this.spanningWrapper.getNumGatheredBytes() > 0) {
            this.spanningWrapper.addNextChunkFromMemorySegment(segment, numBytes);
        } else {
            this.nonSpanningWrapper.initializeFromMemorySegment(segment, 0, numBytes);
        }
    }

    @Override
    public RecordDeserializer.DeserializationResult getNextRecord(T target) throws IOException {
        int nonSpanningRemaining = this.nonSpanningWrapper.remaining();
        if (nonSpanningRemaining >= 4) {
            int len = this.nonSpanningWrapper.readInt();
            if (this.reporter != null) {
                this.reporter.reportNumBytesIn(len);
            }
            if (len <= nonSpanningRemaining - 4) {
                try {
                    int remaining;
                    target.read((DataInputView)this.nonSpanningWrapper);
                    if (this.reporter != null) {
                        this.reporter.reportNumRecordsIn(1L);
                    }
                    if ((remaining = this.nonSpanningWrapper.remaining()) > 0) {
                        return RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
                    }
                    if (remaining == 0) {
                        return RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
                    }
                    throw new IndexOutOfBoundsException("Remaining = " + remaining);
                }
                catch (IndexOutOfBoundsException e) {
                    throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, e);
                }
            }
            this.spanningWrapper.initializeWithPartialRecord(this.nonSpanningWrapper, len);
            this.nonSpanningWrapper.clear();
            return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
        }
        if (nonSpanningRemaining > 0) {
            this.spanningWrapper.initializeWithPartialLength(this.nonSpanningWrapper);
            this.nonSpanningWrapper.clear();
            return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
        }
        if (this.spanningWrapper.hasFullRecord()) {
            target.read(this.spanningWrapper.getInputView());
            if (this.reporter != null) {
                this.reporter.reportNumRecordsIn(1L);
            }
            this.spanningWrapper.moveRemainderToNonSpanningDeserializer(this.nonSpanningWrapper);
            this.spanningWrapper.clear();
            return this.nonSpanningWrapper.remaining() == 0 ? RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER : RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
        }
        return RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
    }

    @Override
    public void clear() {
        this.nonSpanningWrapper.clear();
        this.spanningWrapper.clear();
    }

    @Override
    public boolean hasUnfinishedData() {
        return this.nonSpanningWrapper.remaining() > 0 || this.spanningWrapper.getNumGatheredBytes() > 0;
    }

    @Override
    public void setReporter(AccumulatorRegistry.Reporter reporter) {
        this.reporter = reporter;
        this.spanningWrapper.setReporter(reporter);
    }

    private static final class SpanningWrapper {
        private final byte[] initialBuffer = new byte[1024];
        private final String[] tempDirs;
        private final Random rnd = new Random();
        private final DataInputDeserializer serializationReadBuffer;
        private final ByteBuffer lengthBuffer;
        private FileChannel spillingChannel;
        private byte[] buffer;
        private int recordLength;
        private int accumulatedRecordBytes;
        private MemorySegment leftOverData;
        private int leftOverStart;
        private int leftOverLimit;
        private File spillFile;
        private InputViewDataInputStreamWrapper spillFileReader;
        private AccumulatorRegistry.Reporter reporter;

        public SpanningWrapper(String[] tempDirs) {
            this.tempDirs = tempDirs;
            this.lengthBuffer = ByteBuffer.allocate(4);
            this.lengthBuffer.order(ByteOrder.BIG_ENDIAN);
            this.recordLength = -1;
            this.serializationReadBuffer = new DataInputDeserializer();
            this.buffer = this.initialBuffer;
        }

        private void initializeWithPartialRecord(NonSpanningWrapper partial, int nextRecordLength) throws IOException {
            this.recordLength = nextRecordLength;
            int numBytesChunk = partial.remaining();
            if (nextRecordLength > 0x500000) {
                this.spillingChannel = this.createSpillingChannel();
                ByteBuffer toWrite = partial.segment.wrap(partial.position, numBytesChunk);
                this.spillingChannel.write(toWrite);
            } else {
                this.ensureBufferCapacity(numBytesChunk);
                partial.segment.get(partial.position, this.buffer, 0, numBytesChunk);
            }
            this.accumulatedRecordBytes = numBytesChunk;
        }

        private void initializeWithPartialLength(NonSpanningWrapper partial) throws IOException {
            partial.segment.get(partial.position, this.lengthBuffer, partial.remaining());
        }

        private void addNextChunkFromMemorySegment(MemorySegment segment, int numBytesInSegment) throws IOException {
            int segmentPosition = 0;
            if (this.lengthBuffer.position() > 0) {
                int toPut = Math.min(this.lengthBuffer.remaining(), numBytesInSegment);
                segment.get(0, this.lengthBuffer, toPut);
                if (this.lengthBuffer.hasRemaining()) {
                    return;
                }
                this.recordLength = this.lengthBuffer.getInt(0);
                if (this.reporter != null) {
                    this.reporter.reportNumBytesIn(this.recordLength);
                }
                this.lengthBuffer.clear();
                segmentPosition = toPut;
                if (this.recordLength > 0x500000) {
                    this.spillingChannel = this.createSpillingChannel();
                }
            }
            int needed = this.recordLength - this.accumulatedRecordBytes;
            int available = numBytesInSegment - segmentPosition;
            int toCopy = Math.min(needed, available);
            if (this.spillingChannel != null) {
                ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy);
                this.spillingChannel.write(toWrite);
            } else {
                this.ensureBufferCapacity(this.accumulatedRecordBytes + toCopy);
                segment.get(segmentPosition, this.buffer, this.accumulatedRecordBytes, toCopy);
            }
            this.accumulatedRecordBytes += toCopy;
            if (toCopy < available) {
                this.leftOverData = segment;
                this.leftOverStart = segmentPosition + toCopy;
                this.leftOverLimit = numBytesInSegment;
            }
            if (this.accumulatedRecordBytes == this.recordLength) {
                if (this.spillingChannel == null) {
                    this.serializationReadBuffer.setBuffer(this.buffer, 0, this.recordLength);
                } else {
                    this.spillingChannel.close();
                    DataInputStream inStream = new DataInputStream(new BufferedInputStream(new FileInputStream(this.spillFile), 0x200000));
                    this.spillFileReader = new InputViewDataInputStreamWrapper(inStream);
                }
            }
        }

        private void moveRemainderToNonSpanningDeserializer(NonSpanningWrapper deserializer) {
            deserializer.clear();
            if (this.leftOverData != null) {
                deserializer.initializeFromMemorySegment(this.leftOverData, this.leftOverStart, this.leftOverLimit);
            }
        }

        private boolean hasFullRecord() {
            return this.recordLength >= 0 && this.accumulatedRecordBytes >= this.recordLength;
        }

        private int getNumGatheredBytes() {
            return this.accumulatedRecordBytes + (this.recordLength >= 0 ? 4 : this.lengthBuffer.position());
        }

        public void clear() {
            this.buffer = this.initialBuffer;
            this.serializationReadBuffer.releaseArrays();
            this.recordLength = -1;
            this.lengthBuffer.clear();
            this.leftOverData = null;
            this.accumulatedRecordBytes = 0;
            if (this.spillingChannel != null) {
                try {
                    this.spillingChannel.close();
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
                this.spillingChannel = null;
            }
            if (this.spillFileReader != null) {
                try {
                    this.spillFileReader.close();
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
                this.spillFileReader = null;
            }
            if (this.spillFile != null) {
                this.spillFile.delete();
                this.spillFile = null;
            }
        }

        public DataInputView getInputView() {
            if (this.spillFileReader == null) {
                return this.serializationReadBuffer;
            }
            return this.spillFileReader;
        }

        private void ensureBufferCapacity(int minLength) {
            if (this.buffer.length < minLength) {
                byte[] newBuffer = new byte[Math.max(minLength, this.buffer.length * 2)];
                System.arraycopy(this.buffer, 0, newBuffer, 0, this.accumulatedRecordBytes);
                this.buffer = newBuffer;
            }
        }

        private FileChannel createSpillingChannel() throws IOException {
            if (this.spillFile != null) {
                throw new IllegalStateException("Spilling file already exists.");
            }
            String directory = this.tempDirs[this.rnd.nextInt(this.tempDirs.length)];
            this.spillFile = new File(directory, SpanningWrapper.randomString(this.rnd) + ".inputchannel");
            return new RandomAccessFile(this.spillFile, "rw").getChannel();
        }

        private static String randomString(Random random) {
            byte[] bytes = new byte[20];
            random.nextBytes(bytes);
            return StringUtils.byteToHexString((byte[])bytes);
        }

        public void setReporter(AccumulatorRegistry.Reporter reporter) {
            this.reporter = reporter;
        }
    }

    private static final class NonSpanningWrapper
    implements DataInputView {
        private MemorySegment segment;
        private int limit;
        private int position;
        private byte[] utfByteBuffer;
        private char[] utfCharBuffer;

        private NonSpanningWrapper() {
        }

        int remaining() {
            return this.limit - this.position;
        }

        void clear() {
            this.segment = null;
            this.limit = 0;
            this.position = 0;
        }

        void initializeFromMemorySegment(MemorySegment seg, int position, int leftOverLimit) {
            this.segment = seg;
            this.position = position;
            this.limit = leftOverLimit;
        }

        public final void readFully(byte[] b) throws IOException {
            this.readFully(b, 0, b.length);
        }

        public final void readFully(byte[] b, int off, int len) throws IOException {
            if (off < 0 || len < 0 || off + len > b.length) {
                throw new IndexOutOfBoundsException();
            }
            this.segment.get(this.position, b, off, len);
            this.position += len;
        }

        public final boolean readBoolean() throws IOException {
            return this.readByte() == 1;
        }

        public final byte readByte() throws IOException {
            return this.segment.get(this.position++);
        }

        public final int readUnsignedByte() throws IOException {
            return this.readByte() & 0xFF;
        }

        public final short readShort() throws IOException {
            short v = this.segment.getShortBigEndian(this.position);
            this.position += 2;
            return v;
        }

        public final int readUnsignedShort() throws IOException {
            int v = this.segment.getShortBigEndian(this.position) & 0xFFFF;
            this.position += 2;
            return v;
        }

        public final char readChar() throws IOException {
            char v = this.segment.getCharBigEndian(this.position);
            this.position += 2;
            return v;
        }

        public final int readInt() throws IOException {
            int v = this.segment.getIntBigEndian(this.position);
            this.position += 4;
            return v;
        }

        public final long readLong() throws IOException {
            long v = this.segment.getLongBigEndian(this.position);
            this.position += 8;
            return v;
        }

        public final float readFloat() throws IOException {
            return Float.intBitsToFloat(this.readInt());
        }

        public final double readDouble() throws IOException {
            return Double.longBitsToDouble(this.readLong());
        }

        public final String readLine() throws IOException {
            StringBuilder bld = new StringBuilder(32);
            try {
                int b;
                while ((b = this.readUnsignedByte()) != 10) {
                    if (b == 13) continue;
                    bld.append((char)b);
                }
            }
            catch (EOFException b) {
                // empty catch block
            }
            if (bld.length() == 0) {
                return null;
            }
            int len = bld.length();
            if (len > 0 && bld.charAt(len - 1) == '\r') {
                bld.setLength(len - 1);
            }
            return bld.toString();
        }

        public final String readUTF() throws IOException {
            int c;
            int count;
            char[] chararr;
            byte[] bytearr;
            int utflen = this.readUnsignedShort();
            if (this.utfByteBuffer == null || this.utfByteBuffer.length < utflen) {
                bytearr = new byte[utflen];
                this.utfByteBuffer = bytearr;
            } else {
                bytearr = this.utfByteBuffer;
            }
            if (this.utfCharBuffer == null || this.utfCharBuffer.length < utflen) {
                chararr = new char[utflen];
                this.utfCharBuffer = chararr;
            } else {
                chararr = this.utfCharBuffer;
            }
            int chararr_count = 0;
            this.readFully(bytearr, 0, utflen);
            for (count = 0; count < utflen && (c = bytearr[count] & 0xFF) <= 127; ++count) {
                chararr[chararr_count++] = (char)c;
            }
            block6: while (count < utflen) {
                c = bytearr[count] & 0xFF;
                switch (c >> 4) {
                    case 0: 
                    case 1: 
                    case 2: 
                    case 3: 
                    case 4: 
                    case 5: 
                    case 6: 
                    case 7: {
                        ++count;
                        chararr[chararr_count++] = (char)c;
                        continue block6;
                    }
                    case 12: 
                    case 13: {
                        if ((count += 2) > utflen) {
                            throw new UTFDataFormatException("malformed input: partial character at end");
                        }
                        byte char2 = bytearr[count - 1];
                        if ((char2 & 0xC0) != 128) {
                            throw new UTFDataFormatException("malformed input around byte " + count);
                        }
                        chararr[chararr_count++] = (char)((c & 0x1F) << 6 | char2 & 0x3F);
                        continue block6;
                    }
                    case 14: {
                        if ((count += 3) > utflen) {
                            throw new UTFDataFormatException("malformed input: partial character at end");
                        }
                        byte char2 = bytearr[count - 2];
                        byte char3 = bytearr[count - 1];
                        if ((char2 & 0xC0) != 128 || (char3 & 0xC0) != 128) {
                            throw new UTFDataFormatException("malformed input around byte " + (count - 1));
                        }
                        chararr[chararr_count++] = (char)((c & 0xF) << 12 | (char2 & 0x3F) << 6 | (char3 & 0x3F) << 0);
                        continue block6;
                    }
                }
                throw new UTFDataFormatException("malformed input around byte " + count);
            }
            return new String(chararr, 0, chararr_count);
        }

        public final int skipBytes(int n) throws IOException {
            if (n < 0) {
                throw new IllegalArgumentException();
            }
            int toSkip = Math.min(n, this.remaining());
            this.position += toSkip;
            return toSkip;
        }

        public void skipBytesToRead(int numBytes) throws IOException {
            int skippedBytes = this.skipBytes(numBytes);
            if (skippedBytes < numBytes) {
                throw new EOFException("Could not skip " + numBytes + " bytes.");
            }
        }

        public int read(byte[] b, int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException("Byte array b cannot be null.");
            }
            if (off < 0) {
                throw new IllegalArgumentException("The offset off cannot be negative.");
            }
            if (len < 0) {
                throw new IllegalArgumentException("The length len cannot be negative.");
            }
            int toRead = Math.min(len, this.remaining());
            this.segment.get(this.position, b, off, toRead);
            this.position += toRead;
            return toRead;
        }

        public int read(byte[] b) throws IOException {
            return this.read(b, 0, b.length);
        }
    }
}

