/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.writer.BroadcastRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterTest;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
import org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
import org.apache.flink.runtime.io.network.partition.ResultPartition;
import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.testutils.serialization.types.IntType;
import org.apache.flink.testutils.serialization.types.SerializationTestType;
import org.apache.flink.testutils.serialization.types.SerializationTestTypeFactory;
import org.apache.flink.testutils.serialization.types.Util;
import org.junit.Assert;
import org.junit.Test;

public class BroadcastRecordWriterTest
extends RecordWriterTest {
    public BroadcastRecordWriterTest() {
        super(true);
    }

    @Test
    public void testBroadcastMixedRandomEmitRecord() throws Exception {
        int numberOfChannels = 8;
        int numberOfRecords = 8;
        int bufferSize = 32;
        ResultPartition partition = BroadcastRecordWriterTest.createResultPartition(32, 8);
        BroadcastRecordWriter writer = new BroadcastRecordWriter((ResultPartitionWriter)partition, -1L, "test");
        SpillingAdaptiveSpanningRecordDeserializer deserializer = new SpillingAdaptiveSpanningRecordDeserializer(new String[]{this.tempFolder.getRoot().getAbsolutePath()});
        Util.MockRecords records = Util.randomRecords((int)8, (SerializationTestTypeFactory)SerializationTestTypeFactory.INT);
        HashMap serializedRecords = new HashMap();
        for (int i = 0; i < 8; ++i) {
            serializedRecords.put(i, new ArrayDeque());
        }
        int index = 0;
        for (SerializationTestType record : records) {
            int randomChannel = index++ % 8;
            writer.emit((IOReadableWritable)record, randomChannel);
            ((ArrayDeque)serializedRecords.get(randomChannel)).add(record);
            writer.broadcastEmit((IOReadableWritable)record);
            for (int i = 0; i < 8; ++i) {
                ((ArrayDeque)serializedRecords.get(i)).add(record);
            }
        }
        int numberOfCreatedBuffers = partition.getBufferPool().bestEffortGetNumOfUsedBuffers();
        Assert.assertEquals((long)16L, (long)numberOfCreatedBuffers);
        for (int i = 0; i < 8; ++i) {
            Assert.assertEquals((long)9L, (long)partition.getNumberOfQueuedBuffers(i));
            int excessRandomRecords = i < 0 ? 1 : 0;
            int numberOfRandomRecords = 1 + excessRandomRecords;
            int numberOfTotalRecords = 8 + numberOfRandomRecords;
            this.verifyDeserializationResults(partition.createSubpartitionView(i, (BufferAvailabilityListener)new NoOpBufferAvailablityListener()), (RecordDeserializer<SerializationTestType>)deserializer, (ArrayDeque)serializedRecords.get(i), 9, numberOfTotalRecords);
        }
    }

    @Test
    public void testRandomEmitAndBufferRecycling() throws Exception {
        int recordSize = 8;
        int numberOfChannels = 2;
        ResultPartition partition = BroadcastRecordWriterTest.createResultPartition(2 * recordSize, numberOfChannels);
        BufferPool bufferPool = partition.getBufferPool();
        BroadcastRecordWriter writer = new BroadcastRecordWriter((ResultPartitionWriter)partition, -1L, "test");
        List<Buffer> buffers = Arrays.asList(bufferPool.requestBuffer(), bufferPool.requestBuffer());
        buffers.forEach(Buffer::recycleBuffer);
        Assert.assertEquals((long)3L, (long)bufferPool.getNumberOfAvailableMemorySegments());
        writer.broadcastEmit((IOReadableWritable)new IntType(1));
        writer.broadcastEmit((IOReadableWritable)new IntType(2));
        Assert.assertEquals((long)2L, (long)bufferPool.getNumberOfAvailableMemorySegments());
        Assert.assertEquals((long)1L, (long)partition.getNumberOfQueuedBuffers(0));
        ResultSubpartitionView view0 = partition.createSubpartitionView(0, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        this.closeConsumer(view0, 2 * recordSize);
        Assert.assertEquals((long)2L, (long)bufferPool.getNumberOfAvailableMemorySegments());
        writer.emit((IOReadableWritable)new IntType(3), 0);
        Assert.assertEquals((long)1L, (long)bufferPool.getNumberOfAvailableMemorySegments());
        Assert.assertEquals((long)1L, (long)partition.getNumberOfQueuedBuffers(1));
        ResultSubpartitionView view1 = partition.createSubpartitionView(1, (BufferAvailabilityListener)new NoOpBufferAvailablityListener());
        this.closeConsumer(view1, 2 * recordSize);
        Assert.assertEquals((long)2L, (long)bufferPool.getNumberOfAvailableMemorySegments());
    }

    public void closeConsumer(ResultSubpartitionView view, int expectedSize) throws IOException {
        Buffer buffer = view.getNextBuffer().buffer();
        Assert.assertEquals((long)expectedSize, (long)buffer.getSize());
        buffer.recycleBuffer();
    }
}

