package org.infinispan.container.versioning;

import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.infinispan.Cache;
import org.infinispan.commands.ReplicableCommand;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commands.tx.CommitCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commons.util.concurrent.CompletableFutures;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.IsolationLevel;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.impl.InternalDataContainer;
import org.infinispan.context.impl.TxInvocationContext;
import org.infinispan.interceptors.DDAsyncInterceptor;
import org.infinispan.remoting.inboundhandler.AbstractDelegatingHandler;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.inboundhandler.PerCacheInboundInvocationHandler;
import org.infinispan.remoting.inboundhandler.Reply;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.ResponseCollector;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestDataSCI;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CleanupAfterMethod;
import org.infinispan.test.fwk.InCacheMode;
import org.infinispan.transaction.impl.WriteSkewHelper;
import org.infinispan.util.AbstractDelegatingRpcManager;
import org.infinispan.util.ControlledConsistentHashFactory;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

@CleanupAfterMethod
@InCacheMode({CacheMode.DIST_SYNC, CacheMode.REPL_SYNC})
@Test(groups = {"functional"}, testName = "container.versioning.WriteSkewConsistencyTest")
/* loaded from: input_file:org/infinispan/container/versioning/WriteSkewConsistencyTest.class */
public class WriteSkewConsistencyTest extends MultipleCacheManagersTest {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/infinispan/container/versioning/WriteSkewConsistencyTest$BackupOwnerInterceptor.class */
    /**
     * Test interceptor that can hold back commit commands and lets the test wait until a
     * prepare command has been processed.
     *
     * <p>Commit commands are passed to {@code asyncInvokeNext} together with the current
     * {@code commitBlocker} future; prepare commands flip the {@code prepareProcessed} flag once
     * the rest of the chain has finished (successfully or not).
     */
    public static class BackupOwnerInterceptor extends DDAsyncInterceptor {
        /** Maximum time, in milliseconds, that {@link #awaitPrepare()} waits for a prepare. */
        private static final long PREPARE_TIMEOUT_MILLIS = 10000L;

        /** Monitor guarding {@code prepareProcessed}. */
        private final Object prepareProcessedLock = new Object();
        private boolean prepareProcessed;
        /** Already completed by default, i.e. commits are not held back until {@link #blockCommit()}. */
        private volatile CompletableFuture<Void> commitBlocker = CompletableFutures.completedNull();

        BackupOwnerInterceptor() {
        }

        /**
         * Forwards the commit to the next interceptor, handing over whichever blocker future is
         * installed at the time of the call.
         */
        public Object visitCommitCommand(TxInvocationContext txInvocationContext, CommitCommand commitCommand) throws Throwable {
            CompletableFuture<Void> blocker = commitBlocker;
            return asyncInvokeNext(txInvocationContext, commitCommand, blocker);
        }

        /**
         * Forwards the prepare to the next interceptor and, whatever the outcome, signals that a
         * prepare has been processed.
         */
        public Object visitPrepareCommand(TxInvocationContext txInvocationContext, PrepareCommand prepareCommand) throws Throwable {
            return invokeNextAndFinally(txInvocationContext, prepareCommand,
                    (rCtx, rCommand, rv, throwable) -> notifyPrepareProcessed());
        }

        /** Installs a fresh, incomplete blocker future for subsequent commit commands. */
        void blockCommit() {
            commitBlocker = new CompletableFuture<>();
        }

        /** Completes the currently installed blocker future. */
        void unblockCommit() {
            commitBlocker.complete(null);
        }

        /**
         * Waits up to {@code PREPARE_TIMEOUT_MILLIS} for a prepare command to be processed.
         *
         * @return {@code true} if a prepare was processed before the deadline, {@code false} otherwise
         * @throws InterruptedException if the waiting thread is interrupted
         */
        boolean awaitPrepare() throws InterruptedException {
            synchronized (prepareProcessedLock) {
                final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(PREPARE_TIMEOUT_MILLIS);
                while (!prepareProcessed) {
                    long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadlineNanos - System.nanoTime());
                    // Stop once less than a full millisecond is left; wait(0) would block indefinitely.
                    if (remainingMillis <= 0) {
                        break;
                    }
                    prepareProcessedLock.wait(remainingMillis);
                }
                return prepareProcessed;
            }
        }

