/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import java.io.IOException;
import java.util.Random;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.core.memory.MemorySegmentProvider;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import org.apache.flink.runtime.io.network.TestingPartitionRequestClient;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferCompressor;
import org.apache.flink.runtime.io.network.buffer.BufferDecompressor;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.partition.InputChannelTestUtils;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLoggerExtension;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Round-trip tests for the netty messages that are decoded on the client side: each message is
 * written through a {@link NettyMessage.NettyMessageEncoder} and read back through a {@link
 * NettyMessageClientDecoderDelegate} inside an {@link EmbeddedChannel}, and the decoded message is
 * compared with the one that was sent.
 */
@ExtendWith(TestLoggerExtension.class)
class NettyMessageClientSideSerializationTest {

    /** Size in bytes of every memory segment / buffer used by these tests. */
    private static final int BUFFER_SIZE = 1024;

    private final Random random = new Random();

    // Codec state is kept per test instance (not static) so that the parameterized compression
    // test cannot leak a codec into other tests or race with them under parallel execution.
    private BufferCompressor compressor;
    private BufferDecompressor decompressor;

    private EmbeddedChannel channel;
    private NetworkBufferPool networkBufferPool;
    private SingleInputGate inputGate;
    private InputChannelID inputChannelId;

    /**
     * Creates a single remote input channel backed by a fresh {@link NetworkBufferPool}, registers
     * it with a {@link CreditBasedPartitionRequestClientHandler}, and builds the embedded channel
     * whose pipeline consists of the message encoder followed by the client-side decoder.
     */
    @BeforeEach
    void setup() throws IOException, InterruptedException {
        networkBufferPool = new NetworkBufferPool(8, BUFFER_SIZE);
        inputGate = InputChannelTestUtils.createSingleInputGate(1, networkBufferPool);

        RemoteInputChannel inputChannel =
                InputChannelTestUtils.createRemoteInputChannel(
                        inputGate, new TestingPartitionRequestClient());
        inputChannel.requestSubpartition();
        inputGate.setInputChannels(inputChannel);
        inputGate.setup();

        CreditBasedPartitionRequestClientHandler handler =
                new CreditBasedPartitionRequestClientHandler();
        handler.addInputChannel(inputChannel);

        channel =
                new EmbeddedChannel(
                        new NettyMessage.NettyMessageEncoder(),
                        new NettyMessageClientDecoderDelegate(handler));
        inputChannelId = inputChannel.getInputChannelId();
    }

    /**
     * Releases the input gate, the buffer pool and the channel, in that order. The steps are
     * chained with {@code try/finally} so that a failure in an earlier step does not prevent the
     * later resources from being released.
     */
    @AfterEach
    void tearDown() throws IOException {
        try {
            if (inputGate != null) {
                inputGate.close();
            }
        } finally {
            try {
                if (networkBufferPool != null) {
                    networkBufferPool.destroyAllBufferPools();
                    networkBufferPool.destroy();
                }
            } finally {
                if (channel != null) {
                    channel.close();
                }
            }
        }
    }

    @Test
    void testErrorResponseWithoutErrorMessage() {
        testErrorResponse(
                new NettyMessage.ErrorResponse(new IllegalStateException(), inputChannelId));
    }

    @Test
    void testErrorResponseWithErrorMessage() {
        testErrorResponse(
                new NettyMessage.ErrorResponse(
                        new IllegalStateException("Illegal illegal illegal"), inputChannelId));
    }

    /** Uses the {@link NettyMessage.ErrorResponse} constructor that takes no receiver id. */
    @Test
    void testErrorResponseWithFatalError() {
        testErrorResponse(
                new NettyMessage.ErrorResponse(
                        new IllegalStateException("Illegal illegal illegal")));
    }

    @Test
    void testOrdinaryBufferResponse() {
        testBufferResponse(false, false);
    }

    @Test
    void testBufferResponseWithReadOnlySlice() {
        testBufferResponse(true, false);
    }

    /**
     * Runs the buffer round trip with a compressed payload, once per codec.
     *
     * @param codecFactoryName name of the compression codec handed to the compressor/decompressor
     */
    @ParameterizedTest
    @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
    void testCompressedBufferResponse(String codecFactoryName) {
        compressor = new BufferCompressor(BUFFER_SIZE, codecFactoryName);
        decompressor = new BufferDecompressor(BUFFER_SIZE, codecFactoryName);
        testBufferResponse(false, true);
    }

