/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.spi.AbstractInterruptibleChannel;
import java.util.Collection;
import java.util.List;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.IntValue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Coarse-grained throughput comparison, written as JUnit tests, of three ways of writing and
 * re-reading {@link #NUM_INTS_WRITTEN} integers to a temporary file:
 *
 * <ul>
 *   <li>through the {@link IOManager} block channel writer/reader and their views,</li>
 *   <li>through buffered {@code java.io} data streams,</li>
 *   <li>through a {@code java.nio} {@link FileChannel} with heap or direct buffers.</li>
 * </ul>
 *
 * <p>Each variant logs the elapsed wall-clock milliseconds of its write phase and read phase.
 * The timings come from {@link System#currentTimeMillis()} and are only meant for rough
 * comparison.
 */
public class IOManagerPerformanceBenchmark {
    private static final Logger LOG = LoggerFactory.getLogger(IOManagerPerformanceBenchmark.class);

    /** Buffer sizes (bytes) that are multiples of 4096. */
    private static final int[] SEGMENT_SIZES_ALIGNED = {4096, 16384, 524288};

    /** Buffer sizes (bytes) that are deliberately not multiples of 4096. */
    private static final int[] SEGMENT_SIZES_UNALIGNED = {3862, 16895, 500481};

    /** Numbers of memory segments handed to the IOManager channel views. */
    private static final int[] NUM_SEGMENTS = {1, 2, 4, 6};

    /** Size passed to the {@link MemoryManager} constructor (0x2000000 = 32 * 1024 * 1024). */
    private static final long MEMORY_SIZE = 0x2000000L;

    /** Number of int values written and read back by every benchmark variant. */
    private static final int NUM_INTS_WRITTEN = 100000000;

    /** Size of one serialized int in bytes (kept as a constant instead of Integer.BYTES). */
    private static final int INT_BYTES = 4;

    /** Owner object under which the benchmark allocates its memory pages. */
    private static final AbstractInvokable memoryOwner = new DummyInvokable();

    private MemoryManager memManager;
    private IOManager ioManager;

    /** Creates a fresh memory manager and I/O manager before each test. */
    @Before
    public void startup() {
        memManager = new MemoryManager(MEMORY_SIZE, 1);
        ioManager = new IOManagerAsync();
    }

    /**
     * Shuts down the I/O manager, verifies that it shut down properly and that all memory was
     * returned, and finally shuts down the memory manager.
     *
     * <p>The memory manager is shut down in a {@code finally} block so that a failing assertion
     * (or a failing I/O manager shutdown) does not leave it running.
     *
     * @throws Exception if shutting down the I/O manager fails
     */
    @After
    public void afterTest() throws Exception {
        try {
            if (ioManager != null) {
                ioManager.shutdown();
                Assert.assertTrue("IO Manager has not properly shut down.", ioManager.isProperlyShutDown());
            }
            if (memManager != null) {
                Assert.assertTrue(
                        "Not all memory was returned to the memory manager in the test.",
                        memManager.verifyEmpty());
            }
        } finally {
            if (memManager != null) {
                memManager.shutdown();
                memManager = null;
            }
        }
    }

    /**
     * Runs the IOManager write/read benchmark once for every entry of {@link #NUM_SEGMENTS}.
     *
     * @throws Exception if any run fails
     */
    @Test
    public void speedTestIOManager() throws Exception {
        LOG.info("Starting speed test with IO Manager...");
        for (int num : NUM_SEGMENTS) {
            testChannelWithSegments(num);
        }
    }

    /**
     * Writes {@link #NUM_INTS_WRITTEN} ints through a {@link ChannelWriterOutputView}, reads them
     * back through a {@link ChannelReaderInputView}, and logs both elapsed times.
     *
     * <p>The allocated pages are released in a {@code finally} block, after any still-open reader
     * or writer has been closed, so that a failure in the middle of the run does not leak memory
     * from the memory manager.
     *
     * @param numSegments number of pages to allocate and hand to the channel views
     * @throws Exception if allocation or any channel operation fails
     */
    // Raw types are kept on purpose: the element type of the page list and of the block
    // channels is not imported by this file, so it cannot be named here.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void testChannelWithSegments(int numSegments) throws Exception {
        final List memory = memManager.allocatePages(memoryOwner, numSegments);
        BlockChannelWriter writer = null;
        BlockChannelReader reader = null;
        try {
            final FileIOChannel.ID channel = ioManager.createChannel();

            // ---- write phase ----
            writer = ioManager.createBlockChannelWriter(channel);
            final ChannelWriterOutputView out =
                    new ChannelWriterOutputView(writer, memory, memManager.getPageSize());

            final long writeStart = System.currentTimeMillis();
            for (int value = NUM_INTS_WRITTEN - 1; value >= 0; value--) {
                out.writeInt(value);
            }
            out.close();
            final int numBlocks = out.getBlockCount();
            writer.close();
            writer = null;
            final long writeElapsed = System.currentTimeMillis() - writeStart;

            // ---- read phase ----
            reader = ioManager.createBlockChannelReader(channel);
            final ChannelReaderInputView in = new ChannelReaderInputView(reader, memory, numBlocks, false);

            final long readStart = System.currentTimeMillis();
            for (int i = 0; i < NUM_INTS_WRITTEN; i++) {
                in.readInt();
            }
            in.close();
            reader.close();
            final long readElapsed = System.currentTimeMillis() - readStart;

            reader.deleteChannel();
            reader = null;

            LOG.info("IOManager with {} mem segments: write {} msecs, read {} msecs.",
                    numSegments, writeElapsed, readElapsed);
        } finally {
            try {
                // At most one of the two is non-null here: the writer reference is cleared
                // before the reader is created.
                if (reader != null) {
                    reader.closeAndDelete();
                }
                if (writer != null) {
                    writer.closeAndDelete();
                }
            } finally {
                memManager.release(memory);
            }
        }
    }

    /**
     * Runs the {@code java.io} stream benchmark for all aligned and unaligned buffer sizes.
     *
     * @throws Exception if any run fails
     */
    @Test
    public void speedTestFileStream() throws Exception {
        LOG.info("Starting speed test with java io file stream and ALIGNED buffer sizes ...");
        for (int bufferSize : SEGMENT_SIZES_ALIGNED) {
            speedTestStream(bufferSize);
        }
        LOG.info("Starting speed test with java io file stream and UNALIGNED buffer sizes ...");
        for (int bufferSize : SEGMENT_SIZES_UNALIGNED) {
            speedTestStream(bufferSize);
        }
    }

    /**
     * Writes {@link #NUM_INTS_WRITTEN} {@link IntValue} records through a buffered
     * {@link DataOutputStream}, reads them back through a buffered {@link DataInputStream}, and
     * logs both elapsed times. The temporary file is deleted afterwards.
     *
     * <p>The view wrappers around the streams are created once per phase rather than once per
     * record, so the timed loops measure the I/O path and not wrapper allocation.
     *
     * @param bufferSize size in bytes of the buffered streams' buffers
     * @throws IOException if writing or reading the temporary file fails
     */
    private void speedTestStream(int bufferSize) throws IOException {
        final FileIOChannel.ID tmpChannel = ioManager.createChannel();
        final IntValue rec = new IntValue(0);
        final File tempFile = new File(tmpChannel.getPath());
        try {
            // ---- write phase; the elapsed time includes closing (and thereby flushing) ----
            final long writeStart;
            try (FileOutputStream fos = new FileOutputStream(tempFile);
                    DataOutputStream daos = new DataOutputStream(new BufferedOutputStream(fos, bufferSize))) {
                final DataOutputView outView = new OutputViewDataOutputStreamWrapper(daos);
                writeStart = System.currentTimeMillis();
                for (int value = NUM_INTS_WRITTEN - 1; value >= 0; value--) {
                    rec.setValue(value);
                    rec.write(outView);
                }
            }
            final long writeElapsed = System.currentTimeMillis() - writeStart;

            // ---- read phase ----
            final long readStart;
            try (FileInputStream fis = new FileInputStream(tempFile);
                    DataInputStream dais = new DataInputStream(new BufferedInputStream(fis, bufferSize))) {
                final DataInputView inView = new InputViewDataInputStreamWrapper(dais);
                readStart = System.currentTimeMillis();
                for (int i = 0; i < NUM_INTS_WRITTEN; i++) {
                    rec.read(inView);
                }
            }
            final long readElapsed = System.currentTimeMillis() - readStart;

            LOG.info("File-Stream with buffer {}: write {} msecs, read {} msecs.",
                    bufferSize, writeElapsed, readElapsed);
        } finally {
            deleteTempFile(tempFile);
        }
    }

    /**
     * Runs the NIO channel benchmark with heap and direct buffers, each for all aligned and
     * unaligned buffer sizes.
     *
     * @throws Exception if any run fails
     */
    @Test
    public void speedTestNIO() throws Exception {
        LOG.info("Starting speed test with java NIO heap buffers and ALIGNED buffer sizes ...");
        for (int bufferSize : SEGMENT_SIZES_ALIGNED) {
            speedTestNIO(bufferSize, false);
        }
        LOG.info("Starting speed test with java NIO heap buffers and UNALIGNED buffer sizes ...");
        for (int bufferSize : SEGMENT_SIZES_UNALIGNED) {
            speedTestNIO(bufferSize, false);
        }
        LOG.info("Starting speed test with java NIO direct buffers and ALIGNED buffer sizes ...");
        for (int bufferSize : SEGMENT_SIZES_ALIGNED) {
            speedTestNIO(bufferSize, true);
        }
        LOG.info("Starting speed test with java NIO direct buffers and UNALIGNED buffer sizes ...");
        for (int bufferSize : SEGMENT_SIZES_UNALIGNED) {
            speedTestNIO(bufferSize, true);
        }
    }

    /**
     * Writes {@link #NUM_INTS_WRITTEN} ints to a temporary file through a {@link FileChannel},
     * reads them back while verifying every value, and logs both elapsed times. The temporary
     * file is deleted afterwards.
     *
     * @param bufferSize capacity in bytes of the transfer buffer; must hold at least one int
     * @param direct {@code true} to use a direct buffer, {@code false} for a heap buffer
     * @throws IOException if file access fails, the file ends prematurely, or a value read back
     *     differs from the value written
     * @throws IllegalArgumentException if {@code bufferSize} is smaller than one int
     */
    private void speedTestNIO(int bufferSize, boolean direct) throws IOException {
        if (bufferSize < INT_BYTES) {
            throw new IllegalArgumentException(
                    "bufferSize must be at least " + INT_BYTES + " bytes, but was " + bufferSize);
        }

        final FileIOChannel.ID tmpChannel = ioManager.createChannel();
        final File tempFile = new File(tmpChannel.getPath());
        try {
            final ByteBuffer buf = direct ? ByteBuffer.allocateDirect(bufferSize) : ByteBuffer.allocate(bufferSize);

            // ---- write phase; the elapsed time includes closing the channel and file ----
            final long writeStart;
            try (RandomAccessFile raf = new RandomAccessFile(tempFile, "rw");
                    FileChannel channel = raf.getChannel()) {
                writeStart = System.currentTimeMillis();
                for (int value = NUM_INTS_WRITTEN - 1; value >= 0; value--) {
                    if (buf.remaining() < INT_BYTES) {
                        buf.flip();
                        writeFully(channel, buf);
                        buf.clear();
                    }
                    buf.putInt(value);
                }
                if (buf.position() > 0) {
                    buf.flip();
                    writeFully(channel, buf);
                }
            }
            final long writeElapsed = System.currentTimeMillis() - writeStart;

            // ---- read phase ----
            // Start with an empty buffer in read mode; the first loop iteration refills it.
            buf.clear();
            buf.flip();
            final long readStart;
            try (RandomAccessFile raf = new RandomAccessFile(tempFile, "r");
                    FileChannel channel = raf.getChannel()) {
                readStart = System.currentTimeMillis();
                for (int expected = NUM_INTS_WRITTEN - 1; expected >= 0; expected--) {
                    if (buf.remaining() < INT_BYTES) {
                        // Keep the leftover bytes of a split int and append fresh data behind them.
                        buf.compact();
                        readAtLeastOneInt(channel, buf);
                        buf.flip();
                    }
                    final int actual = buf.getInt();
                    if (actual != expected) {
                        throw new IOException(
                                "Read back unexpected value: expected " + expected + ", but got " + actual);
                    }
                }
            }
            final long readElapsed = System.currentTimeMillis() - readStart;

            LOG.info("NIO Channel with buffer {}: write {} msecs, read {} msecs.",
                    bufferSize, writeElapsed, readElapsed);
        } finally {
            deleteTempFile(tempFile);
        }
    }

    /** Writes all remaining bytes of {@code buf}, retrying when a write call is partial. */
    private static void writeFully(FileChannel channel, ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    /**
     * Reads into {@code buf} (which must be in write mode) until it holds at least one int.
     *
     * @throws IOException if the channel reaches end-of-file before a full int is available
     */
    private static void readAtLeastOneInt(FileChannel channel, ByteBuffer buf) throws IOException {
        while (buf.position() < INT_BYTES) {
            if (channel.read(buf) < 0) {
                throw new IOException(
                        "Unexpected end of file with " + buf.position() + " byte(s) of an int buffered");
            }
        }
    }

    /** Deletes the temporary file on a best-effort basis and logs a warning if it remains. */
    private static void deleteTempFile(File file) {
        if (!file.delete() && file.exists()) {
            LOG.warn("Could not delete temporary benchmark file {}", file.getAbsolutePath());
        }
    }
}

