/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.scattered.impl;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongArray;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.write.InvalidateVersionsCommand;
import org.infinispan.commons.CacheException;
import org.infinispan.configuration.cache.ClusteringConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.container.DataContainer;
import org.infinispan.container.versioning.EntryVersion;
import org.infinispan.container.versioning.SimpleClusteredVersion;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.factories.ComponentRegistry;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
import org.infinispan.filter.KeyFilter;
import org.infinispan.metadata.InternalMetadata;
import org.infinispan.persistence.manager.OrderedUpdatesManager;
import org.infinispan.persistence.manager.PersistenceManager;
import org.infinispan.remoting.inboundhandler.DeliverOrder;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.rpc.ResponseMode;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.scattered.ScatteredVersionManager;
import org.infinispan.statetransfer.StateConsumer;
import org.infinispan.topology.CacheTopology;
import org.infinispan.topology.ClusterTopologyManager;
import org.infinispan.util.concurrent.CompletableFutures;
import org.infinispan.util.concurrent.WithinThreadExecutor;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class ScatteredVersionManagerImpl<K>
implements ScatteredVersionManager<K> {
    // Field updaters used to atomically replace the two batch maps below with fresh instances.
    private static final AtomicReferenceFieldUpdater<ScatteredVersionManagerImpl, ConcurrentMap> scheduledKeysSwapper = AtomicReferenceFieldUpdater.newUpdater(ScatteredVersionManagerImpl.class, ConcurrentMap.class, "scheduledKeys");
    private static final AtomicReferenceFieldUpdater<ScatteredVersionManagerImpl, ConcurrentMap> removedKeysSwapper = AtomicReferenceFieldUpdater.newUpdater(ScatteredVersionManagerImpl.class, ConcurrentMap.class, "removedKeys");
    // CAS access to the volatile topologyId field (see setTopologyId).
    private static final AtomicIntegerFieldUpdater<ScatteredVersionManagerImpl> topologyIdUpdater = AtomicIntegerFieldUpdater.newUpdater(ScatteredVersionManagerImpl.class, "topologyId");
    // Note: the logger is obtained for the ScatteredVersionManager interface, not this implementation class.
    private static final Log log = LogFactory.getLog(ScatteredVersionManager.class);
    // Trace flag is sampled once at class initialization.
    private static final boolean trace = log.isTraceEnabled();
    // Injected collaborators and values derived from configuration in start().
    private Configuration configuration;
    private int invalidationBatchSize;
    private int numSegments;
    private ComponentRegistry componentRegistry;
    private ExecutorService executorService;
    private CommandsFactory commandsFactory;
    private RpcManager rpcManager;
    private DataContainer<K, ?> dataContainer;
    private RpcOptions syncIgnoreLeavers;
    private PersistenceManager persistenceManager;
    private StateConsumer stateConsumer;
    private ClusterTopologyManager clusterTopologyManager;
    private OrderedUpdatesManager orderedUpdatesManager;
    // Highest topology id reported through updatePreloadedEntryVersion().
    private int preloadedTopologyId = 0;
    // Current topology id; only ever moved forward by setTopologyId().
    private volatile int topologyId = 0;
    // Per-segment bookkeeping, all allocated with numSegments slots in start().
    private AtomicReferenceArray<ScatteredVersionManager.SegmentState> segmentStates;
    private AtomicReferenceArray<CompletableFuture<Void>> blockedFutures;
    private AtomicLongArray segmentVersions;
    private AtomicIntegerArray ownerTopologyIds;
    // Pending invalidations; the maps are swapped out as a whole when a batch is sent.
    private volatile ConcurrentMap<K, InvalidationInfo> scheduledKeys;
    private volatile ConcurrentMap<K, InvalidationInfo> removedKeys;
    // Value-transfer state; valuesFuture is replaced and completed under valuesLock.
    private volatile boolean transferringValues = false;
    private volatile int valuesTopology = -1;
    private CompletableFuture<Void> valuesFuture = CompletableFutures.completedNull();
    private final Object valuesLock = new Object();

    /**
     * Dependency-injection entry point: stores every collaborator in the matching field.
     * No other work is done here; derived state is built in {@link #start()}.
     */
    @Inject
    public void init(Configuration configuration, ComponentRegistry componentRegistry, @ComponentName(value="org.infinispan.executors.transport") ExecutorService executorService, CommandsFactory commandsFactory, RpcManager rpcManager, DataContainer<K, ?> dataContainer, PersistenceManager persistenceManager, StateConsumer stateConsumer, ClusterTopologyManager clusterTopologyManager, OrderedUpdatesManager orderedUpdatesManager) {
        // Assignments follow the parameter order.
        this.configuration = configuration;
        this.componentRegistry = componentRegistry;
        this.executorService = executorService;
        this.commandsFactory = commandsFactory;
        this.rpcManager = rpcManager;
        this.dataContainer = dataContainer;
        this.persistenceManager = persistenceManager;
        this.stateConsumer = stateConsumer;
        this.clusterTopologyManager = clusterTopologyManager;
        this.orderedUpdatesManager = orderedUpdatesManager;
    }

    /**
     * Allocates the per-segment arrays, derives the initial state of every segment from the
     * current consistent hash (if any), sets up the RPC options and creates the invalidation
     * batch maps.
     */
    @Start(priority=15)
    public void start() {
        this.numSegments = this.configuration.clustering().hash().numSegments();
        this.segmentVersions = new AtomicLongArray(this.numSegments);
        this.segmentStates = new AtomicReferenceArray<>(this.numSegments);
        this.blockedFutures = new AtomicReferenceArray<>(this.numSegments);
        this.ownerTopologyIds = new AtomicIntegerArray(this.numSegments);
        CacheTopology cacheTopology = this.stateConsumer.getCacheTopology();
        ConsistentHash consistentHash = cacheTopology == null ? null : cacheTopology.getCurrentCH();
        for (int i = 0; i < this.numSegments; ++i) {
            // Without a topology nothing is owned; otherwise segments local to this node start as OWNED.
            ScatteredVersionManager.SegmentState state = ScatteredVersionManager.SegmentState.NOT_OWNED;
            if (consistentHash != null && consistentHash.isSegmentLocalToNode(this.rpcManager.getAddress(), i)) {
                state = ScatteredVersionManager.SegmentState.OWNED;
            }
            this.segmentStates.set(i, state);
        }
        this.printTable();
        // Rebuild the RPC options whenever the remote timeout attribute changes.
        this.configuration.clustering().attributes().attribute(ClusteringConfiguration.REMOTE_TIMEOUT).addListener((a, o) -> this.initRpcOptions());
        this.initRpcOptions();
        // The batch size must be read BEFORE it is used to size the maps; previously
        // scheduledKeys was always created with an initial capacity of 0.
        this.invalidationBatchSize = this.configuration.clustering().invalidationBatchSize();
        this.scheduledKeys = new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize);
        this.removedKeys = new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize);
    }

    /**
     * (Re)builds the cached RPC options used for invalidation commands:
     * {@code SYNCHRONOUS_IGNORE_LEAVERS} response mode with {@code DeliverOrder.NONE}.
     */
    private void initRpcOptions() {
        RpcOptions rebuilt = this.rpcManager
                .getRpcOptionsBuilder(ResponseMode.SYNCHRONOUS_IGNORE_LEAVERS, DeliverOrder.NONE)
                .build();
        this.syncIgnoreLeavers = rebuilt;
    }

    /**
     * Determines the highest topology id found in persisted entry versions and, when it is
     * positive, registers the next id as the initial cache topology id.
     * <p>
     * When the persistence manager reports the data as preloaded, the maximum collected via
     * {@link #updatePreloadedEntryVersion(EntryVersion)} is used; otherwise all stores are scanned.
     */
    @Start(priority=57)
    public void initTopologyId() {
        int highestSeen;
        if (this.persistenceManager.isPreloaded()) {
            // Preload already fed us the versions, no store scan needed.
            highestSeen = this.preloadedTopologyId;
        } else {
            AtomicInteger highest = new AtomicInteger(this.preloadedTopologyId);
            this.persistenceManager.processOnAllStores(new WithinThreadExecutor(), KeyFilter.ACCEPT_ALL_FILTER, (marshalledEntry, taskContext) -> {
                InternalMetadata metadata = marshalledEntry.getMetadata();
                if (metadata == null) {
                    return;
                }
                EntryVersion entryVersion = metadata.version();
                if (!(entryVersion instanceof SimpleClusteredVersion)) {
                    return;
                }
                int entryTopologyId = ((SimpleClusteredVersion)entryVersion).topologyId;
                // Cheap read first; only go through the atomic update when it can raise the maximum.
                if (highest.get() < entryTopologyId) {
                    highest.updateAndGet(current -> Math.max(current, entryTopologyId));
                }
            }, false, true);
            highestSeen = highest.get();
        }
        if (highestSeen > 0) {
            this.clusterTopologyManager.setInitialCacheTopologyId(this.componentRegistry.getCacheName(), highestSeen + 1);
        }
    }

    /**
     * Marks the manager as stopped: under {@code valuesLock} the values topology is pushed to
     * {@code Integer.MAX_VALUE} and the current values future is failed with a {@link CacheException}.
     */
    @Stop
    public void stop() {
        String stoppingMessage = "Stopping " + this + " on " + this.rpcManager.getAddress();
        log.trace(stoppingMessage);
        synchronized (this.valuesLock) {
            this.valuesTopology = Integer.MAX_VALUE;
            CacheException stopping = new CacheException("Cache is stopping");
            this.valuesFuture.completeExceptionally(stopping);
        }
        String stoppedMessage = "Stopped " + this + " on " + this.rpcManager.getAddress();
        log.trace(stoppedMessage);
    }

    /**
     * Produces the next version for the given segment.
     *
     * @param segment segment index
     * @return a {@link SimpleClusteredVersion} made of the current topology id and the
     *         incremented per-segment counter
     * @throws CacheException if the segment is {@code NOT_OWNED} or {@code BLOCKED}
     * @throws IllegalStateException if the segment is in a state not handled here
     */
    @Override
    public EntryVersion incrementVersion(int segment) {
        ScatteredVersionManager.SegmentState state = this.segmentStates.get(segment);
        switch (state) {
            case NOT_OWNED:
                throw new CacheException("Segment " + segment + " is not owned by " + this.rpcManager.getAddress());
            case BLOCKED:
                throw new CacheException("Segment " + segment + " is currently blocked");
            case KEY_TRANSFER:
            case VALUE_TRANSFER:
            case OWNED:
                // Versions may be handed out in any of these states.
                break;
            default:
                throw new IllegalStateException();
        }
        int currentTopologyId = this.topologyId;
        long nextVersion = this.segmentVersions.incrementAndGet(segment);
        return new SimpleClusteredVersion(currentTopologyId, nextVersion);
    }

    /**
     * Records that {@code key} should be invalidated at {@code version}, keeping only the
     * newest record per key, and triggers a batch send once the map exceeds
     * {@code invalidationBatchSize}.
     *
     * @param key     key to invalidate
     * @param version must be a {@link SimpleClusteredVersion} (it is cast unconditionally)
     * @param removal whether the write was a removal; on equal version a removal record wins
     */
    @Override
    public void scheduleKeyInvalidation(K key, EntryVersion version, boolean removal) {
        InvalidationInfo ii = new InvalidationInfo((SimpleClusteredVersion)version, removal);
        // Typed with K (not Object) so the local is assignable from the field and can be
        // handed to tryRegularInvalidations.
        ConcurrentMap<K, InvalidationInfo> scheduledKeys;
        do {
            scheduledKeys = this.scheduledKeys;
            // NOTE(review): only the version counter is compared here, not the topology id,
            // unlike regularInvalidationFinished - confirm this is intended.
            scheduledKeys.compute(key, (k, old) -> old == null ? ii : (ii.version > old.version || ii.removal && ii.version == old.version ? ii : old));
            // Retry if the map was swapped out meanwhile: the record might have landed in a
            // map that is no longer the current one.
        } while (scheduledKeys != this.scheduledKeys);
        if (scheduledKeys.size() > this.invalidationBatchSize) {
            this.tryRegularInvalidations(scheduledKeys, false);
        }
    }

    /**
     * Forces sending of whatever is pending: scheduled invalidations first, and only when there
     * are none, the removed-key invalidations.
     *
     * @return {@code true} if something was pending and a send was attempted, {@code false} otherwise
     */
    protected boolean startFlush() {
        ConcurrentMap<K, InvalidationInfo> pendingScheduled = this.scheduledKeys;
        boolean hasScheduled = !pendingScheduled.isEmpty();
        if (hasScheduled) {
            this.tryRegularInvalidations(pendingScheduled, true);
        } else {
            ConcurrentMap<K, InvalidationInfo> pendingRemoved = this.removedKeys;
            if (pendingRemoved.isEmpty()) {
                return false;
            }
            this.tryRemovedInvalidations(pendingRemoved);
        }
        return true;
    }

    /**
     * Starts tracking a segment: records the current topology as its owner topology, resets its
     * version counter, installs a fresh blocking future and moves it from {@code NOT_OWNED} to
     * {@code BLOCKED}.
     *
     * @param segment segment index
     * @throws IllegalStateException if the segment was not in state {@code NOT_OWNED}
     */
    @Override
    public synchronized void registerSegment(int segment) {
        this.ownerTopologyIds.set(segment, this.topologyId);
        this.segmentVersions.set(segment, 0L);
        CompletableFuture<Void> freshFuture = new CompletableFuture<>();
        this.blockedFutures.set(segment, freshFuture);
        boolean blocked = this.segmentStates.compareAndSet(segment, ScatteredVersionManager.SegmentState.NOT_OWNED, ScatteredVersionManager.SegmentState.BLOCKED);
        if (!blocked) {
            ScatteredVersionManager.SegmentState actual = this.segmentStates.get(segment);
            throw new IllegalStateException("Segment " + segment + " is in state " + actual);
        }
        log.tracef("Node %s blocks access to segment %d", (Object)this.rpcManager.getAddress(), (Object)segment);
    }

    /**
     * Marks the segment as {@code NOT_OWNED} and fails its blocking future (if one exists) with
     * a {@link CacheException}.
     *
     * @param segment segment index
     */
    @Override
    public synchronized void unregisterSegment(int segment) {
        this.segmentStates.set(segment, ScatteredVersionManager.SegmentState.NOT_OWNED);
        CompletableFuture<Void> pending = this.blockedFutures.get(segment);
        if (pending == null) {
            return;
        }
        pending.completeExceptionally(new CacheException("The segment is no longer owned."));
    }

    /**
     * Tells whether a version is at least as recent as the topology in which this node became
     * owner of the segment.
     *
     * @param segment segment index
     * @param version must be a {@link SimpleClusteredVersion} (it is cast unconditionally)
     * @return {@code true} if the version's topology id is not lower than the recorded owner topology id
     */
    @Override
    public boolean isVersionActual(int segment, EntryVersion version) {
        int versionTopologyId = ((SimpleClusteredVersion)version).topologyId;
        int ownedSince = this.ownerTopologyIds.get(segment);
        return versionTopologyId >= ownedSince;
    }

    /**
     * Moves the segment to {@code VALUE_TRANSFER} (when values are expected) or {@code OWNED},
     * regardless of its previous state, and then fails the segment's blocking future if one
     * exists. Failing a future that is already complete has no effect.
     *
     * @param segment      segment index
     * @param expectValues whether a value transfer will follow
     */
    @Override
    public void notifyKeyTransferFinished(int segment, boolean expectValues) {
        ScatteredVersionManager.SegmentState target;
        if (expectValues) {
            target = ScatteredVersionManager.SegmentState.VALUE_TRANSFER;
        } else {
            target = ScatteredVersionManager.SegmentState.OWNED;
        }
        // Unconditional transition implemented as a CAS retry loop; each attempt is traced.
        while (true) {
            ScatteredVersionManager.SegmentState observed = this.segmentStates.get(segment);
            log.tracef("Finishing transfer for segment %d = %s -> %s", segment, (Object)observed, (Object)target);
            if (this.segmentStates.compareAndSet(segment, observed, target)) {
                break;
            }
        }
        CompletableFuture<Void> pending = this.blockedFutures.get(segment);
        if (pending != null) {
            pending.completeExceptionally(new CacheException("Segment state transition did not complete correctly."));
        }
        if (!trace) {
            return;
        }
        if (expectValues) {
            log.tracef("Node %s, segment %d has all keys in, expects value transfer", (Object)this.rpcManager.getAddress(), (Object)segment);
        } else {
            log.tracef("Node %s, segment %d did not transfer any keys, segment is owned now", (Object)this.rpcManager.getAddress(), (Object)segment);
        }
    }

    /**
     * @param segment segment index
     * @return the state currently recorded for the segment
     */
    @Override
    public ScatteredVersionManager.SegmentState getSegmentState(int segment) {
        ScatteredVersionManager.SegmentState current = this.segmentStates.get(segment);
        return current;
    }

    /**
     * Flags that a value transfer is in progress. The topology id is only traced; it is not
     * stored by this method.
     *
     * @param topologyId topology the values will be transferred for
     */
    @Override
    public void setValuesTransferTopology(int topologyId) {
        log.tracef("Node will transfer value for topology %d", topologyId);
        synchronized (this.valuesLock) {
            this.transferringValues = true;
        }
    }

    /**
     * Finishes the value transfer: every segment that is not {@code NOT_OWNED}/{@code OWNED} is
     * moved to {@code OWNED}; then, under {@code valuesLock}, the values topology is advanced,
     * the transferring flag is cleared, the current values future is completed and replaced.
     * <p>
     * Segments still {@code BLOCKED} or in {@code KEY_TRANSFER} get their blocking future failed
     * and a warning logged before being moved to {@code OWNED}.
     */
    @Override
    public void notifyValueTransferFinished() {
        for (int segment = 0; segment < this.numSegments; ++segment) {
            // Retry until the segment is in a final state or our CAS to OWNED succeeds.
            boolean settled = false;
            while (!settled) {
                ScatteredVersionManager.SegmentState state = this.segmentStates.get(segment);
                switch (state) {
                    case NOT_OWNED:
                    case OWNED:
                        settled = true;
                        break;
                    case BLOCKED:
                    case KEY_TRANSFER:
                        this.blockedFutures.get(segment).completeExceptionally(new CacheException("Failed to request versions"));
                        log.warnf("Stopped applying state for segment %d in topology %d but the segment is in state %s", (Object)segment, (Object)this.topologyId, (Object)state);
                        // intentional fall-through: the segment is promoted to OWNED anyway
                    case VALUE_TRANSFER:
                        settled = this.segmentStates.compareAndSet(segment, state, ScatteredVersionManager.SegmentState.OWNED);
                        break;
                    default:
                        // Unknown state: re-read and try again (same as the original loop).
                        break;
                }
            }
        }
        synchronized (this.valuesLock) {
            this.valuesTopology = Math.max(this.topologyId, this.valuesTopology);
            this.transferringValues = false;
            // Release current waiters, then install a new future for the next transfer.
            this.valuesFuture.complete(null);
            this.valuesFuture = new CompletableFuture<>();
        }
        log.debugf("Node %s received values for all segments in topology %d", (Object)this.rpcManager.getAddress(), (Object)this.topologyId);
    }

    /**
     * @param segment segment index
     * @return the blocking future stored for the segment; {@code null} if none was ever
     *         installed by {@link #registerSegment(int)}
     */
    @Override
    public CompletableFuture<Void> getBlockingFuture(int segment) {
        CompletableFuture<Void> blocking = this.blockedFutures.get(segment);
        return blocking;
    }

    /**
     * Advances the topology id. The id must strictly increase and must not be changed
     * concurrently by another caller.
     *
     * @param topologyId new topology id
     * @throws IllegalArgumentException if {@code topologyId} is not greater than the current id
     * @throws IllegalStateException    if the id was changed concurrently between read and update
     */
    @Override
    public void setTopologyId(int topologyId) {
        final int observed = this.topologyId;
        if (topologyId <= observed) {
            throw new IllegalArgumentException("Updating to topology " + topologyId + " but current is " + observed);
        }
        boolean swapped = topologyIdUpdater.compareAndSet(this, observed, topologyId);
        if (swapped) {
            return;
        }
        throw new IllegalStateException("Concurrent update to topology " + topologyId + ", current was " + observed + " but now it's " + this.topologyId);
    }

    /**
     * Remembers the highest topology id seen among preloaded entry versions. Versions that are
     * not {@link SimpleClusteredVersion} are ignored.
     *
     * @param version version of a preloaded entry
     */
    @Override
    public void updatePreloadedEntryVersion(EntryVersion version) {
        if (!(version instanceof SimpleClusteredVersion)) {
            return;
        }
        int seen = ((SimpleClusteredVersion)version).topologyId;
        if (seen > this.preloadedTopologyId) {
            this.preloadedTopologyId = seen;
        }
    }

    /**
     * Returns a future that completes once values for {@code topologyId} are available.
     * <p>
     * While a value transfer is in progress and the requested topology is newer than the last
     * finished values topology, the returned future chains on the current values future and
     * re-evaluates the condition when it completes. Otherwise an already completed future is
     * returned.
     *
     * @param topologyId topology the caller needs values for
     * @return future completing when the caller may proceed
     */
    @Override
    public CompletableFuture<Void> valuesFuture(int topologyId) {
        // Unlocked fast path. It must test the SAME condition as the locked re-check below;
        // it previously used '<', which contradicted the inner check so callers never waited.
        if (this.transferringValues && topologyId > this.valuesTopology) {
            synchronized (this.valuesLock) {
                // Re-check under the lock: the transfer may have finished in the meantime.
                if (this.transferringValues && topologyId > this.valuesTopology) {
                    return this.valuesFuture.thenCompose(nil -> this.valuesFuture(topologyId));
                }
            }
        }
        return CompletableFutures.completedNull();
    }

    /**
     * Directly marks the given segments as owned: resets each version counter, records the
     * current topology as owner topology and moves the state from {@code NOT_OWNED} to
     * {@code OWNED}.
     *
     * @param segments segments to take ownership of
     * @throws IllegalStateException if any of the segments is not in state {@code NOT_OWNED}
     */
    @Override
    public void setOwnedSegments(Set<Integer> segments) {
        for (int segment : segments) {
            this.segmentVersions.set(segment, 0L);
            this.ownerTopologyIds.set(segment, this.topologyId);
            if (!this.segmentStates.compareAndSet(segment, ScatteredVersionManager.SegmentState.NOT_OWNED, ScatteredVersionManager.SegmentState.OWNED)) {
                // The message used to contain a literal, never-substituted "%d".
                throw new IllegalStateException("Segment " + segment + " is in state " + this.segmentStates.get(segment));
            }
        }
        if (log.isDebugEnabled()) {
            log.debugf("Node %s is now owner of segments %s", (Object)this.rpcManager.getAddress(), (Object)this.sorted(segments));
            this.printTable();
        }
    }

    /**
     * Moves each given segment from {@code BLOCKED} to {@code KEY_TRANSFER} and completes its
     * blocking future normally.
     *
     * @param segments segments for which the key transfer begins
     * @throws IllegalStateException if a segment is not in state {@code BLOCKED}; segments
     *                               processed before the failing one stay in {@code KEY_TRANSFER}
     */
    @Override
    public void startKeyTransfer(Set<Integer> segments) {
        for (int segment : segments) {
            boolean unblocked = this.segmentStates.compareAndSet(segment, ScatteredVersionManager.SegmentState.BLOCKED, ScatteredVersionManager.SegmentState.KEY_TRANSFER);
            if (!unblocked) {
                ScatteredVersionManager.SegmentState actual = this.segmentStates.get(segment);
                throw new IllegalStateException("Segment " + segment + " is in state " + actual);
            }
            CompletableFuture<Void> blocking = this.blockedFutures.get(segment);
            blocking.complete(null);
            log.tracef("Node %s, segment %d expects key transfer", (Object)this.rpcManager.getAddress(), (Object)segment);
        }
    }

    /**
     * Logs, at debug level, a table of all segments and the single-character code of their
     * state, 16 segments per row. Does nothing when debug logging is disabled.
     */
    private void printTable() {
        // Skip the formatting work entirely when the output would be discarded.
        if (!log.isDebugEnabled()) {
            return;
        }
        StringBuilder sb = new StringBuilder("Segments for node ").append(this.rpcManager.getAddress()).append(':');
        for (int rowStart = 0; rowStart < this.numSegments; rowStart += 16) {
            sb.append('\n');
            int rowEnd = Math.min(rowStart + 16, this.numSegments);
            for (int segment = rowStart; segment < rowEnd; ++segment) {
                sb.append(String.format("%4d=%c ", segment, Character.valueOf(this.segmentStates.get(segment).singleChar())));
            }
        }
        log.debug(sb.toString());
    }

    /**
     * Returns the given segments as a list in ascending order. The input set is not modified.
     *
     * @param segments segment indexes
     * @return fixed-size list of the segments, sorted ascending
     */
    private static List<Integer> sorted(Set<Integer> segments) {
        // Must be Integer[] (not Object[]) so that Arrays.asList yields a List<Integer>.
        Integer[] array = segments.toArray(new Integer[0]);
        Arrays.sort(array);
        return Arrays.asList(array);
    }

    /**
     * Attempts to take ownership of the given batch map (by swapping in a fresh one) and, on
     * success, sends an invalidation command for all its entries from the executor.
     * If another caller already swapped the map, nothing happens.
     *
     * @param scheduledKeys the map instance the caller observed as current
     * @param force         passed on to {@link #sendRegularInvalidations}
     */
    private void tryRegularInvalidations(ConcurrentMap<K, InvalidationInfo> scheduledKeys, boolean force) {
        if (!scheduledKeysSwapper.compareAndSet(this, scheduledKeys, new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize))) {
            return;
        }
        this.executorService.execute(() -> {
            // Writers that read the old map reference before the swap may still be adding to
            // it, so the entry count can grow while we iterate.
            int capacity = scheduledKeys.size();
            Object[] keys = new Object[capacity];
            int[] topologyIds = new int[capacity];
            long[] versions = new long[capacity];
            boolean[] isRemoved = new boolean[capacity];
            int numRemoved = 0;
            int count = 0;
            for (Map.Entry<K, InvalidationInfo> entry : scheduledKeys.entrySet()) {
                // Grow BEFORE writing; the previous code checked after the write, so the first
                // extra entry overflowed the arrays and the resize branch was unreachable.
                if (count == capacity) {
                    capacity = Math.max(scheduledKeys.size(), count + (count >> 1) + 1);
                    keys = Arrays.copyOf(keys, capacity);
                    topologyIds = Arrays.copyOf(topologyIds, capacity);
                    versions = Arrays.copyOf(versions, capacity);
                    isRemoved = Arrays.copyOf(isRemoved, capacity);
                }
                InvalidationInfo info = entry.getValue();
                keys[count] = entry.getKey();
                topologyIds[count] = info.topologyId;
                versions[count] = info.version;
                isRemoved[count] = info.removal;
                if (info.removal) {
                    ++numRemoved;
                }
                ++count;
            }
            // Drop unused slots so the command never carries null keys.
            if (count < capacity) {
                keys = Arrays.copyOf(keys, count);
                topologyIds = Arrays.copyOf(topologyIds, count);
                versions = Arrays.copyOf(versions, count);
                isRemoved = Arrays.copyOf(isRemoved, count);
            }
            InvalidateVersionsCommand command = this.commandsFactory.buildInvalidateVersionsCommand(keys, topologyIds, versions, false);
            this.sendRegularInvalidations(command, keys, topologyIds, versions, numRemoved, isRemoved, force);
        });
    }

    /**
     * Sends the invalidation command to the cluster and, on failure, logs and re-sends it
     * (without any retry limit). On success {@link #regularInvalidationFinished} is invoked
     * when the batch contained removals or the send was forced.
     */
    private void sendRegularInvalidations(InvalidateVersionsCommand command, Object[] keys, int[] topologyIds, long[] versions, int numRemoved, boolean[] isRemoved, boolean force) {
        CompletableFuture<Map<Address, Response>> pending = this.rpcManager.invokeRemotelyAsync(null, command, this.syncIgnoreLeavers);
        pending.whenComplete((responses, failure) -> {
            if (failure != null) {
                log.failedInvalidatingRemoteCache((Throwable)failure);
                this.sendRegularInvalidations(command, keys, topologyIds, versions, numRemoved, isRemoved, force);
                return;
            }
            boolean needsFollowUp = numRemoved > 0 || force;
            if (needsFollowUp) {
                this.regularInvalidationFinished(keys, topologyIds, versions, isRemoved, force);
            }
        });
    }

    /**
     * Called after a regular invalidation batch was delivered. Every removed key of the batch
     * is recorded in {@code removedKeys} (keeping the record with the highest topology id /
     * version), and the removed-keys batch is sent when it exceeds the batch size or when
     * {@code force} is set and the map is not empty.
     *
     * @param keys        keys of the finished batch; elements are expected to be of type {@code K}
     * @param topologyIds topology id per key
     * @param versions    version per key
     * @param isRemoved   whether the key at the same index was a removal
     * @param force       send removed-key invalidations even below the batch size
     */
    protected void regularInvalidationFinished(Object[] keys, int[] topologyIds, long[] versions, boolean[] isRemoved, boolean force) {
        // Typed with K (not Object) so it is assignable from the field and can be passed on
        // to tryRemovedInvalidations.
        ConcurrentMap<K, InvalidationInfo> removedKeys;
        do {
            removedKeys = this.removedKeys;
            for (int i = 0; i < isRemoved.length; ++i) {
                if (!isRemoved[i]) continue;
                int topologyId = topologyIds[i];
                long version = versions[i];
                @SuppressWarnings("unchecked") // keys originate from a ConcurrentMap<K, ...>
                K key = (K)keys[i];
                removedKeys.compute(key, (k, prev) -> {
                    if (prev == null || prev.topologyId < topologyId || prev.topologyId == topologyId && prev.version < version) {
                        return new InvalidationInfo(topologyId, version);
                    }
                    return prev;
                });
            }
            // Repeat if the map was swapped out while we were filling it.
        } while (removedKeys != this.removedKeys);
        if (removedKeys.size() > this.invalidationBatchSize || force && !removedKeys.isEmpty()) {
            this.tryRemovedInvalidations(removedKeys);
        }
    }

    /**
     * Attempts to take ownership of the given removed-keys map (by swapping in a fresh one)
     * and, on success, sends a removal invalidation command for all its entries from the
     * executor. If another caller already swapped the map, nothing happens.
     *
     * @param removedKeys the map instance the caller observed as current
     */
    private void tryRemovedInvalidations(ConcurrentMap<K, InvalidationInfo> removedKeys) {
        if (!removedKeysSwapper.compareAndSet(this, removedKeys, new ConcurrentHashMap<K, InvalidationInfo>(this.invalidationBatchSize))) {
            return;
        }
        this.executorService.execute(() -> {
            // The map may still receive entries from writers that read the reference before
            // the swap, so its size can grow during iteration.
            int capacity = removedKeys.size();
            Object[] keys = new Object[capacity];
            int[] topologyIds = new int[capacity];
            long[] versions = new long[capacity];
            int count = 0;
            for (Map.Entry<K, InvalidationInfo> entry : removedKeys.entrySet()) {
                // Grow BEFORE writing; checking after the write overflowed the arrays on the
                // first extra entry and left the resize branch unreachable.
                if (count == capacity) {
                    capacity = Math.max(removedKeys.size(), count + (count >> 1) + 1);
                    keys = Arrays.copyOf(keys, capacity);
                    topologyIds = Arrays.copyOf(topologyIds, capacity);
                    versions = Arrays.copyOf(versions, capacity);
                }
                InvalidationInfo info = entry.getValue();
                keys[count] = entry.getKey();
                topologyIds[count] = info.topologyId;
                versions[count] = info.version;
                ++count;
            }
            // Drop unused slots so the command never carries null keys.
            if (count < capacity) {
                keys = Arrays.copyOf(keys, count);
                topologyIds = Arrays.copyOf(topologyIds, count);
                versions = Arrays.copyOf(versions, count);
            }
            InvalidateVersionsCommand removeCommand = this.commandsFactory.buildInvalidateVersionsCommand(keys, topologyIds, versions, true);
            this.sendRemoveInvalidations(removeCommand);
        });
    }

    /**
     * Sends the removal invalidation command to the cluster and then also runs it on this node.
     * A failed remote send is logged and retried (without limit); a successful one triggers
     * {@link #removeInvalidationsFinished()}.
     */
    private void sendRemoveInvalidations(InvalidateVersionsCommand removeCommand) {
        CompletableFuture<Map<Address, Response>> pending = this.rpcManager.invokeRemotelyAsync(null, removeCommand, this.syncIgnoreLeavers);
        pending.whenComplete((responses, failure) -> {
            if (failure == null) {
                this.removeInvalidationsFinished();
                return;
            }
            log.failedInvalidatingRemoteCache((Throwable)failure);
            this.sendRemoveInvalidations(removeCommand);
        });
        // Apply the same invalidation locally.
        // NOTE(review): because retries re-enter this method, the local invocation is repeated
        // on every retry - confirm that this is intended/harmless.
        removeCommand.init(this.dataContainer, this.orderedUpdatesManager);
        removeCommand.invokeAsync();
    }

    /**
     * Hook invoked after a removal invalidation command has been delivered successfully
     * (see sendRemoveInvalidations). Intentionally empty here; being protected, it can be
     * overridden by subclasses.
     */
    protected void removeInvalidationsFinished() {
    }

    /**
     * Discards all pending invalidations by replacing both batch maps with fresh empty ones.
     */
    @Override
    public void clearInvalidations() {
        ConcurrentMap<K, InvalidationInfo> freshScheduled = new ConcurrentHashMap<>(this.invalidationBatchSize);
        scheduledKeysSwapper.set(this, freshScheduled);
        ConcurrentMap<K, InvalidationInfo> freshRemoved = new ConcurrentHashMap<>(this.invalidationBatchSize);
        removedKeysSwapper.set(this, freshRemoved);
    }

    /**
     * Immutable record of a pending invalidation: the version (topology id + counter) at which
     * the key was written and whether that write was a removal.
     */
    private static class InvalidationInfo {
        public final int topologyId;
        public final long version;
        public final boolean removal;

        /** Canonical constructor; the two constructors below delegate to it. */
        private InvalidationInfo(int topologyId, long version, boolean removal) {
            this.topologyId = topologyId;
            this.version = version;
            this.removal = removal;
        }

        /** Copies topology id and counter out of the given clustered version. */
        private InvalidationInfo(SimpleClusteredVersion version, boolean removal) {
            this(version.topologyId, version.version, removal);
        }

        /** Creates a record that is always flagged as a removal. */
        private InvalidationInfo(int topologyId, long version) {
            this(topologyId, version, true);
        }
    }
}

