package org.infinispan.conflict.impl;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.infinispan.Cache;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.statetransfer.ConflictResolutionStartCommand;
import org.infinispan.commands.statetransfer.StateTransferCancelCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.test.Exceptions;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.distribution.LocalizedCacheTopology;
import org.infinispan.distribution.TestAddress;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.distribution.ch.impl.DefaultConsistentHashFactory;
import org.infinispan.distribution.ch.impl.HashFunctionPartitioner;
import org.infinispan.notifications.cachelistener.CacheNotifier;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.notifications.cachelistener.event.impl.EventImpl;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.SuccessfulResponse;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.statetransfer.InboundTransferTask;
import org.infinispan.statetransfer.StateChunk;
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.PersistentUUID;
import org.infinispan.topology.PersistentUUIDManagerImpl;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.testng.AssertJUnit;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = {"functional"}, testName = "conflict.resolution.StateReceiverTest")
/* loaded from: input_file:org/infinispan/conflict/impl/StateReceiverTest.class */
public class StateReceiverTest extends AbstractInfinispanTest {
    private StateReceiverImpl<Object, Object> stateReceiver;
    private LocalizedCacheTopology localizedCacheTopology;
    private final ExecutorService stateTransferExecutor = Executors.newSingleThreadExecutor(getTestThreadFactory("StateTransfer"));

