/*
 * Decompiled with CFR 0.152.
 */
package org.unidal.concurrent;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.unidal.concurrent.ActorContext;

/**
 * Base {@link ActorContext} implementation backed by a bounded
 * {@link ArrayBlockingQueue}.
 *
 * <p>Events are appended with {@link #addLast(Object)} and consumed either one
 * at a time with {@link #next()} or in batches with {@link #nextBatch()}.
 * Counters track how many events were accepted, how many were dropped because
 * the queue was full (non-blocking mode only), and how many were handed out in
 * batches.
 *
 * <p>Subclasses can tune behavior by overriding {@link #getBatchSize()},
 * {@link #getInitialQueueSize()} and {@link #isBlocking()}.
 *
 * @param <E> type of the queued events
 */
public abstract class AbstractActorContext<E>
implements ActorContext<E> {
    /** How long a blocking-mode {@link #addLast(Object)} waits for free space, in milliseconds. */
    private static final long OFFER_TIMEOUT_MILLIS = 5L;

    /** Idle time, in milliseconds, after which a partially filled batch is reported as ready. */
    private static final long BATCH_IDLE_MILLIS = 10L;

    private final BlockingQueue<E> m_queue;
    private final AtomicLong m_added = new AtomicLong();
    private final AtomicLong m_overflowed = new AtomicLong();
    private final AtomicInteger m_processed = new AtomicInteger();
    private final AtomicLong m_lastAccess;

    /**
     * Creates the context with a queue whose capacity is
     * {@link #getInitialQueueSize()}.
     *
     * <p>Note: {@code getInitialQueueSize()} is overridable and is invoked here
     * before any subclass constructor body has run, so an override must not
     * rely on subclass instance state.
     */
    public AbstractActorContext() {
        this.m_queue = new ArrayBlockingQueue<E>(this.getInitialQueueSize());
        this.m_lastAccess = new AtomicLong(System.currentTimeMillis());
    }

    /**
     * Appends an event to the queue.
     *
     * <p>In blocking mode ({@link #isBlocking()} returns {@code true}) the call
     * waits up to 5 ms for space and reports whether the event was accepted.
     * In non-blocking mode the event is offered without waiting; if the queue
     * is full the event is dropped, the overflow counter is incremented, and
     * the method still returns {@code true}.
     *
     * @param event the event to enqueue
     * @return {@code false} only when blocking mode timed out waiting for
     *         space; {@code true} otherwise
     * @throws InterruptedException if interrupted while waiting in blocking mode
     */
    @Override
    public boolean addLast(E event) throws InterruptedException {
        if (this.isBlocking()) {
            if (this.m_queue.offer(event, OFFER_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                this.m_added.incrementAndGet();
                return true;
            }
            return false;
        }
        if (this.m_queue.offer(event)) {
            this.m_added.incrementAndGet();
        } else {
            // Queue is full: the event is dropped and only counted.
            this.m_overflowed.incrementAndGet();
        }
        return true;
    }

    /**
     * Returns the number of events currently waiting in the queue.
     *
     * @return the current queue size
     */
    @Override
    public int available() {
        return this.m_queue.size();
    }

    /**
     * Returns the total number of events accepted by {@link #addLast(Object)}.
     *
     * @return the accepted-event count
     */
    protected long getAdded() {
        return this.m_added.get();
    }

    /**
     * Returns the maximum number of events handed out by one
     * {@link #nextBatch()} call, and the queue size at which
     * {@link #isBatchReady()} reports a full batch.
     *
     * @return the batch size, 100 by default
     */
    protected int getBatchSize() {
        return 100;
    }

    /**
     * Returns the capacity used to create the backing queue. Called once, from
     * the constructor.
     *
     * @return the queue capacity, 500000 by default
     */
    protected int getInitialQueueSize() {
        return 500000;
    }

    /**
     * Returns the number of events dropped in non-blocking mode because the
     * queue was full.
     *
     * @return the overflow count
     */
    protected long getOverflowed() {
        return this.m_overflowed.get();
    }

    /**
     * Returns the number of events handed out by {@link #nextBatch()} since the
     * previous call to this method, and resets that counter to zero.
     *
     * @return the processed count accumulated since the last call
     */
    @Override
    public int getProcessed() {
        return this.m_processed.getAndSet(0);
    }

    /**
     * Tells whether a batch should be taken now.
     *
     * <p>A batch is ready when the queue holds at least
     * {@link #getBatchSize()} events, or when it holds at least one event and
     * more than 10 ms have passed since the last access timestamp (set at
     * construction and refreshed by each {@link #nextBatch()} call).
     *
     * @return {@code true} if a batch is ready to be drained
     */
    @Override
    public boolean isBatchReady() {
        int available = this.m_queue.size();
        return available >= this.getBatchSize()
                || available > 0 && this.m_lastAccess.get() + BATCH_IDLE_MILLIS < System.currentTimeMillis();
    }

    /**
     * Selects the enqueue strategy used by {@link #addLast(Object)}.
     *
     * @return {@code true} (the default) to wait briefly for free space,
     *         {@code false} to drop events immediately when the queue is full
     */
    protected boolean isBlocking() {
        return true;
    }

    /**
     * Removes and returns the head of the queue without waiting.
     *
     * <p>Unlike {@link #nextBatch()}, this method does not update the
     * processed counter or the last access timestamp.
     *
     * @return the next event, or {@code null} if the queue is empty
     * @throws InterruptedException declared by the signature; the non-blocking
     *         poll used here does not wait
     */
    @Override
    public E next() throws InterruptedException {
        return this.m_queue.poll();
    }

    /**
     * Drains up to {@link #getBatchSize()} events from the queue into a new
     * list, adds the number drained to the processed counter, and refreshes the
     * last access timestamp.
     *
     * @return a new list with the drained events; empty if the queue was empty
     */
    @Override
    public List<E> nextBatch() {
        int maxSize = this.getBatchSize();
        List<E> batch = new ArrayList<E>(maxSize);
        this.m_queue.drainTo(batch, maxSize);
        this.m_processed.addAndGet(batch.size());
        this.m_lastAccess.set(System.currentTimeMillis());
        return batch;
    }
}

