/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.io.network.NetworkSequenceViewReader;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.api.StopMode;
import org.apache.flink.runtime.io.network.netty.CreditBasedSequenceNumberingViewReader;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestQueue;
import org.apache.flink.runtime.io.network.netty.PartitionRequestServerHandler;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link PartitionRequestServerHandler}.
 *
 * <p>Each test installs the handler into an {@link EmbeddedChannel}, writes one inbound
 * {@link NettyMessage}, runs the channel's pending tasks, and then inspects either the outbound
 * messages or the state recorded by a {@link TestViewReader}.
 */
public class PartitionRequestServerHandlerTest extends TestLogger {

    /** Initial credit handed to every view reader created by these tests. */
    private static final int INITIAL_CREDIT = 2;

    /** Buffer size announced by the {@code NewBufferSize} messages sent in these tests. */
    private static final int NEW_BUFFER_SIZE = 666;

    /**
     * Requests a partition that was never registered and expects an error response carrying a
     * {@link PartitionNotFoundException} for exactly the requested partition id.
     */
    @Test
    public void testResponsePartitionNotFoundException() {
        EmbeddedChannel channel = new EmbeddedChannel(createServerHandler(new PartitionRequestQueue()));
        ResultPartitionID partitionId = new ResultPartitionID();

        // NOTE(review): the literals 0 and 2 are presumably the subpartition index and the
        // initial credit of the request - confirm against NettyMessage.PartitionRequest.
        channel.writeInbound(new NettyMessage.PartitionRequest(partitionId, 0, new InputChannelID(), 2));
        channel.runPendingTasks();

        Object msg = channel.readOutbound();
        Assert.assertThat(msg, Matchers.instanceOf(NettyMessage.ErrorResponse.class));

        NettyMessage.ErrorResponse err = (NettyMessage.ErrorResponse) msg;
        Assert.assertThat(err.cause, Matchers.instanceOf(PartitionNotFoundException.class));

        // The id reported by the exception is the actual value; the requested id is expected.
        ResultPartitionID actualPartitionId = ((PartitionNotFoundException) err.cause).getPartitionId();
        Assert.assertThat(actualPartitionId, Matchers.is(partitionId));
    }

    /**
     * Sends a {@code ResumeConsumption} message for a registered reader and expects the reader's
     * {@code resumeConsumption()} to have been invoked.
     */
    @Test
    public void testResumeConsumption() {
        InputChannelID inputChannelID = new InputChannelID();
        PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        TestViewReader testViewReader = new TestViewReader(inputChannelID, INITIAL_CREDIT, partitionRequestQueue);
        EmbeddedChannel channel = new EmbeddedChannel(createServerHandler(partitionRequestQueue));

        partitionRequestQueue.notifyReaderCreated(testViewReader);
        channel.writeInbound(new NettyMessage.ResumeConsumption(inputChannelID));
        channel.runPendingTasks();

        Assert.assertTrue(testViewReader.consumptionResumed);
    }

    /**
     * Sends an {@code AckAllUserRecordsProcessed} message and expects the partition's
     * all-data-processed future to complete normally.
     *
     * @throws IOException declared because partition and view setup may throw it
     */
    @Test
    public void testAcknowledgeAllRecordsProcessed() throws IOException {
        InputChannelID inputChannelID = new InputChannelID();
        ResultPartition resultPartition = PartitionTestUtils.createPartition(ResultPartitionType.PIPELINED_BOUNDED);
        // The view is requested through this provider directly; the handler itself is still
        // built with a fresh ResultPartitionManager.
        ResultPartitionProvider partitionProvider =
                (partitionId, index, availabilityListener) ->
                        resultPartition.createSubpartitionView(index, availabilityListener);

        // Handler stack: server handler followed by the request queue.
        PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        EmbeddedChannel channel =
                new EmbeddedChannel(createServerHandler(partitionRequestQueue), partitionRequestQueue);

        // Create the reader, attach it to the partition's subpartition 0 and register it.
        CreditBasedSequenceNumberingViewReader viewReader =
                new CreditBasedSequenceNumberingViewReader(inputChannelID, INITIAL_CREDIT, partitionRequestQueue);
        viewReader.requestSubpartitionView(partitionProvider, resultPartition.getPartitionId(), 0);
        partitionRequestQueue.notifyReaderCreated(viewReader);

        resultPartition.notifyEndOfData(StopMode.DRAIN);
        CompletableFuture<?> allRecordsProcessedFuture = resultPartition.getAllDataProcessedFuture();
        // Before the acknowledgement arrives the future must still be pending.
        Assert.assertFalse(allRecordsProcessedFuture.isDone());

        channel.writeInbound(new NettyMessage.AckAllUserRecordsProcessed(inputChannelID));
        channel.runPendingTasks();

        Assert.assertTrue(allRecordsProcessedFuture.isDone());
        Assert.assertFalse(allRecordsProcessedFuture.isCompletedExceptionally());
    }

