package com.hazelcast.internal.networking.nio.iobalancer;

import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import com.hazelcast.internal.networking.nio.MigratablePipeline;
import com.hazelcast.internal.networking.nio.NioInboundPipeline;
import com.hazelcast.internal.networking.nio.NioNetworking;
import com.hazelcast.internal.networking.nio.NioOutboundPipeline;
import com.hazelcast.internal.networking.nio.NioPipeline;
import com.hazelcast.internal.networking.nio.NioThread;
import com.hazelcast.internal.server.ServerConnection;
import com.hazelcast.internal.server.ServerConnectionManager;
import com.hazelcast.internal.server.tcp.TcpServerConnection;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.properties.ClusterProperty;
import com.hazelcast.test.Accessors;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.OverridePropertyRule;
import com.hazelcast.test.annotation.NightlyTest;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

@RunWith(HazelcastSerialClassRunner.class)
@Category({NightlyTest.class})
/* loaded from: input_file:com/hazelcast/internal/networking/nio/iobalancer/IOBalancerStressTest.class */
public class IOBalancerStressTest extends HazelcastTestSupport {

    @Rule
    public final OverridePropertyRule overridePropertyRule = OverridePropertyRule.set("hazelcast.io.load", "0");

    @Before
    @After
    public void killAllHazelcastInstances() {
        HazelcastInstanceFactory.terminateAll();
    }

    @Test
    public void testEachConnectionUseDifferentOwnerEventually() {
        Config property = new Config().setProperty(ClusterProperty.IO_BALANCER_INTERVAL_SECONDS.getName(), "1").setProperty(ClusterProperty.IO_THREAD_COUNT.getName(), "4");
        HazelcastInstance newHazelcastInstance = Hazelcast.newHazelcastInstance(property);
        HazelcastInstance newHazelcastInstance2 = Hazelcast.newHazelcastInstance(property);
        HazelcastInstance newHazelcastInstance3 = Hazelcast.newHazelcastInstance(property);
        newHazelcastInstance2.shutdown();
        HazelcastInstance newHazelcastInstance4 = Hazelcast.newHazelcastInstance(property);
        Map<NioThread, Map<MigratablePipeline, Long>> pipelinesLoadPerOwner = getPipelinesLoadPerOwner(newHazelcastInstance);
        Map<NioThread, Map<MigratablePipeline, Long>> pipelinesLoadPerOwner2 = getPipelinesLoadPerOwner(newHazelcastInstance4);
        Map<NioThread, Map<MigratablePipeline, Long>> pipelinesLoadPerOwner3 = getPipelinesLoadPerOwner(newHazelcastInstance3);
        IMap map = newHazelcastInstance.getMap(randomMapName());
        for (int i = 0; i < 10000; i++) {
            map.put(Integer.valueOf(i), Integer.valueOf(i));
        }
        assertBalanced(pipelinesLoadPerOwner, newHazelcastInstance);
        assertBalanced(pipelinesLoadPerOwner2, newHazelcastInstance4);
        assertBalanced(pipelinesLoadPerOwner3, newHazelcastInstance3);
    }

    private void assertBalanced(Map<NioThread, Map<MigratablePipeline, Long>> map, HazelcastInstance hazelcastInstance) {
        Map<NioThread, Map<MigratablePipeline, Long>> pipelinesLoadPerOwner = getPipelinesLoadPerOwner(hazelcastInstance);
        Map<MigratablePipeline, Long> pipelinesLoad = getPipelinesLoad(map);
        try {
            for (Map.Entry<NioThread, Map<MigratablePipeline, Long>> entry : pipelinesLoadPerOwner.entrySet()) {
                NioThread key = entry.getKey();
                Map<MigratablePipeline, Long> value = entry.getValue();
                if (value.size() > 1) {
                    int i = 0;
                    for (Map.Entry<MigratablePipeline, Long> entry2 : value.entrySet()) {
                        if (pipelinesLoad.get(entry2.getKey()).longValue() < entry2.getValue().longValue()) {
                            i++;
                        }
                    }
                    Assert.assertTrue("The number of active pipelines for the owner " + key + " is: " + i, i <= 1);
                }
            }
        } catch (AssertionError e) {
            System.out.println(debug(hazelcastInstance, map));
            throw e;
        }
    }

