/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.resettable;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntValueSerializer;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.memory.MemoryManagerBuilder;
import org.apache.flink.runtime.operators.resettable.SpillingResettableIterator;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
import org.apache.flink.types.IntValue;
import org.assertj.core.api.AbstractBooleanAssert;
import org.assertj.core.api.AbstractComparableAssert;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SpillingResettableIterator}.
 *
 * <p>Every test wraps a plain in-memory iterator over the integers {@code 0..NUM_TESTRECORDS-1}
 * and checks that the resettable iterator replays them completely and in order. After each test
 * the memory manager is verified to be empty, so every test must close its iterator even when an
 * assertion fails.
 */
class SpillingResettableIteratorTest {
    /** Number of records fed into the iterator under test. */
    private static final int NUM_TESTRECORDS = 50000;
    /** Size handed to the memory manager builder (10 MiB). */
    private static final int MEMORY_CAPACITY = 10 * 1024 * 1024;
    /** How often the iterator is reset and re-read in the replay tests. */
    private static final int NUM_RESETS = 10;

    private final AbstractInvokable memOwner = new DummyInvokable();
    private final TypeSerializer<IntValue> serializer = new IntValueSerializer();
    private IOManager ioman;
    private MemoryManager memman;
    private Iterator<IntValue> reader;

    /** Creates fresh memory and I/O managers and the ascending input sequence. */
    @BeforeEach
    void startup() {
        this.memman = MemoryManagerBuilder.newBuilder().setMemorySize(MEMORY_CAPACITY).build();
        this.ioman = new IOManagerAsync();
        ArrayList<IntValue> objects = new ArrayList<>(NUM_TESTRECORDS);
        for (int i = 0; i < NUM_TESTRECORDS; ++i) {
            objects.add(new IntValue(i));
        }
        this.reader = objects.iterator();
    }

    /**
     * Closes the I/O manager, checks that all memory was handed back, and shuts the memory
     * manager down.
     *
     * @throws Exception if closing the I/O manager fails
     */
    @AfterEach
    void shutdown() throws Exception {
        this.ioman.close();
        this.ioman = null;
        try {
            Assertions.assertThat(this.memman.verifyEmpty())
                    .withFailMessage(
                            "A memory leak has occurred: Not all memory was properly returned to the memory manager.")
                    .isTrue();
        } finally {
            // Shut down even when the leak check fails, so the manager itself is not left behind.
            this.memman.shutdown();
            this.memman = null;
        }
    }

    /** Reads and repeatedly resets the iterator, constructed with a page argument of 2. */
    @Test
    void testResettableIterator() throws Exception {
        verifyIterationAndResets(2);
    }

    /** Reads and repeatedly resets the iterator, constructed with a page argument of 20. */
    @Test
    void testResettableIteratorInMemory() throws Exception {
        verifyIterationAndResets(20);
    }

    /** Checks that calling {@code hasNext()} repeatedly does not consume or skip elements. */
    @Test
    void testHasNext() throws Exception {
        SpillingResettableIterator<IntValue> iterator = createIterator(2);
        try {
            iterator.open();
            int cnt = 0;
            while (iterator.hasNext()) {
                // Deliberate second call: hasNext() must not advance the iterator.
                iterator.hasNext();
                iterator.next();
                ++cnt;
            }
            Assertions.assertThat(cnt)
                    .withFailMessage(
                            "%d elements read from iterator, but %d expected", cnt, NUM_TESTRECORDS)
                    .isEqualTo(NUM_TESTRECORDS);
        } finally {
            iterator.close();
        }
    }

    /**
     * Checks that {@code next()} works without a preceding {@code hasNext()} and throws
     * {@link NoSuchElementException} once the input is exhausted.
     */
    @Test
    void testNext() throws Exception {
        SpillingResettableIterator<IntValue> iterator = createIterator(2);
        try {
            iterator.open();
            for (int cnt = 0; cnt < NUM_TESTRECORDS; ++cnt) {
                IntValue record = iterator.next();
                Assertions.assertThat(record)
                        .withFailMessage("Record was not read from iterator")
                        .isNotNull();
            }
            Assertions.assertThatThrownBy(iterator::next)
                    .withFailMessage("Too many records were read from iterator.")
                    .isInstanceOf(NoSuchElementException.class);
        } finally {
            iterator.close();
        }
    }

    /**
     * Builds the iterator under test on top of the shared reader, serializer and managers.
     *
     * @param numPages fifth constructor argument of {@link SpillingResettableIterator};
     *     presumably the number of memory pages the iterator may use (confirm against that
     *     class)
     */
    private SpillingResettableIterator<IntValue> createIterator(int numPages) throws Exception {
        return new SpillingResettableIterator<>(
                this.reader, this.serializer, this.memman, this.ioman, numPages, this.memOwner);
    }

    /**
     * Reads the full sequence once, then resets and re-reads it {@link #NUM_RESETS} times,
     * asserting order and element count on every pass. The iterator is closed in a
     * {@code finally} block so a failing assertion does not also trip the leak check in
     * {@link #shutdown()}.
     *
     * @param numPages value forwarded to {@link #createIterator(int)}
     */
    private void verifyIterationAndResets(int numPages) throws Exception {
        SpillingResettableIterator<IntValue> iterator = createIterator(numPages);
        try {
            iterator.open();

            int count = 0;
            while (iterator.hasNext()) {
                int actual = iterator.next().getValue();
                Assertions.assertThat(actual)
                        .withFailMessage(
                                "In initial run, element %d does not match expected value!", count)
                        .isEqualTo(count);
                count++;
            }
            Assertions.assertThat(count)
                    .withFailMessage("Too few elements were deserialized in initial run!")
                    .isEqualTo(NUM_TESTRECORDS);

            for (int j = 0; j < NUM_RESETS; ++j) {
                count = 0;
                iterator.reset();
                while (iterator.hasNext()) {
                    int actual = iterator.next().getValue();
                    Assertions.assertThat(actual)
                            .withFailMessage(
                                    "After reset nr. %d element %d does not match expected value!",
                                    j + 1, count)
                            .isEqualTo(count);
                    count++;
                }
                Assertions.assertThat(count)
                        .withFailMessage(
                                "Too few elements were deserialized after reset nr. %d!", j + 1)
                        .isEqualTo(NUM_TESTRECORDS);
            }
        } finally {
            iterator.close();
        }
    }
}

