/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.resettable;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.MutableObjectIteratorWrapper;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link BlockResettableMutableObjectIterator}.
 *
 * <p>Each test feeds {@link #NUM_VALUES} records holding the integers
 * {@code 0 .. NUM_VALUES - 1} through the iterator, using a different number of
 * memory pages, and checks that every block can be replayed via {@code reset()}
 * with the same content, and that all blocks together yield every record
 * exactly once and in order.
 */
public class BlockResettableMutableObjectIteratorTest {
    /** Total number of bytes handed to the memory manager of each test. */
    private static final int MEMORY_CAPACITY = 393216;

    /** Number of records produced by the input reader. */
    private static final int NUM_VALUES = 20000;

    /** How often each block is reset and re-read. */
    private static final int RESET_ROUNDS = 5;

    private final TypeSerializer<Record> serializer = RecordSerializer.get();
    private final AbstractInvokable memOwner = new DummyInvokable();
    private MemoryManager memman;
    private MutableObjectIterator<Record> reader;
    private List<Record> objects;

    /**
     * Creates a fresh memory manager and an input reader over
     * {@link #NUM_VALUES} single-field records with ascending integer values.
     */
    @Before
    public void startup() {
        this.memman = new MemoryManager(MEMORY_CAPACITY, 1);
        this.objects = new ArrayList<Record>(NUM_VALUES);
        for (int i = 0; i < NUM_VALUES; ++i) {
            this.objects.add(new Record(new IntValue(i)));
        }
        this.reader = new MutableObjectIteratorWrapper(this.objects.iterator());
    }

    /**
     * Fails the test if the memory manager reports that it is not empty, and
     * shuts the memory manager down in either case.
     */
    @After
    public void shutdown() {
        this.objects = null;
        if (this.memman == null) {
            // startup() failed before the memory manager was created; nothing to release.
            return;
        }
        try {
            if (!this.memman.verifyEmpty()) {
                Assert.fail("A memory leak has occurred: Not all memory was properly returned to the memory manager.");
            }
        } finally {
            // Shut down even when the leak check fails, so the manager does not outlive the test.
            this.memman.shutdown();
            this.memman = null;
        }
    }

    /** Runs the block/reset check with a single memory page. */
    @Test
    public void testSerialBlockResettableIterator() throws Exception {
        verifyBlockwiseReplay(1);
    }

    /** Runs the block/reset check with two memory pages. */
    @Test
    public void testDoubleBufferedBlockResettableIterator() throws Exception {
        verifyBlockwiseReplay(2);
    }

    /** Runs the block/reset check with twelve memory pages. */
    @Test
    public void testTwelveFoldBufferedBlockResettableIterator() throws Exception {
        verifyBlockwiseReplay(12);
    }

    /**
     * Reads the input block by block through a
     * {@link BlockResettableMutableObjectIterator} and verifies that
     * <ul>
     *   <li>the first pass over each block continues the ascending sequence
     *       where the previous block ended,</li>
     *   <li>each of {@link #RESET_ROUNDS} resets replays exactly the records of
     *       the current block, and</li>
     *   <li>all blocks together contain {@link #NUM_VALUES} records.</li>
     * </ul>
     *
     * <p>Exceptions are propagated unchanged so that the test report keeps the
     * original stack trace.
     *
     * @param numMemoryPages the page count passed to the iterator's constructor
     * @throws Exception if creating, opening or reading the iterator fails
     */
    private void verifyBlockwiseReplay(int numMemoryPages) throws Exception {
        BlockResettableMutableObjectIterator<Record> iterator =
                new BlockResettableMutableObjectIterator<Record>(
                        this.memman, this.reader, this.serializer, numMemoryPages, this.memOwner);
        try {
            iterator.open();

            // [lower, upper) is the value range covered by the current block.
            int lower;
            int upper = 0;
            do {
                lower = upper;

                // First pass over the block: values must continue the global sequence.
                Record target = new Record();
                while ((target = iterator.next(target)) != null) {
                    int val = target.getField(0, IntValue.class).getValue();
                    Assert.assertEquals(upper++, val);
                }

                // Replays: every reset must yield the same block again.
                for (int round = 0; round < RESET_ROUNDS; ++round) {
                    iterator.reset();
                    target = new Record();
                    int count = 0;
                    while ((target = iterator.next(target)) != null) {
                        int val = target.getField(0, IntValue.class).getValue();
                        Assert.assertEquals(lower + count++, val);
                    }
                    Assert.assertEquals(upper - lower, count);
                }
            } while (iterator.nextBlock());

            Assert.assertEquals(NUM_VALUES, upper);
        } finally {
            // Close even when an assertion fails, so the leak check in shutdown()
            // does not add a second, misleading failure.
            iterator.close();
        }
    }
}

