/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.persistence.async;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import net.jcip.annotations.GuardedBy;
import org.infinispan.Cache;
import org.infinispan.commons.CacheException;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.configuration.cache.AsyncStoreConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.factories.threads.DefaultThreadFactory;
import org.infinispan.marshall.core.MarshalledEntry;
import org.infinispan.persistence.async.BufferLock;
import org.infinispan.persistence.async.State;
import org.infinispan.persistence.modifications.Modification;
import org.infinispan.persistence.modifications.Remove;
import org.infinispan.persistence.modifications.Store;
import org.infinispan.persistence.spi.CacheWriter;
import org.infinispan.persistence.spi.InitializationContext;
import org.infinispan.persistence.spi.PersistenceException;
import org.infinispan.persistence.support.DelegatingCacheWriter;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;

public class AsyncCacheWriter
extends DelegatingCacheWriter {
    private static final Log log = LogFactory.getLog(AsyncCacheWriter.class);
    private static final boolean trace = log.isTraceEnabled();
    private ExecutorService executor;
    private Thread coordinator;
    private int concurrencyLevel;
    private String cacheName;
    private String nodeName;
    protected BufferLock stateLock;
    @GuardedBy(value="stateLock")
    protected final AtomicReference<State> state = new AtomicReference();
    @GuardedBy(value="stateLock")
    private boolean stopped;
    protected AsyncStoreConfiguration asyncConfiguration;

    public AsyncCacheWriter(CacheWriter delegate) {
        super(delegate);
    }

    @Override
    public void init(InitializationContext ctx) {
        super.init(ctx);
        this.asyncConfiguration = ctx.getConfiguration().async();
        Cache cache = ctx.getCache();
        Configuration cacheCfg = cache != null ? cache.getCacheConfiguration() : null;
        this.concurrencyLevel = cacheCfg != null ? cacheCfg.locking().concurrencyLevel() : 16;
        this.cacheName = cache != null ? cache.getName() : null;
        this.nodeName = cache != null ? cache.getCacheManager().getCacheManagerConfiguration().transport().nodeName() : null;
    }

    @Override
    public void start() {
        log.debugf("Async cache loader starting %s", (Object)this);
        this.state.set(this.newState(false, null));
        this.stopped = false;
        this.stateLock = new BufferLock(this.asyncConfiguration.modificationQueueSize());
        int poolSize = this.asyncConfiguration.threadPoolSize();
        DefaultThreadFactory processorThreadFactory = new DefaultThreadFactory(null, 5, "%c-%n-p%f-t%t", this.nodeName, "AsyncStoreProcessor");
        this.executor = new ThreadPoolExecutor(poolSize, poolSize, 120L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), processorThreadFactory);
        ((ThreadPoolExecutor)this.executor).allowCoreThreadTimeOut(true);
        DefaultThreadFactory coordinatorThreadFactory = new DefaultThreadFactory(null, 5, "%c-%n-p%f-t%t", this.nodeName, "AsyncStoreCoordinator");
        this.coordinator = coordinatorThreadFactory.newThread(new AsyncStoreCoordinator());
        this.coordinator.start();
    }

    @Override
    public void stop() {
        if (trace) {
            log.tracef("Stop async store %s", (Object)this);
        }
        this.stateLock.writeLock(0);
        this.stopped = true;
        this.stateLock.writeUnlock();
        try {
            this.coordinator.join();
            this.executor.shutdown();
            if (!this.executor.awaitTermination(1L, TimeUnit.SECONDS)) {
                log.errorAsyncStoreNotStopped();
            }
        }
        catch (InterruptedException e) {
            log.interruptedWaitingAsyncStorePush(e);
            Thread.currentThread().interrupt();
        }
    }

    @Override
    public void write(MarshalledEntry entry) {
        this.put(new Store(entry.getKey(), entry), 1);
    }

    @Override
    public boolean delete(Object key) {
        this.put(new Remove(key), 1);
        return true;
    }

    protected void applyModificationsSync(List<Modification> mods) throws PersistenceException {
        block4: for (Modification m : mods) {
            switch (m.getType()) {
                case STORE: {
                    this.actual.write(((Store)m).getStoredValue());
                    continue block4;
                }
                case REMOVE: {
                    this.actual.delete(((Remove)m).getKey());
                    continue block4;
                }
            }
            throw new IllegalArgumentException("Unknown modification type " + (Object)((Object)m.getType()));
        }
    }

    protected State newState(boolean clear, State next) {
        ConcurrentMap<Object, Modification> map = CollectionFactory.makeConcurrentMap(64, this.concurrencyLevel);
        return new State(clear, map, next);
    }

    void assertNotStopped() throws CacheException {
        if (this.stopped) {
            throw new CacheException("AsyncCacheWriter stopped; no longer accepting more entries.");
        }
    }

    private void put(Modification mod, int count) {
        this.stateLock.writeLock(count);
        try {
            if (trace) {
                log.tracef("Queue modification: %s", (Object)mod);
            }
            this.assertNotStopped();
            this.state.get().put(mod);
        }
        finally {
            this.stateLock.writeUnlock();
        }
    }

    public AtomicReference<State> getState() {
        return this.state;
    }

    protected void clearStore() {
    }

    private class AsyncStoreProcessor
    implements Runnable {
        private final List<Modification> modifications;
        private final State myState;

        AsyncStoreProcessor(List<Modification> modifications, State myState) {
            this.modifications = modifications;
            this.myState = myState;
        }

        @Override
        public void run() {
            block7: {
                State s;
                try {
                    this.retryWork(3);
                    this.myState.workerThreads.countDown();
                    if (this.myState.workerThreads.getCount() != 0L || this.myState.next != null) break block7;
                    s = AsyncCacheWriter.this.state.get();
                }
                catch (Throwable throwable) {
                    this.myState.workerThreads.countDown();
                    if (this.myState.workerThreads.getCount() == 0L && this.myState.next == null) {
                        State s2 = AsyncCacheWriter.this.state.get();
                        while (s2 != null) {
                            if (s2.next == this.myState) {
                                s2.next = null;
                            }
                            s2 = s2.next;
                        }
                    }
                    throw throwable;
                }
                while (s != null) {
                    if (s.next == this.myState) {
                        s.next = null;
                    }
                    s = s.next;
                }
            }
        }

        private void retryWork(int maxRetries) {
            for (int attempt = 0; attempt < maxRetries; ++attempt) {
                if (attempt > 0 && log.isDebugEnabled()) {
                    log.debugf("Retrying due to previous failure. %s attempts left.", maxRetries - attempt);
                }
                try {
                    AsyncCacheWriter.this.applyModificationsSync(this.modifications);
                    return;
                }
                catch (Exception e) {
                    if (!log.isDebugEnabled()) continue;
                    log.debug("Failed to process async modifications", e);
                    continue;
                }
            }
            log.unableToProcessAsyncModifications(maxRetries);
        }
    }

    private class AsyncStoreCoordinator
    implements Runnable {
        private AsyncStoreCoordinator() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            LogFactory.pushNDC(AsyncCacheWriter.this.cacheName, trace);
            try {
                while (true) {
                    State tail;
                    boolean shouldStop;
                    State s;
                    AsyncCacheWriter.this.stateLock.readLock();
                    try {
                        s = AsyncCacheWriter.this.state.get();
                        shouldStop = AsyncCacheWriter.this.stopped;
                        tail = s.next;
                        assert (tail == null || tail.next == null) : "State chain longer than 3 entries!";
                        State head = AsyncCacheWriter.this.newState(false, s);
                        AsyncCacheWriter.this.state.set(head);
                    }
                    finally {
                        AsyncCacheWriter.this.stateLock.reset(0);
                        AsyncCacheWriter.this.stateLock.readUnlock();
                    }
                    try {
                        if (s.clear) {
                            if (tail != null) {
                                tail.workerThreads.await();
                            }
                            AsyncCacheWriter.this.clearStore();
                        }
                        ArrayList<Modification> mods = new ArrayList<Modification>(s.modifications.size());
                        ArrayList<Modification> deferredMods = new ArrayList<Modification>();
                        if (tail != null && tail.workerThreads.getCount() > 0L) {
                            for (Map.Entry e : s.modifications.entrySet()) {
                                if (!tail.modifications.containsKey(e.getKey())) {
                                    mods.add((Modification)e.getValue());
                                    continue;
                                }
                                deferredMods.add((Modification)e.getValue());
                            }
                        } else {
                            mods.addAll(s.modifications.values());
                        }
                        List<AsyncStoreProcessor> procs = this.createProcessors(s, mods);
                        List<AsyncStoreProcessor> deferredProcs = this.createProcessors(s, deferredMods);
                        s.workerThreads = new CountDownLatch(procs.size() + deferredProcs.size());
                        for (AsyncStoreProcessor processor : procs) {
                            AsyncCacheWriter.this.executor.execute(processor);
                        }
                        if (tail != null) {
                            tail.workerThreads.await();
                            s.next = null;
                        }
                        for (AsyncStoreProcessor processor : deferredProcs) {
                            AsyncCacheWriter.this.executor.execute(processor);
                        }
                        if (shouldStop) {
                            s.workerThreads.await();
                            return;
                        }
                    }
                    catch (Exception e) {
                        log.unexpectedErrorInAsyncStoreCoordinator(e);
                    }
                }
            }
            finally {
                LogFactory.popNDC(trace);
            }
        }

        private List<AsyncStoreProcessor> createProcessors(State state, List<Modification> mods) {
            ArrayList<AsyncStoreProcessor> result = new ArrayList<AsyncStoreProcessor>();
            int threads = Math.min(mods.size(), AsyncCacheWriter.this.asyncConfiguration.threadPoolSize());
            if (threads > 0) {
                int start = 0;
                int quotient = mods.size() / threads;
                int remainder = mods.size() % threads;
                for (int i = 0; i < threads; ++i) {
                    int end = start + quotient + (i < remainder ? 1 : 0);
                    result.add(new AsyncStoreProcessor(mods.subList(start, end), state));
                    start = end;
                }
                assert (start == mods.size()) : "Thread distribution is broken!";
            }
            return result;
        }
    }
}