    public void testGetReplicaException() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        completableFuture.completeExceptionally(new CacheException("Problem encountered retrieving state"));
        initTransferTaskMock(completableFuture);
        Exceptions.expectExecutionException(CacheException.class, this.stateReceiver.getAllReplicasForSegment(0, this.localizedCacheTopology, 10000L));
    }

    public void testTopologyChangeDuringSegmentRequest() {
        initTransferTaskMock(new CompletableFuture<>());
        CompletableFuture allReplicasForSegment = this.stateReceiver.getAllReplicasForSegment(0, this.localizedCacheTopology, 10000L);
        AssertJUnit.assertTrue(!allReplicasForSegment.isCancelled());
        AssertJUnit.assertTrue(!allReplicasForSegment.isCompletedExceptionally());
        this.stateReceiver.onDataRehash(createEventImpl(4, 1, Event.Type.DATA_REHASHED));
        AssertJUnit.assertTrue(allReplicasForSegment.isCompletedExceptionally());
        Exceptions.expectExecutionException(CacheException.class, allReplicasForSegment);
        this.stateReceiver.onDataRehash(createEventImpl(4, 4, Event.Type.DATA_REHASHED));
        CompletableFuture allReplicasForSegment2 = this.stateReceiver.getAllReplicasForSegment(1, this.localizedCacheTopology, 10000L);
        AssertJUnit.assertTrue(!allReplicasForSegment2.isCompletedExceptionally());
        AssertJUnit.assertTrue(!allReplicasForSegment2.isCancelled());
    }

    public void testOldAndInvalidStateIgnored() {
        initTransferTaskMock(new CompletableFuture<>());
        this.stateReceiver.getAllReplicasForSegment(0, this.localizedCacheTopology, 10000L);
        ArrayList arrayList = new ArrayList(this.stateReceiver.getTransferTaskMap(0).keySet());
        Map keyReplicaMap = this.stateReceiver.getKeyReplicaMap(0);
        AssertJUnit.assertEquals(0, keyReplicaMap.size());
        this.stateReceiver.receiveState((Address) arrayList.get(0), 2, createStateChunks("Key1", "Value1"));
        AssertJUnit.assertEquals(1, keyReplicaMap.size());
        this.stateReceiver.receiveState(new TestAddress(5), 2, createStateChunks("Key2", "Value2"));
        AssertJUnit.assertEquals(1, keyReplicaMap.size());
        this.stateReceiver.receiveState((Address) arrayList.get(1), 1, new ArrayList());
        AssertJUnit.assertEquals(1, keyReplicaMap.size());
    }

    @Test(expectedExceptions = {CancellationException.class})
    public void testRequestCanBeCancelledDuringTransfer() throws Exception {
        InboundTransferTask inboundTransferTask = (InboundTransferTask) Mockito.mock(InboundTransferTask.class);
        Mockito.when(inboundTransferTask.requestSegments()).thenAnswer(invocationOnMock -> {
            TestingUtil.sleepThread(1000L);
            return CompletableFuture.completedFuture(new HashMap());
        });
        ((StateReceiverImpl) Mockito.doReturn(inboundTransferTask).when(this.stateReceiver)).createTransferTask(((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (Address) ArgumentMatchers.any(Address.class), (CacheTopology) ArgumentMatchers.any(CacheTopology.class), ((Long) ArgumentMatchers.any(Long.class)).longValue());
        CompletableFuture allReplicasForSegment = this.stateReceiver.getAllReplicasForSegment(0, this.localizedCacheTopology, 10000L);
        allReplicasForSegment.whenComplete((list, th) -> {
            AssertJUnit.assertNull(list);
            AssertJUnit.assertNotNull(th);
            AssertJUnit.assertTrue(th instanceof CancellationException);
        });
        this.stateReceiver.stop();
        allReplicasForSegment.get();
    }

    @BeforeMethod
    private void createAndInitStateReceiver() {
        CommandsFactory commandsFactory = (CommandsFactory) Mockito.mock(CommandsFactory.class);
        InternalDataContainer internalDataContainer = (InternalDataContainer) Mockito.mock(InternalDataContainer.class);
        RpcManager rpcManager = (RpcManager) Mockito.mock(RpcManager.class);
        CacheNotifier cacheNotifier = (CacheNotifier) Mockito.mock(CacheNotifier.class);
        Answer answer = invocationOnMock -> {
            Address address = (Address) ((Collection) invocationOnMock.getArguments()[0]).iterator().next();
            HashMap hashMap = new HashMap(1);
            hashMap.put(address, SuccessfulResponse.SUCCESSFUL_EMPTY_RESPONSE);
            return hashMap;
        };
        Mockito.when(rpcManager.invokeCommand((Collection) ArgumentMatchers.any(Collection.class), (ReplicableCommand) ArgumentMatchers.any(ConflictResolutionStartCommand.class), (ResponseCollector) ArgumentMatchers.any(), (RpcOptions) ArgumentMatchers.any())).thenAnswer(answer);
        Mockito.when(rpcManager.invokeCommand((Collection) ArgumentMatchers.any(Collection.class), (ReplicableCommand) ArgumentMatchers.any(StateTransferCancelCommand.class), (ResponseCollector) ArgumentMatchers.any(), (RpcOptions) ArgumentMatchers.any())).thenAnswer(answer);
        Mockito.when(rpcManager.getSyncRpcOptions()).thenAnswer(invocationOnMock2 -> {
            return new RpcOptions(DeliverOrder.PER_SENDER, 10000L, TimeUnit.MILLISECONDS);
        });
        StateReceiverImpl stateReceiverImpl = new StateReceiverImpl();
        TestingUtil.inject(stateReceiverImpl, cacheNotifier, commandsFactory, internalDataContainer, rpcManager, this.stateTransferExecutor);
        stateReceiverImpl.start();
        stateReceiverImpl.onDataRehash(createEventImpl(2, 4, Event.Type.DATA_REHASHED));
        this.localizedCacheTopology = createLocalizedCacheTopology(4);
        this.stateReceiver = (StateReceiverImpl) Mockito.spy(stateReceiverImpl);
    }

    @AfterClass(alwaysRun = true)
    public void stopExecutor() {
        this.stateTransferExecutor.shutdownNow();
    }

    private void initTransferTaskMock(CompletableFuture<Void> completableFuture) {
        InboundTransferTask inboundTransferTask = (InboundTransferTask) Mockito.mock(InboundTransferTask.class);
        Mockito.when(inboundTransferTask.requestSegments()).thenReturn(completableFuture);
        ((StateReceiverImpl) Mockito.doReturn(inboundTransferTask).when(this.stateReceiver)).createTransferTask(((Integer) ArgumentMatchers.any(Integer.class)).intValue(), (Address) ArgumentMatchers.any(Address.class), (CacheTopology) ArgumentMatchers.any(CacheTopology.class), ((Long) ArgumentMatchers.any(Long.class)).longValue());
    }

    private Collection<StateChunk> createStateChunks(Object obj, Object obj2) {
        return Collections.singleton(new StateChunk(0, Collections.singleton(new ImmortalCacheEntry(obj, obj2)), true));
    }

    private ConsistentHash createConsistentHash(int i) {
        PersistentUUIDManagerImpl persistentUUIDManagerImpl = new PersistentUUIDManagerImpl();
        ArrayList arrayList = new ArrayList(i);
        for (int i2 = 0; i2 < i; i2++) {
            TestAddress testAddress = new TestAddress(i2);
            arrayList.add(testAddress);
            persistentUUIDManagerImpl.addPersistentAddressMapping(testAddress, PersistentUUID.randomUUID());
        }
        return new DefaultConsistentHashFactory().create(2, 40, arrayList, (Map) null);
    }

    private LocalizedCacheTopology createLocalizedCacheTopology(int i) {
        ConsistentHash createConsistentHash = createConsistentHash(i);
        return new LocalizedCacheTopology(CacheMode.DIST_SYNC, new CacheTopology(-1, -1, createConsistentHash, (ConsistentHash) null, CacheTopology.Phase.NO_REBALANCE, createConsistentHash.getMembers(), (List) null), new HashFunctionPartitioner(), (Address) createConsistentHash.getMembers().get(0), true);
    }

    private EventImpl createEventImpl(int i, int i2, Event.Type type) {
        EventImpl createEvent = EventImpl.createEvent((Cache) null, type);
        ConsistentHash createConsistentHash = createConsistentHash(i2);
        createEvent.setReadConsistentHashAtEnd(createConsistentHash);
        createEvent.setWriteConsistentHashAtEnd(createConsistentHash);
        createEvent.setNewTopologyId(i);
        createEvent.setPre(true);
        return createEvent;
    }
}
