/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.interceptors.distribution;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.infinispan.commands.CommandInvocationId;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.FlagAffectedCommand;
import org.infinispan.commands.TopologyAffectedCommand;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.read.GetCacheEntryCommand;
import org.infinispan.commands.write.AbstractDataWriteCommand;
import org.infinispan.commands.write.BackupPutMapRcpCommand;
import org.infinispan.commands.write.BackupWriteRcpCommand;
import org.infinispan.commands.write.DataWriteCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.PutMapCommand;
import org.infinispan.commands.write.RemoveCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.impl.FlagBitSets;
import org.infinispan.distribution.DistributionInfo;
import org.infinispan.distribution.TriangleOrderManager;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.interceptors.InvocationExceptionFunction;
import org.infinispan.interceptors.InvocationFinallyAction;
import org.infinispan.interceptors.distribution.NonTxDistributionInterceptor;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.transport.Address;
import org.infinispan.topology.CacheTopology;
import org.infinispan.util.concurrent.CommandAckCollector;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class TriangleDistributionInterceptor
extends NonTxDistributionInterceptor {
    // Logger shared by all instances of this interceptor.
    private static final Log log = LogFactory.getLog(TriangleDistributionInterceptor.class);
    // Sampled once when the class is initialised; the trace guards below never re-query the logger.
    private static final boolean trace = log.isTraceEnabled();
    // The three collaborators below are assigned in inject().
    private CommandAckCollector commandAckCollector;
    private CommandsFactory commandsFactory;
    private TriangleOrderManager triangleOrderManager;
    // Assigned in start() from rpcManager.getAddress().
    private Address localAddress;
    // Callbacks bound once per instance; both are used by handleLocalPutMapCommand().
    private final InvocationFinallyAction onPutMapWithLocalEntries = this::afterPutMapCommand;
    private final InvocationExceptionFunction onPutMapNoLocalEntriesException = this::onPutMapNoLocalEntriesException;

    /**
     * Stores the collaborators handed to this interceptor.
     *
     * @param commandAckCollector  collector on which write-command acknowledgements are registered and completed
     * @param commandsFactory      factory used to build the backup RPC commands
     * @param triangleOrderManager provider of the sequence numbers attached to backup commands
     */
    @Inject
    public void inject(CommandAckCollector commandAckCollector, CommandsFactory commandsFactory, TriangleOrderManager triangleOrderManager) {
        this.triangleOrderManager = triangleOrderManager;
        this.commandsFactory = commandsFactory;
        this.commandAckCollector = commandAckCollector;
    }

    /**
     * Remembers the address reported by the RPC manager so later ownership checks
     * can compare against it without asking the RPC manager again.
     */
    @Start
    public void start() {
        Address self = rpcManager.getAddress();
        localAddress = self;
    }

    /**
     * Routes a put through the shared single-key write path.
     *
     * @return whatever {@code handleDataWriteCommand} returns for this command
     */
    @Override
    public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable {
        return handleDataWriteCommand(ctx, command);
    }

    /**
     * Dispatches a multi-key put to the local-origin or remote-origin handler,
     * depending on where the invocation started.
     */
    @Override
    public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable {
        return ctx.isOriginLocal()
                ? handleLocalPutMapCommand(ctx, command)
                : handleRemotePutMapCommand(ctx, command);
    }

    /**
     * Routes a remove through the shared single-key write path.
     *
     * @return whatever {@code handleDataWriteCommand} returns for this command
     */
    @Override
    public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable {
        return handleDataWriteCommand(ctx, command);
    }

    /**
     * Routes a replace through the shared single-key write path.
     *
     * @return whatever {@code handleDataWriteCommand} returns for this command
     */
    @Override
    public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable {
        return handleDataWriteCommand(ctx, command);
    }

    /**
     * Handles a multi-key put that did not originate on this node.
     * <p>
     * Unless the command is flagged as forwarded or the write consistent hash keeps a
     * single owner, the entries are first sent to the backup owners. In every case the
     * keys are then wrapped (with a remote lookup when the command's load type is
     * {@code OWNER}) and the invocation continues down the chain.
     *
     * @param ctx     invocation context of the incoming command
     * @param command the multi-key put to process
     * @return the result of continuing the interceptor chain
     */
    private Object handleRemotePutMapCommand(InvocationContext ctx, PutMapCommand command) {
        ConsistentHash writeCh = checkTopologyId(command).getWriteConsistentHash();
        boolean needsPreviousValue = command.loadType() == VisitableCommand.LoadType.OWNER;
        // Same short-circuit as "forwarded || single owner", expressed positively.
        boolean replicateToBackups = !command.isForwarded() && writeCh.getNumOwners() != 1;
        if (replicateToBackups) {
            sendToBackups(command, command.getMap(), writeCh);
        }
        CompletableFuture<?> remoteGet = checkRemoteGetIfNeeded(ctx, command, command.getMap().keySet(), writeCh, needsPreviousValue);
        return asyncInvokeNext(ctx, command, remoteGet);
    }

    /**
     * Groups the given entries by segment and sends one sequenced backup command per
     * segment to every owner of that segment except the first one in the owner list.
     *
     * @param command the originating multi-key put; supplies the topology id and invocation id
     * @param entries the key/value pairs to replicate
     * @param ch      consistent hash used to map keys to segments and segments to owners
     */
    private void sendToBackups(PutMapCommand command, Map<Object, Object> entries, ConsistentHash ch) {
        BackupOwnerClassifier filter = new BackupOwnerClassifier(ch, entries.size());
        entries.entrySet().forEach(filter::add);
        int topologyId = command.getTopologyId();
        for (Map.Entry<Integer, Map<Object, Object>> segmentEntries : filter.perSegmentKeyValue.entrySet()) {
            int segmentId = segmentEntries.getKey();
            List<Address> owners = ch.locateOwnersForSegment(segmentId);
            int ownerCount = owners.size();
            if (ownerCount == 1) {
                // Only one owner for this segment: nobody to back up to.
                continue;
            }
            Map<Object, Object> segmentMap = segmentEntries.getValue();
            // The sequence is taken per segment and per topology and travels with the backup command.
            long sequence = this.triangleOrderManager.next(segmentId, topologyId);
            BackupPutMapRcpCommand backupPutMapRcpCommand = this.commandsFactory.buildBackupPutMapRcpCommand(command);
            backupPutMapRcpCommand.setMap(segmentMap);
            backupPutMapRcpCommand.setSequence(sequence);
            if (trace) {
                this.logCommandSequence(command.getCommandInvocationId(), segmentId, sequence);
            }
            // Index 0 is skipped; the remaining owners receive the backup command.
            this.rpcManager.sendToMany(owners.subList(1, ownerCount), backupPutMapRcpCommand, DeliverOrder.NONE);
        }
    }

    /**
     * Handles a multi-key put that originated on this node.
     * <p>
     * The entries are split by the first owner of each key's segment. For synchronous
     * commands a multi-key collector is registered first. The entries whose first owner
     * is this node are removed from the split, the rest are forwarded to their owners,
     * and the local share (if any) is replicated to the backups and applied through the
     * rest of the chain.
     *
     * @param ctx     invocation context of the local command
     * @param command the multi-key put to process
     * @return the result of continuing the interceptor chain
     */
    private Object handleLocalPutMapCommand(InvocationContext ctx, PutMapCommand command) {
        CacheTopology cacheTopology = this.checkTopologyId(command);
        ConsistentHash consistentHash = cacheTopology.getWriteConsistentHash();
        PrimaryOwnerClassifier filter = new PrimaryOwnerClassifier(consistentHash, command.getMap().size());
        boolean sync = this.isSynchronous(command);
        VisitableCommand.LoadType loadType = command.loadType();
        command.getMap().entrySet().forEach(filter::add);

        if (sync) {
            // Registered before the local share is removed, so the key set passed in still includes this node.
            this.commandAckCollector.createMultiKeyCollector(command.getCommandInvocationId(), filter.primaries.keySet(), filter.backups, command.getTopologyId());
        }

        Map<Object, Object> localEntries = filter.primaries.remove(this.localAddress);
        this.forwardToPrimaryOwners(command, filter);

        if (localEntries == null) {
            // Nothing to apply locally; a synchronous command still has to fail its collector on error.
            if (sync) {
                return this.invokeNextAndExceptionally(ctx, command, this.onPutMapNoLocalEntriesException);
            }
            return this.invokeNext(ctx, command);
        }

        this.sendToBackups(command, localEntries, consistentHash);
        boolean needsPreviousValue = loadType == VisitableCommand.LoadType.PRIMARY || loadType == VisitableCommand.LoadType.OWNER;
        CompletableFuture<?> remoteGet = this.checkRemoteGetIfNeeded(ctx, command, localEntries.keySet(), consistentHash, needsPreviousValue);
        Object pending = this.asyncInvokeNext(ctx, command, remoteGet);
        if (sync) {
            // Acknowledge (or fail) the local share once the chain has finished.
            return TriangleDistributionInterceptor.makeStage(pending).andFinally(ctx, command, this.onPutMapWithLocalEntries);
        }
        return pending;
    }

    /**
     * Completion callback for a multi-key put that had entries applied on this node.
     * On failure the collector is completed exceptionally; otherwise the local result
     * is reported to the collector under this node's address.
     *
     * @param ctx      invocation context (unused)
     * @param rCommand the command that finished; must be a {@link PutMapCommand}
     * @param rv       the chain's return value; cast to {@code Map} on success
     * @param t        the failure, or {@code null} when the chain succeeded
     */
    private void afterPutMapCommand(InvocationContext ctx, VisitableCommand rCommand, Object rv, Throwable t) {
        PutMapCommand putMap = (PutMapCommand)rCommand;
        CommandInvocationId id = putMap.getCommandInvocationId();
        if (t == null) {
            commandAckCollector.multiKeyPrimaryAck(id, localAddress, (Map)rv, putMap.getTopologyId());
            return;
        }
        commandAckCollector.completeExceptionally(id, t, putMap.getTopologyId());
    }

    /**
     * Failure callback for a multi-key put that had no entries to apply on this node:
     * completes the collector exceptionally and propagates the same throwable.
     *
     * @param ctx      invocation context (unused)
     * @param rCommand the command that failed; must be a {@link PutMapCommand}
     * @param t        the failure raised by the chain
     * @return never returns normally
     * @throws Throwable always rethrows {@code t}
     */
    private Object onPutMapNoLocalEntriesException(InvocationContext ctx, VisitableCommand rCommand, Throwable t) throws Throwable {
        PutMapCommand putMap = (PutMapCommand)rCommand;
        CommandInvocationId id = putMap.getCommandInvocationId();
        int topologyId = putMap.getTopologyId();
        commandAckCollector.completeExceptionally(id, t, topologyId);
        assert (t != null);
        throw t;
    }

    /**
     * Makes sure every key that is local to this node, and has no entry in the context
     * yet, gets one.
     * <p>
     * When the previous value is not needed, a {@code null}-valued external entry is
     * wrapped synchronously. Otherwise each missing key is handed to
     * {@code wrapKeyExternally}, which may add a pending lookup to the future list.
     *
     * @param ctx                invocation context to inspect and populate
     * @param command            the command on whose behalf the keys are wrapped
     * @param keys               keys to check
     * @param consistentHash     hash used to decide whether a key is local to this node
     * @param needsPreviousValue whether the command needs the existing value of the keys
     * @return a future completing when all pending lookups are done; already completed
     *         when there are none
     */
    private <C extends FlagAffectedCommand & TopologyAffectedCommand> CompletableFuture<?> checkRemoteGetIfNeeded(InvocationContext ctx, C command, Set<Object> keys, ConsistentHash consistentHash, boolean needsPreviousValue) {
        if (!needsPreviousValue) {
            for (Object key : keys) {
                if (ctx.lookupEntry(key) == null && consistentHash.isKeyLocalToNode(this.localAddress, key)) {
                    this.entryFactory.wrapExternalEntry(ctx, key, null, true);
                }
            }
            return CompletableFutures.completedNull();
        }

        List<CompletableFuture<?>> futureList = new ArrayList<>(keys.size());
        for (Object key : keys) {
            if (ctx.lookupEntry(key) == null && consistentHash.isKeyLocalToNode(this.localAddress, key)) {
                this.wrapKeyExternally(ctx, command, key, futureList);
            }
        }
        if (futureList.isEmpty()) {
            return CompletableFutures.completedNull();
        }
        return CompletableFuture.allOf(futureList.toArray(new CompletableFuture<?>[0]));
    }

    /**
     * Provides a context entry for {@code key}. With {@code SKIP_REMOTE_LOOKUP} or
     * {@code CACHE_MODE_LOCAL} set on the command, a {@code null}-valued external entry
     * is wrapped directly. Otherwise a get command carrying the same flags and topology
     * id is built and its remote lookup future is appended to {@code futureList}.
     *
     * @param ctx        invocation context to populate
     * @param command    the command that needs the key
     * @param key        the key to wrap
     * @param futureList receives the pending lookup, if one is started
     */
    private <C extends FlagAffectedCommand & TopologyAffectedCommand> void wrapKeyExternally(InvocationContext ctx, C command, Object key, List<CompletableFuture<?>> futureList) {
        boolean skipLookup = command.hasAnyFlag(FlagBitSets.SKIP_REMOTE_LOOKUP | FlagBitSets.CACHE_MODE_LOCAL);
        if (skipLookup) {
            entryFactory.wrapExternalEntry(ctx, key, null, true);
            return;
        }
        GetCacheEntryCommand lookup = cf.buildGetCacheEntryCommand(key, command.getFlagsBitSet());
        lookup.setTopologyId(command.getTopologyId());
        futureList.add(remoteGet(ctx, lookup, key, true));
    }

    /**
     * Sends each address left in {@code splitter.primaries} a copy of the command that
     * carries only that address's share of the entries.
     *
     * @param command  the original multi-key put, used as the template for every copy
     * @param splitter the per-address split of the entries
     */
    private void forwardToPrimaryOwners(PutMapCommand command, PrimaryOwnerClassifier splitter) {
        for (Map.Entry<Address, Map<Object, Object>> perOwner : splitter.primaries.entrySet()) {
            PutMapCommand copy = new PutMapCommand(command, false);
            copy.setMap(perOwner.getValue());
            this.rpcManager.sendTo(perOwner.getKey(), copy, DeliverOrder.NONE);
        }
    }

    /**
     * Shared entry point for single-key writes (put, remove, replace).
     * <p>
     * Commands flagged {@code CACHE_MODE_LOCAL} go straight down the chain. Otherwise
     * the handling depends on this node's ownership of the key in the write consistent
     * hash:
     * <ul>
     *   <li>{@code PRIMARY}: handled by {@code primaryOwnerWrite}.</li>
     *   <li>{@code BACKUP}: a local-origin command goes through
     *       {@code localWriteInvocation}; a remote one is applied here, after making
     *       sure the context holds an entry for the key.</li>
     *   <li>{@code NON_OWNER}: goes through {@code localWriteInvocation}.</li>
     * </ul>
     *
     * @param context invocation context; must not be in a transaction scope
     * @param command the write to process
     * @return the result of the selected path
     * @throws IllegalStateException if the ownership is none of the handled values
     */
    private Object handleDataWriteCommand(InvocationContext context, AbstractDataWriteCommand command) {
        assert (!context.isInTxScope());
        if (command.hasAnyFlag(FlagBitSets.CACHE_MODE_LOCAL)) {
            return invokeNext(context, command);
        }
        CacheTopology topology = checkTopologyId(command);
        DistributionInfo distributionInfo = new DistributionInfo(command.getKey(), topology.getWriteConsistentHash(), localAddress);
        switch (distributionInfo.ownership()) {
            case PRIMARY: {
                assert (context.lookupEntry(command.getKey()) != null);
                return primaryOwnerWrite(context, command, distributionInfo);
            }
            case BACKUP: {
                if (context.isOriginLocal()) {
                    return localWriteInvocation(context, command, distributionInfo);
                }
                Object key = command.getKey();
                if (context.lookupEntry(key) == null) {
                    if (command.loadType() == VisitableCommand.LoadType.OWNER) {
                        // The existing value is needed: fetch it before continuing.
                        return asyncInvokeNext(context, command, remoteGet(context, command, key, true));
                    }
                    entryFactory.wrapExternalEntry(context, key, null, true);
                }
                return invokeNext(context, command);
            }
            case NON_OWNER: {
                assert (context.isOriginLocal());
                return localWriteInvocation(context, command, distributionInfo);
            }
            default: {
                throw new IllegalStateException();
            }
        }
    }

    /**
     * Write path used when this node's ownership of the key is {@code PRIMARY}.
     * <p>
     * A retried command first has its value matcher switched to the retry variant. The
     * command then runs through the chain. When it reports success and the key has more
     * than one owner, a sequenced backup command is sent to the backup owners. Before
     * that, for a local-origin command that is synchronous or expects a return value, a
     * collector is registered and the topology is checked again.
     *
     * @param context          invocation context
     * @param command          the write to execute
     * @param distributionInfo ownership information for the command's key
     * @return the result of continuing the interceptor chain
     */
    private Object primaryOwnerWrite(InvocationContext context, DataWriteCommand command, DistributionInfo distributionInfo) {
        if (command.hasAnyFlag(FlagBitSets.COMMAND_RETRY)) {
            command.setValueMatcher(command.getValueMatcher().matcherForRetry());
        }
        return invokeNextThenAccept(context, command, (rCtx, rCommand, rv) -> {
            DataWriteCommand write = (DataWriteCommand)rCommand;
            CommandInvocationId id = write.getCommandInvocationId();
            if (!write.isSuccessful()) {
                if (trace) {
                    log.tracef("Command %s not successful in primary owner.", id);
                }
                return;
            }
            if (distributionInfo.owners().size() <= 1) {
                // No backup owners to notify.
                return;
            }
            Collection<Address> backupOwners = distributionInfo.backups();
            boolean ackExpected = rCtx.isOriginLocal() && (isSynchronous(write) || write.isReturnValueExpected());
            if (ackExpected) {
                commandAckCollector.create(id, rv, distributionInfo.owners(), write.getTopologyId());
                // Topology is verified only after the collector has been registered.
                checkTopologyId(write);
            }
            if (trace) {
                log.tracef("Command %s send to backup owner %s.", write.getCommandInvocationId(), backupOwners);
            }
            int segmentId = distributionInfo.getSegmentId();
            long sequenceNumber = triangleOrderManager.next(segmentId, write.getTopologyId());
            BackupWriteRcpCommand backupCommand = commandsFactory.buildBackupWriteRcpCommand(write);
            backupCommand.setSequence(sequenceNumber);
            if (trace) {
                logCommandSequence(id, distributionInfo.getSegmentId(), sequenceNumber);
            }
            rpcManager.sendToMany(backupOwners, backupCommand, DeliverOrder.NONE);
        });
    }

    /**
     * Writes a trace line recording the sequence number assigned to a command for a segment.
     *
     * @param id       invocation id of the command
     * @param segment  segment the sequence belongs to
     * @param sequence the assigned sequence number
     */
    private void logCommandSequence(CommandInvocationId id, int segment, long sequence) {
        Object boxedSequence = Long.valueOf(sequence);
        Object boxedSegment = Integer.valueOf(segment);
        log.tracef("Command %s got sequence %s for segment %s", (Object)id, boxedSequence, boxedSegment);
    }

    /**
     * Write path for a local-origin command that is not handled as primary on this node:
     * the command is sent to the key's primary owner instead of being applied here.
     * <p>
     * A collector is registered first when the command is synchronous or expects a
     * return value, unless it carries {@code PUT_FOR_EXTERNAL_READ}. A retried command
     * has its value matcher switched to the retry variant before sending.
     *
     * @param context          invocation context; must be local-origin
     * @param command          the write to forward
     * @param distributionInfo ownership information for the command's key
     * @return always {@code null}
     */
    private Object localWriteInvocation(InvocationContext context, DataWriteCommand command, DistributionInfo distributionInfo) {
        assert (context.isOriginLocal());
        CommandInvocationId invocationId = command.getCommandInvocationId();
        boolean wantsAck = isSynchronous(command) || command.isReturnValueExpected();
        if (wantsAck && !command.hasAnyFlag(FlagBitSets.PUT_FOR_EXTERNAL_READ)) {
            commandAckCollector.create(invocationId, distributionInfo.owners(), command.getTopologyId());
        }
        if (command.hasAnyFlag(FlagBitSets.COMMAND_RETRY)) {
            command.setValueMatcher(command.getValueMatcher().matcherForRetry());
        }
        Address primary = distributionInfo.primary();
        rpcManager.sendTo(primary, command, DeliverOrder.NONE);
        return null;
    }

    /**
     * Groups key/value pairs by the segment their key hashes to.
     */
    private static class BackupOwnerClassifier {
        /** Segment id to the entries whose keys belong to that segment. */
        private final Map<Integer, Map<Object, Object>> perSegmentKeyValue;
        private final ConsistentHash consistentHash;
        /** Total number of entries expected; used as the initial capacity of each per-segment map. */
        private final int entryCount;

        /**
         * @param consistentHash hash used to map keys to segments
         * @param entryCount     number of entries that will be added
         */
        private BackupOwnerClassifier(ConsistentHash consistentHash, int entryCount) {
            this.consistentHash = consistentHash;
            this.perSegmentKeyValue = new HashMap<>(consistentHash.getNumSegments());
            this.entryCount = entryCount;
        }

        /**
         * Files the entry under the segment of its key, creating the per-segment map on first use.
         *
         * @param entry the key/value pair to classify
         */
        public void add(Map.Entry<Object, Object> entry) {
            int segmentId = this.consistentHash.getSegment(entry.getKey());
            this.perSegmentKeyValue
                    .computeIfAbsent(segmentId, segment -> new HashMap<>(this.entryCount))
                    .put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Splits key/value pairs by the first owner of each key's segment, and records which
     * segments every remaining owner holds.
     */
    private static class PrimaryOwnerClassifier {
        /** Each non-first owner mapped to the segments it holds for the classified keys. */
        private final Map<Address, Collection<Integer>> backups;
        /** First owner of a segment mapped to the entries whose keys fall in its segments. */
        private final Map<Address, Map<Object, Object>> primaries;
        private final ConsistentHash consistentHash;
        /** Total number of entries expected; used as the initial capacity of the nested collections. */
        private final int entryCount;

        /**
         * @param consistentHash hash used to map keys to segments and segments to owners
         * @param entryCount     number of entries that will be added
         */
        private PrimaryOwnerClassifier(ConsistentHash consistentHash, int entryCount) {
            this.consistentHash = consistentHash;
            int memberSize = consistentHash.getMembers().size();
            this.backups = new HashMap<>(memberSize);
            this.primaries = new HashMap<>(memberSize);
            this.entryCount = entryCount;
        }

        /**
         * Files the entry under the first owner of its key's segment and registers the
         * segment with each remaining owner.
         *
         * @param entry the key/value pair to classify
         */
        public void add(Map.Entry<Object, Object> entry) {
            int segment = this.consistentHash.getSegment(entry.getKey());
            Iterator<Address> owners = this.consistentHash.locateOwnersForSegment(segment).iterator();
            // The first element of the owner list is treated as the primary.
            Address primaryOwner = owners.next();
            this.primaries
                    .computeIfAbsent(primaryOwner, address -> new HashMap<>(this.entryCount))
                    .put(entry.getKey(), entry.getValue());
            while (owners.hasNext()) {
                Address backup = owners.next();
                this.backups
                        .computeIfAbsent(backup, address -> new HashSet<>(this.entryCount))
                        .add(segment);
            }
        }
    }
}