    @Test
    void testBacklogAnnouncement() {
        NettyMessage.BacklogAnnouncement expected =
                new NettyMessage.BacklogAnnouncement(1024, inputChannelId);
        NettyMessage.BacklogAnnouncement actual = NettyTestUtil.encodeAndDecode(expected, channel);

        Assertions.assertThat(actual.backlog).isEqualTo(expected.backlog);
        Assertions.assertThat(actual.receiverId).isEqualTo(expected.receiverId);
    }

    /** Sends {@code expect} through the channel and verifies the decoded error response. */
    private void testErrorResponse(NettyMessage.ErrorResponse expect) {
        NettyMessage.ErrorResponse actual = NettyTestUtil.encodeAndDecode(expect, channel);
        NettyTestUtil.verifyErrorResponse(expect, actual);
    }

    /**
     * Fills a buffer with consecutive {@code long} values, sends it as a {@link
     * NettyMessage.BufferResponse} and verifies header, payload and buffer recycling of the
     * decoded message.
     *
     * @param testReadOnlyBuffer whether to send a read-only slice of the buffer
     * @param testCompressedBuffer whether to compress the buffer before sending; requires {@link
     *     #compressor} and {@link #decompressor} to be set
     * @throws IllegalArgumentException if both flags are {@code true}
     */
    private void testBufferResponse(boolean testReadOnlyBuffer, boolean testCompressedBuffer) {
        Preconditions.checkArgument(
                !(testReadOnlyBuffer && testCompressedBuffer),
                "There are no cases with both readonly slice and compression.");

        NetworkBuffer buffer =
                new NetworkBuffer(
                        MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE),
                        FreeingBufferRecycler.INSTANCE);
        for (int i = 0; i < BUFFER_SIZE; i += 8) {
            buffer.writeLong(i);
        }

        // Declared as the Buffer interface: the slice and the compressed buffer are only used
        // through that interface below.
        Buffer testBuffer = buffer;
        if (testReadOnlyBuffer) {
            testBuffer = buffer.readOnlySlice();
        } else if (testCompressedBuffer) {
            testBuffer = compressor.compressToOriginalBuffer(buffer);
        }

        NettyMessage.BufferResponse expected =
                new NettyMessage.BufferResponse(
                        testBuffer,
                        random.nextInt(Integer.MAX_VALUE),
                        inputChannelId,
                        random.nextInt(Integer.MAX_VALUE));
        NettyMessage.BufferResponse actual = NettyTestUtil.encodeAndDecode(expected, channel);

        // After the round trip the sent buffers must have been recycled.
        Assertions.assertThat(buffer.isRecycled()).isTrue();
        Assertions.assertThat(testBuffer.isRecycled()).isTrue();
        Assertions.assertThat(actual.getBuffer())
                .as("The request input channel should always have available buffers in this test.")
                .isNotNull();

        Buffer decodedBuffer = actual.getBuffer();
        if (testCompressedBuffer) {
            Assertions.assertThat(actual.isCompressed).isTrue();
            decodedBuffer = decompress(decodedBuffer);
        }

        NettyTestUtil.verifyBufferResponseHeader(expected, actual);
        Assertions.assertThat(decodedBuffer.readableBytes()).isEqualTo(BUFFER_SIZE);
        for (int i = 0; i < BUFFER_SIZE; i += 8) {
            Assertions.assertThat(decodedBuffer.asByteBuf().readLong()).isEqualTo((long) i);
        }

        actual.releaseBuffer();
        if (testCompressedBuffer) {
            // In the compressed case decodedBuffer is the separate buffer produced by
            // decompress(), so it has to be recycled on its own.
            decodedBuffer.recycleBuffer();
        }
        Assertions.assertThat(actual.getBuffer().isRecycled()).isTrue();
    }

    /**
     * Copies the readable bytes of {@code buffer} into a freshly allocated unpooled buffer, marks
     * that copy as compressed and returns the result of decompressing it.
     */
    private Buffer decompress(Buffer buffer) {
        MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(BUFFER_SIZE);
        NetworkBuffer compressedBuffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
        buffer.asByteBuf().readBytes(compressedBuffer.asByteBuf(), buffer.readableBytes());
        compressedBuffer.setCompressed(true);
        return decompressor.decompressToOriginalBuffer(compressedBuffer);
    }
}

