/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.resettable;

import java.util.ArrayList;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.record.RecordSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.runtime.operators.testutils.MutableObjectIteratorWrapper;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.types.Value;
import org.apache.flink.util.MutableObjectIterator;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class SpillingResettableMutableObjectIteratorTest {
    private static final int NUM_TESTRECORDS = 50000;
    private static final int MEMORY_CAPACITY = 0xA00000;
    private IOManager ioman;
    private MemoryManager memman;
    private MutableObjectIterator<Record> reader;
    private final TypeSerializer<Record> serializer = RecordSerializer.get();

    @Before
    public void startup() {
        this.memman = new MemoryManager(0xA00000L, 1);
        this.ioman = new IOManagerAsync();
        ArrayList<Record> objects = new ArrayList<Record>(50000);
        for (int i = 0; i < 50000; ++i) {
            Record tmp = new Record((Value)new IntValue(i));
            objects.add(tmp);
        }
        this.reader = new MutableObjectIteratorWrapper(objects.iterator());
    }

    @After
    public void shutdown() {
        this.ioman.shutdown();
        if (!this.ioman.isProperlyShutDown()) {
            Assert.fail((String)"I/O Manager Shutdown was not completed properly.");
        }
        this.ioman = null;
        if (!this.memman.verifyEmpty()) {
            Assert.fail((String)"A memory leak has occurred: Not all memory was properly returned to the memory manager.");
        }
        this.memman.shutdown();
        this.memman = null;
    }

    @Test
    public void testResettableIterator() {
        try {
            DummyInvokable memOwner = new DummyInvokable();
            SpillingResettableMutableObjectIterator iterator = new SpillingResettableMutableObjectIterator(this.reader, this.serializer, this.memman, this.ioman, 2, (AbstractInvokable)memOwner);
            iterator.open();
            int count = 0;
            Record target = new Record();
            while ((target = (Record)iterator.next((Object)target)) != null) {
                Assert.assertEquals((String)("In initial run, element " + count + " does not match expected value!"), (long)count++, (long)((IntValue)target.getField(0, IntValue.class)).getValue());
            }
            Assert.assertEquals((String)"Too few elements were deserialzied in initial run!", (long)50000L, (long)count);
            for (int j = 0; j < 10; ++j) {
                count = 0;
                iterator.reset();
                target = new Record();
                while ((target = (Record)iterator.next((Object)target)) != null) {
                    Assert.assertEquals((String)("After reset nr. " + j + 1 + " element " + count + " does not match expected value!"), (long)count++, (long)((IntValue)target.getField(0, IntValue.class)).getValue());
                }
                Assert.assertEquals((String)("Too few elements were deserialzied after reset nr. " + j + 1 + "!"), (long)50000L, (long)count);
            }
            iterator.close();
        }
        catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail((String)"Test encountered an exception.");
        }
    }

    @Test
    public void testResettableIteratorInMemory() {
        try {
            DummyInvokable memOwner = new DummyInvokable();
            SpillingResettableMutableObjectIterator iterator = new SpillingResettableMutableObjectIterator(this.reader, this.serializer, this.memman, this.ioman, 20, (AbstractInvokable)memOwner);
            iterator.open();
            int count = 0;
            Record target = new Record();
            while ((target = (Record)iterator.next((Object)target)) != null) {
                Assert.assertEquals((String)("In initial run, element " + count + " does not match expected value!"), (long)count++, (long)((IntValue)target.getField(0, IntValue.class)).getValue());
            }
            Assert.assertEquals((String)"Too few elements were deserialzied in initial run!", (long)50000L, (long)count);
            for (int j = 0; j < 10; ++j) {
                count = 0;
                iterator.reset();
                target = new Record();
                while ((target = (Record)iterator.next((Object)target)) != null) {
                    Assert.assertEquals((String)("After reset nr. " + j + 1 + " element " + count + " does not match expected value!"), (long)count++, (long)((IntValue)target.getField(0, IntValue.class)).getValue());
                }
                Assert.assertEquals((String)("Too few elements were deserialzied after reset nr. " + j + 1 + "!"), (long)50000L, (long)count);
            }
            iterator.close();
        }
        catch (Exception ex) {
            ex.printStackTrace();
            Assert.fail((String)"Test encountered an exception.");
        }
    }
}