    private Map<MigratablePipeline, Long> getPipelinesLoad(Map<NioThread, Map<MigratablePipeline, Long>> map) {
        return (Map) map.values().stream().flatMap(map2 -> {
            return map2.entrySet().stream();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    private Map<NioThread, Map<MigratablePipeline, Long>> getPipelinesLoadPerOwner(HazelcastInstance hazelcastInstance) {
        return (Map) Accessors.getConnectionManager(hazelcastInstance).getConnections().stream().map(serverConnection -> {
            return ((TcpServerConnection) serverConnection).getChannel();
        }).flatMap(nioChannel -> {
            return Stream.of((Object[]) new NioPipeline[]{nioChannel.inboundPipeline(), nioChannel.outboundPipeline()});
        }).collect(Collectors.groupingBy((v0) -> {
            return v0.owner();
        }, Collectors.toMap(Function.identity(), (v0) -> {
            return v0.load();
        })));
    }

    private StringBuilder debug(HazelcastInstance hazelcastInstance, Map<NioThread, Map<MigratablePipeline, Long>> map) {
        StringBuilder sb = new StringBuilder();
        sb.append("--- Before load:\n");
        sb.append((CharSequence) debug(map));
        sb.append("\n");
        sb.append("--- After load:\n");
        sb.append((CharSequence) debug(hazelcastInstance));
        return sb;
    }

    private StringBuilder debug(Map<NioThread, Map<MigratablePipeline, Long>> map) {
        StringBuilder sb = new StringBuilder();
        sb.append("in owners\n");
        StringBuilder sb2 = new StringBuilder();
        sb2.append("out owners\n");
        for (Map.Entry<NioThread, Map<MigratablePipeline, Long>> entry : map.entrySet()) {
            StringBuilder sb3 = entry.getKey().getName().contains("thread-in") ? sb : sb2;
            sb3.append(entry.getKey()).append("\n");
            for (Map.Entry<MigratablePipeline, Long> entry2 : entry.getValue().entrySet()) {
                sb3.append("\t").append(entry2.getKey()).append(" load: ").append(entry2.getValue()).append("\n");
            }
        }
        return sb.append((CharSequence) sb2);
    }

    private StringBuilder debug(HazelcastInstance hazelcastInstance) {
        NioNetworking networking = Accessors.getNode(hazelcastInstance).getServer().getNetworking();
        ServerConnectionManager connectionManager = Accessors.getNode(hazelcastInstance).getServer().getConnectionManager(EndpointQualifier.MEMBER);
        StringBuilder sb = new StringBuilder();
        sb.append("in owners\n");
        for (NioThread nioThread : networking.getInputThreads()) {
            sb.append(nioThread).append(": ").append(nioThread.getEventCount()).append("\n");
            Iterator it = connectionManager.getConnections().iterator();
            while (it.hasNext()) {
                NioInboundPipeline inboundPipeline = ((ServerConnection) it.next()).getChannel().inboundPipeline();
                if (inboundPipeline.owner() == nioThread) {
                    sb.append("\t").append(inboundPipeline).append(" load: ").append(inboundPipeline.load()).append("\n");
                }
            }
        }
        sb.append("out owners\n");
        for (NioThread nioThread2 : networking.getOutputThreads()) {
            sb.append(nioThread2).append(": ").append(nioThread2.getEventCount()).append("\n");
            Iterator it2 = connectionManager.getConnections().iterator();
            while (it2.hasNext()) {
                NioOutboundPipeline outboundPipeline = ((ServerConnection) it2.next()).getChannel().outboundPipeline();
                if (outboundPipeline.owner() == nioThread2) {
                    sb.append("\t").append(outboundPipeline).append(" load:").append(outboundPipeline.load()).append("\n");
                }
            }
        }
        return sb;
    }
}