        /** Clears the "prepare processed" flag so the next prepare can be awaited. */
        void resetPrepare() {
            synchronized (prepareProcessedLock) {
                prepareProcessed = false;
            }
        }

        /** Sets the "prepare processed" flag and wakes up every thread blocked in {@link #awaitPrepare()}. */
        private void notifyPrepareProcessed() {
            synchronized (prepareProcessedLock) {
                prepareProcessed = true;
                prepareProcessedLock.notifyAll();
            }
        }
    }

    /* loaded from: input_file:org/infinispan/container/versioning/WriteSkewConsistencyTest$ControllerInboundInvocationHandler.class */
    /**
     * Inbound handler wrapper that can silently drop one kind of incoming command while
     * forwarding everything else to the wrapped handler.
     */
    private static class ControllerInboundInvocationHandler extends AbstractDelegatingHandler {
        // Id of the command that is dropped while discardRemoteGet is set.
        // NOTE(review): going by the flag name this is presumably the remote/clustered get command id — confirm.
        private static final int DISCARDED_COMMAND_ID = 16;

        /** When {@code true}, commands with id {@code DISCARDED_COMMAND_ID} are not delivered and never replied to. */
        private volatile boolean discardRemoteGet;

        private ControllerInboundInvocationHandler(PerCacheInboundInvocationHandler perCacheInboundInvocationHandler) {
            super(perCacheInboundInvocationHandler);
        }

        /**
         * Delegates the command to the wrapped handler unless discarding is enabled and the
         * command id matches {@code DISCARDED_COMMAND_ID}.
         */
        public void handle(CacheRpcCommand cacheRpcCommand, Reply reply, DeliverOrder deliverOrder) {
            boolean drop = discardRemoteGet && cacheRpcCommand.getCommandId() == DISCARDED_COMMAND_ID;
            if (!drop) {
                delegate.handle(cacheRpcCommand, reply, deliverOrder);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/infinispan/container/versioning/WriteSkewConsistencyTest$ReorderResponsesRpcManager.class */
    /**
     * {@link RpcManager} wrapper that rewrites map-shaped responses so that the entry of one
     * chosen address is always iterated last.
     */
    public static class ReorderResponsesRpcManager extends AbstractDelegatingRpcManager {
        /** Address whose response is moved to the end of the response map. */
        private final Address lastResponse;

        ReorderResponsesRpcManager(Address address, RpcManager rpcManager) {
            super(rpcManager);
            this.lastResponse = address;
        }

        /**
         * Performs the request through the superclass and post-processes its result.
         *
         * <p>A result that is not a {@link Map} is logged and returned untouched. A map result is
         * copied into a {@link LinkedHashMap} in iteration order, skipping the entry keyed by
         * {@code lastResponse}; that entry, if it was present, is appended afterwards.
         */
        @Override // org.infinispan.util.AbstractDelegatingRpcManager
        public <T> CompletionStage<T> performRequest(Collection<Address> collection, ReplicableCommand replicableCommand, ResponseCollector<T> responseCollector, Function<ResponseCollector<T>, CompletionStage<T>> function, RpcOptions rpcOptions) {
            return (CompletionStage<T>) super.performRequest(collection, replicableCommand, responseCollector, function, rpcOptions).thenApply(rv -> {
                if (!(rv instanceof Map)) {
                    WriteSkewConsistencyTest.log.debugf("Single response for command %s: %s", replicableCommand, rv);
                    return rv;
                }
                Map<?, ?> responses = (Map<?, ?>) rv;
                Map<Address, Response> reordered = new LinkedHashMap<>(collection.size());
                boolean sawLastResponse = false;
                for (Map.Entry<?, ?> response : responses.entrySet()) {
                    if (this.lastResponse.equals(response.getKey())) {
                        // Held back here; appended after every other response below.
                        sawLastResponse = true;
                        continue;
                    }
                    reordered.put((Address) response.getKey(), (Response) response.getValue());
                }
                if (sawLastResponse) {
                    reordered.put(this.lastResponse, (Response) responses.get(this.lastResponse));
                }
                WriteSkewConsistencyTest.log.debugf("Responses for command %s are %s", replicableCommand, reordered.values());
                return reordered;
            });
        }
    }

    /**
     * Runs two write transactions on the same key from different nodes while the commit on node 0
     * is held back, then checks that both owners end up with the same, twice-incremented version.
     *
     * <p>Going by the assertion messages below, node 1 is treated as the primary owner of
     * {@code "key"} and node 0 as its backup owner.
     *
     * @throws Exception if a cache operation, a transaction or a forked task fails or times out
     */
    public void testValidationOnlyInPrimaryOwner() throws Exception {
        String str = "key";
        // dataContainer belongs to node 1 ("primary owner" in the assertions), dataContainer2 to node 0 ("backup owner").
        DataContainer dataContainer = (DataContainer) TestingUtil.extractComponent(mo375cache(1), InternalDataContainer.class);
        DataContainer dataContainer2 = (DataContainer) TestingUtil.extractComponent(mo375cache(0), InternalDataContainer.class);
        VersionGenerator versionGenerator = (VersionGenerator) TestingUtil.extractComponent(mo375cache(1), VersionGenerator.class);
        // Wrap node 3's RpcManager so that node 0's response is ordered last in map-shaped responses.
        injectReorderResponseRpcManager(mo375cache(3), mo375cache(0));
        // Seed the key through node 1 and make sure every node reads the initial value.
        mo375cache(1).put("key", 1);
        for (Cache<?, ?> cache : caches()) {
            AssertJUnit.assertEquals("Wrong initial value for cache " + String.valueOf(address(cache)), 1, cache.get("key"));
        }
        InternalCacheEntry peek = dataContainer.peek("key");
        assertSameVersion("Wrong version for the same key", WriteSkewHelper.versionFromEntry(peek), WriteSkewHelper.versionFromEntry(dataContainer2.peek("key")));
        // Pre-compute the versions expected after one and after two successful writes.
        IncrementableEntryVersion versionFromEntry = WriteSkewHelper.versionFromEntry(peek);
        IncrementableEntryVersion increment = versionGenerator.increment(versionFromEntry);
        IncrementableEntryVersion increment2 = versionGenerator.increment(increment);
        // On node 0: hold back commit commands and drop inbound commands with id 16 (see ControllerInboundInvocationHandler).
        ControllerInboundInvocationHandler wrapInboundInvocationHandler = TestingUtil.wrapInboundInvocationHandler(mo375cache(0), ControllerInboundInvocationHandler::new);
        BackupOwnerInterceptor injectBackupOwnerInterceptor = injectBackupOwnerInterceptor(mo375cache(0));
        injectBackupOwnerInterceptor.blockCommit();
        wrapInboundInvocationHandler.discardRemoteGet = true;
        // tx1 runs on node 2 in a forked thread: read 1, write 2, commit.
        Future fork = fork(() -> {
            tm(2).begin();
            AssertJUnit.assertEquals("Wrong value for tx1.", 1, mo375cache(2).get(str));
            mo375cache(2).put(str, 2);
            tm(2).commit();
            return Boolean.TRUE;
        });
        // Wait until nodes 1 and 3 both read the value written by tx1.
        eventually(() -> {
            return Objects.equals(mo375cache(1).get(str), 2);
        });
        eventually(() -> {
            return Objects.equals(mo375cache(3).get(str), 2);
        });
        // Node 1 must already hold the incremented version; node 0 must still hold the initial one.
        assertSameVersion("Wrong version in the primary owner", WriteSkewHelper.versionFromEntry(dataContainer.peek("key")), increment);
        assertSameVersion("Wrong version in the backup owner", WriteSkewHelper.versionFromEntry(dataContainer2.peek("key")), versionFromEntry);
        // Clear the flag set by tx1's prepare so that awaitPrepare() below only observes tx2's prepare.
        injectBackupOwnerInterceptor.resetPrepare();
        // tx2 runs on node 3 in a forked thread: read 2, write 3, commit.
        Future fork2 = fork(() -> {
            tm(3).begin();
            AssertJUnit.assertEquals("Wrong value for tx2.", 2, mo375cache(3).get(str));
            mo375cache(3).put(str, 3);
            tm(3).commit();
            return Boolean.TRUE;
        });
        // Only release node 0 once it has processed tx2's prepare.
        AssertJUnit.assertTrue("Prepare of tx2 was never received.", injectBackupOwnerInterceptor.awaitPrepare());
        injectBackupOwnerInterceptor.unblockCommit();
        wrapInboundInvocationHandler.discardRemoteGet = false;
        // Both transactions must finish successfully within 15 seconds each.
        AssertJUnit.assertTrue("Error in tx1.", ((Boolean) fork.get(15L, TimeUnit.SECONDS)).booleanValue());
        AssertJUnit.assertTrue("Error in tx2.", ((Boolean) fork2.get(15L, TimeUnit.SECONDS)).booleanValue());
        // After both commits, nodes 1 and 0 must agree on the twice-incremented version.
        assertSameVersion("Wrong version in the primary owner", WriteSkewHelper.versionFromEntry(dataContainer.peek("key")), increment2);
        assertSameVersion("Wrong version in the backup owner", WriteSkewHelper.versionFromEntry(dataContainer2.peek("key")), increment2);
        assertNoTransactions();
        assertNotLocked("key");
    }

    /**
     * Creates four clustered caches using REPEATABLE_READ isolation, a single hash segment and a
     * controlled consistent hash factory chosen according to the cache mode under test.
     */
    @Override // org.infinispan.test.MultipleCacheManagersTest
    protected final void createCacheManagers() {
        // NOTE(review): the boolean argument presumably enables transactions — confirm against the helper.
        ConfigurationBuilder builder = getDefaultClusteredCacheConfig(this.cacheMode, true);
        builder.locking().isolationLevel(IsolationLevel.REPEATABLE_READ);
        if (this.cacheMode.isReplicated()) {
            builder.clustering().hash().numSegments(1).consistentHashFactory(new ControlledConsistentHashFactory.Replicated(1));
        } else {
            // NOTE(review): (1, 0) presumably lists the owner node indexes, primary first — confirm.
            builder.clustering().hash().numSegments(1).consistentHashFactory(new ControlledConsistentHashFactory.Default(1, 0));
        }
        createClusteredCaches(4, TestDataSCI.INSTANCE, builder);
    }

    /**
     * Creates a {@link BackupOwnerInterceptor} and adds it to the given cache's interceptor chain
     * at position 1.
     *
     * @param cache the cache whose interceptor chain is extended
     * @return the interceptor that was added, so the test can control it
     */
    private static BackupOwnerInterceptor injectBackupOwnerInterceptor(Cache<?, ?> cache) {
        final BackupOwnerInterceptor interceptor = new BackupOwnerInterceptor();
        TestingUtil.extractInterceptorChain(cache).addInterceptor(interceptor, 1);
        return interceptor;
    }

    /**
     * Replaces the {@link RpcManager} component of {@code cache} with a
     * {@link ReorderResponsesRpcManager} that wraps the existing manager and orders the response
     * of {@code cache2}'s address last.
     *
     * @param cache  the cache whose RpcManager is replaced
     * @param cache2 the cache whose address should appear last in response maps
     */
    private void injectReorderResponseRpcManager(Cache<?, ?> cache, Cache<?, ?> cache2) {
        Address lastResponder = address(cache2);
        RpcManager currentRpcManager = (RpcManager) TestingUtil.extractComponent(cache, RpcManager.class);
        ReorderResponsesRpcManager reorderingRpcManager = new ReorderResponsesRpcManager(lastResponder, currentRpcManager);
        // The component is registered under RpcManager itself; no narrowing cast of the class token is needed.
        TestingUtil.replaceComponent(cache, RpcManager.class, reorderingRpcManager, true);
    }

    /**
     * Asserts that comparing the first version with the second yields
     * {@link InequalVersionComparisonResult#EQUAL}.
     *
     * @param str           the assertion failure message
     * @param entryVersion  the version on which {@code compareTo} is invoked
     * @param entryVersion2 the version it is compared against
     */
    private static void assertSameVersion(String str, EntryVersion entryVersion, EntryVersion entryVersion2) {
        final InequalVersionComparisonResult comparison = entryVersion.compareTo(entryVersion2);
        AssertJUnit.assertEquals(str, InequalVersionComparisonResult.EQUAL, comparison);
    }
}