    /**
     * Sends a {@code NewBufferSize} message for a registered reader and expects the reader to
     * have been notified with the announced size.
     */
    @Test
    public void testNewBufferSize() {
        InputChannelID inputChannelID = new InputChannelID();
        PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        TestViewReader testViewReader = new TestViewReader(inputChannelID, INITIAL_CREDIT, partitionRequestQueue);
        EmbeddedChannel channel = new EmbeddedChannel(createServerHandler(partitionRequestQueue));

        partitionRequestQueue.notifyReaderCreated(testViewReader);
        channel.writeInbound(new NettyMessage.NewBufferSize(NEW_BUFFER_SIZE, inputChannelID));
        channel.runPendingTasks();

        Assert.assertEquals(NEW_BUFFER_SIZE, testViewReader.bufferSize);
    }

    /**
     * Sends a {@code NewBufferSize} message for a reader that was never registered with the
     * queue and expects no outbound message and no notification of the reader.
     */
    @Test
    public void testReceivingNewBufferSizeBeforeReaderIsCreated() {
        InputChannelID inputChannelID = new InputChannelID();
        PartitionRequestQueue partitionRequestQueue = new PartitionRequestQueue();
        // Deliberately NOT passed to notifyReaderCreated: the message arrives "too early".
        TestViewReader testViewReader = new TestViewReader(inputChannelID, INITIAL_CREDIT, partitionRequestQueue);
        EmbeddedChannel channel = new EmbeddedChannel(createServerHandler(partitionRequestQueue));

        channel.writeInbound(new NettyMessage.NewBufferSize(NEW_BUFFER_SIZE, inputChannelID));
        channel.runPendingTasks();

        // The outbound queue's content doubles as the failure message.
        Assert.assertTrue(channel.outboundMessages().toString(), channel.outboundMessages().isEmpty());
        // -1 is the reader's initial value, i.e. notifyNewBufferSize was never called.
        Assert.assertEquals(-1, testViewReader.bufferSize);
    }

    /**
     * Creates the handler under test with a freshly constructed {@link ResultPartitionManager}
     * and {@link TaskEventDispatcher}.
     *
     * @param partitionRequestQueue the queue the handler hands readers and messages to
     * @return a new handler instance
     */
    private static PartitionRequestServerHandler createServerHandler(PartitionRequestQueue partitionRequestQueue) {
        return new PartitionRequestServerHandler(
                new ResultPartitionManager(), new TaskEventDispatcher(), partitionRequestQueue);
    }

    /**
     * View reader that records whether consumption was resumed and which buffer size it was
     * last notified about, instead of delegating to the superclass behavior.
     */
    private static class TestViewReader extends CreditBasedSequenceNumberingViewReader {
        /** Set to {@code true} once {@link #resumeConsumption()} has been called. */
        private boolean consumptionResumed = false;

        /** Last size passed to {@link #notifyNewBufferSize(int)}; {@code -1} until then. */
        private int bufferSize = -1;

        TestViewReader(InputChannelID receiverId, int initialCredit, PartitionRequestQueue requestQueue) {
            super(receiverId, initialCredit, requestQueue);
        }

        @Override
        public void resumeConsumption() {
            this.consumptionResumed = true;
        }

        @Override
        public void notifyNewBufferSize(int newBufferSize) {
            this.bufferSize = newBufferSize;
        }
    }
}

