/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartition;
import org.apache.flink.runtime.io.network.partition.PipelinedSubpartitionView;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.partition.SubpartitionTestBase;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.io.network.util.TestConsumerCallback;
import org.apache.flink.runtime.io.network.util.TestNotificationListener;
import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.io.network.util.TestSubpartitionConsumer;
import org.apache.flink.runtime.io.network.util.TestSubpartitionProducer;
import org.apache.flink.runtime.util.event.NotificationListener;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link PipelinedSubpartition}: listener registration and notification,
 * read view creation, and concurrent produce/consume with fast and slow
 * producer/consumer combinations.
 */
public class PipelinedSubpartitionTest
extends SubpartitionTestBase {
    /** Shared pool that runs the producer and consumer tasks of the concurrent tests. */
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    /** Interval (ms) at which the concurrent tests poll the producer and consumer futures. */
    private static final long FUTURE_POLL_INTERVAL_MILLIS = 10L;

    /** Stops the shared executor once all tests of this class have run. */
    @AfterClass
    public static void shutdownExecutorService() throws Exception {
        executorService.shutdownNow();
    }

    /**
     * Creates a fresh subpartition with index 0 whose parent partition is a Mockito mock.
     *
     * @return the new subpartition under test
     */
    PipelinedSubpartition createSubpartition() {
        ResultPartition parent = Mockito.mock(ResultPartition.class);
        return new PipelinedSubpartition(0, parent);
    }

    /**
     * The first listener registration must succeed; registering again must fail with an
     * {@link IllegalStateException}.
     */
    @Test
    public void testRegisterListener() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        TestNotificationListener listener = new TestNotificationListener();
        Assert.assertTrue(subpartition.registerListener(listener));
        try {
            subpartition.registerListener(listener);
            Assert.fail("Did not throw expected exception after duplicate listener registration.");
        }
        catch (IllegalStateException expected) {
            // Expected: the duplicate registration is rejected.
        }
    }

    /**
     * Checks the notification count seen by a registered listener: one notification after the
     * first added buffer, none for a second buffer added without re-registering, and one each
     * for {@code finish()} and {@code release()} on freshly created subpartitions.
     */
    @Test
    public void testListenerNotification() throws Exception {
        TestNotificationListener listener = new TestNotificationListener();
        Assert.assertEquals(0L, listener.getNumberOfNotifications());

        // Adding a buffer notifies the registered listener exactly once.
        PipelinedSubpartition subpartition = this.createSubpartition();
        Assert.assertTrue(subpartition.registerListener(listener));
        subpartition.add(Mockito.mock(Buffer.class));
        Assert.assertEquals(1L, listener.getNumberOfNotifications());

        // The listener was not registered again, so the count must not change.
        subpartition.add(Mockito.mock(Buffer.class));
        Assert.assertEquals(1L, listener.getNumberOfNotifications());

        // Finishing a new subpartition notifies the listener.
        subpartition = this.createSubpartition();
        Assert.assertTrue(subpartition.registerListener(listener));
        subpartition.finish();
        Assert.assertEquals(2L, listener.getNumberOfNotifications());

        // Releasing a new subpartition notifies the listener.
        subpartition = this.createSubpartition();
        Assert.assertTrue(subpartition.registerListener(listener));
        subpartition.release();
        Assert.assertEquals(3L, listener.getNumberOfNotifications());
    }

    /**
     * The first read view request must return a view; a second request on the same subpartition
     * must fail with an {@link IllegalStateException}.
     */
    @Test
    public void testIllegalReadViewRequest() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        Assert.assertNotNull(subpartition.createReadView(null));
        try {
            subpartition.createReadView(null);
            Assert.fail("Did not throw expected exception after duplicate read view request.");
        }
        catch (IllegalStateException expected) {
            // Expected: the duplicate read view request is rejected.
        }
    }

    /**
     * Single-threaded walk through the produce/consume protocol as seen through a read view:
     * empty view returns {@code null}, a registered listener is notified on the next add, and
     * registering while a buffer is already queued returns {@code false} without notifying.
     */
    @Test
    public void testBasicPipelinedProduceConsumeLogic() throws Exception {
        PipelinedSubpartition subpartition = this.createSubpartition();
        TestNotificationListener listener = new TestNotificationListener();
        PipelinedSubpartitionView view = subpartition.createReadView(null);

        // Nothing has been produced yet.
        Assert.assertNull(view.getNextBuffer());
        Assert.assertTrue(view.registerListener(listener));
        Assert.assertEquals(0L, listener.getNumberOfNotifications());

        // The add triggers the registered listener.
        subpartition.add(TestBufferFactory.createBuffer());
        Assert.assertEquals(1L, listener.getNumberOfNotifications());

        // Exactly one buffer is available.
        Assert.assertNotNull(view.getNextBuffer());
        Assert.assertNull(view.getNextBuffer());

        // With a buffer already queued, registration is refused and nobody is notified.
        subpartition.add(TestBufferFactory.createBuffer());
        Assert.assertFalse(view.registerListener(listener));
        Assert.assertEquals(1L, listener.getNumberOfNotifications());
    }

    @Test
    public void testConcurrentFastProduceAndFastConsume() throws Exception {
        this.testProduceConsume(false, false);
    }

    @Test
    public void testConcurrentFastProduceAndSlowConsume() throws Exception {
        this.testProduceConsume(false, true);
    }

    @Test
    public void testConcurrentSlowProduceAndFastConsume() throws Exception {
        this.testProduceConsume(true, false);
    }

    @Test
    public void testConcurrentSlowProduceAndSlowConsume() throws Exception {
        this.testProduceConsume(true, true);
    }

    /**
     * Runs a producer and a consumer concurrently against one subpartition. The producer fills
     * every buffer with consecutive ints that continue across buffers; the consumer asserts that
     * it sees exactly that sequence and recycles each buffer.
     *
     * @param isSlowProducer flag handed to {@link TestSubpartitionProducer}
     * @param isSlowConsumer flag handed to {@link TestSubpartitionConsumer}
     * @throws Exception if the producer or the consumer task fails, or the wait is interrupted
     */
    private void testProduceConsume(boolean isSlowProducer, boolean isSlowConsumer) throws Exception {
        final int producerBufferPoolSize = 8;
        final int producerNumberOfBuffersToProduce = 128;

        TestProducerSource producerSource = new TestProducerSource(){
            private final BufferProvider bufferProvider = new TestPooledBufferProvider(producerBufferPoolSize);
            private int numberOfBuffers;

            @Override
            public BufferOrEvent getNextBufferOrEvent() throws Exception {
                if (this.numberOfBuffers == producerNumberOfBuffersToProduce) {
                    // All buffers produced: null marks the end of the source.
                    return null;
                }
                Buffer buffer = this.bufferProvider.requestBufferBlocking();
                MemorySegment segment = buffer.getMemorySegment();
                // First int of this buffer continues where the previous buffer stopped.
                int next = this.numberOfBuffers * (segment.size() / 4);
                for (int i = 0; i < segment.size(); i += 4) {
                    segment.putInt(i, next);
                    ++next;
                }
                ++this.numberOfBuffers;
                return new BufferOrEvent(buffer, 0);
            }
        };

        TestConsumerCallback consumerCallback = new TestConsumerCallback(){
            private int numberOfBuffers;

            @Override
            public void onBuffer(Buffer buffer) {
                MemorySegment segment = buffer.getMemorySegment();
                // Mirror of the producer's numbering scheme.
                int expected = this.numberOfBuffers * (segment.size() / 4);
                for (int i = 0; i < segment.size(); i += 4) {
                    Assert.assertEquals(expected, segment.getInt(i));
                    ++expected;
                }
                ++this.numberOfBuffers;
                buffer.recycle();
            }

            @Override
            public void onEvent(AbstractEvent event) {
                // Events are not part of this test.
            }
        };

        PipelinedSubpartition subpartition = this.createSubpartition();
        PipelinedSubpartitionView view = subpartition.createReadView(null);
        Future<Boolean> producer = executorService.submit(new TestSubpartitionProducer(subpartition, isSlowProducer, producerSource));
        Future<Boolean> consumer = executorService.submit(new TestSubpartitionConsumer(view, isSlowConsumer, consumerCallback));
        PipelinedSubpartitionTest.awaitBoth(producer, consumer);
    }

    /**
     * Waits until both tasks have completed, rethrowing the failure of whichever task fails
     * first.
     *
     * <p>Blocking on {@code producer.get()} and then {@code consumer.get()} can hang: if the
     * consumer dies on a failed assertion it stops recycling buffers, which presumably leaves
     * the producer blocked on its buffer pool, so the real failure would never surface. Polling
     * both futures reports the failure as soon as either task terminates.
     *
     * @param producer future of the producer task
     * @param consumer future of the consumer task
     * @throws Exception the failure of either task, or {@link InterruptedException} if the
     *         waiting thread is interrupted
     */
    private static void awaitBoth(Future<Boolean> producer, Future<Boolean> consumer) throws Exception {
        try {
            while (!(producer.isDone() && consumer.isDone())) {
                // get() on a completed future returns at once or throws the task's failure.
                if (producer.isDone()) {
                    producer.get();
                }
                if (consumer.isDone()) {
                    consumer.get();
                }
                Thread.sleep(FUTURE_POLL_INTERVAL_MILLIS);
            }
            producer.get();
            consumer.get();
        }
        finally {
            // No-op for completed tasks; interrupts a task that is still running after a failure.
            producer.cancel(true);
            consumer.cancel(true);
        }
    }
}

