/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ode.scheduler.simple;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.Scheduler;
import org.apache.ode.scheduler.simple.DatabaseDelegate;
import org.apache.ode.scheduler.simple.DatabaseException;
import org.apache.ode.scheduler.simple.Job;
import org.apache.ode.scheduler.simple.JobNoLongerInDbException;
import org.apache.ode.scheduler.simple.SchedulerThread;
import org.apache.ode.scheduler.simple.Task;
import org.apache.ode.scheduler.simple.TaskRunner;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
public class SimpleScheduler
implements Scheduler,
TaskRunner {
    private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
    long _immediateInterval = 30000L;
    long _nearFutureInterval = 600000L;
    long _staleInterval = 10000L;
    int _tps = 100;
    TransactionManager _txm;
    ExecutorService _exec;
    String _nodeId;
    int _todoLimit = 10000;
    volatile Scheduler.JobProcessor _jobProcessor;
    volatile Scheduler.JobProcessor _polledRunnableProcessor;
    private SchedulerThread _todo;
    private DatabaseDelegate _db;
    private CopyOnWriteArraySet<String> _knownNodes = new CopyOnWriteArraySet();
    private ConcurrentHashMap<String, Long> _lastHeartBeat = new ConcurrentHashMap();
    private boolean _running;
    private AtomicLong _nextUpgrade = new AtomicLong();
    private Random _random = new Random();
    private long _pollIntervalForPolledRunnable = Long.getLong("org.apache.ode.polledRunnable.pollInterval", 600000L);

    /**
     * Creates a scheduler for one node.
     *
     * <p>Tuning values are read from {@code conf}; every key that is absent leaves
     * the corresponding field at its built-in default. The internal
     * {@link SchedulerThread} is created here but not started (see {@link #start()}).
     *
     * @param nodeId identifier of this node
     * @param del    delegate used for all job persistence
     * @param conf   configuration; the keys read are {@code ode.scheduler.queueLength},
     *               {@code ode.scheduler.immediateInterval},
     *               {@code ode.scheduler.nearFutureInterval},
     *               {@code ode.scheduler.staleInterval} and
     *               {@code ode.scheduler.transactionsPerSecond}
     */
    public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties conf) {
        _nodeId = nodeId;
        _db = del;

        // Each lookup falls back to the value the field was initialised with.
        _todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", _todoLimit);
        _immediateInterval = getLongProperty(conf, "ode.scheduler.immediateInterval", _immediateInterval);
        _nearFutureInterval = getLongProperty(conf, "ode.scheduler.nearFutureInterval", _nearFutureInterval);
        _staleInterval = getLongProperty(conf, "ode.scheduler.staleInterval", _staleInterval);
        _tps = getIntProperty(conf, "ode.scheduler.transactionsPerSecond", _tps);

        _todo = new SchedulerThread(this);
    }

    /**
     * Overrides the delay, in milliseconds, after which a polled runnable that has
     * not reported {@code COMPLETED} is scheduled again.
     *
     * @param pollIntervalForPolledRunnable new poll interval in milliseconds
     */
    public void setPollIntervalForPolledRunnable(long pollIntervalForPolledRunnable) {
        _pollIntervalForPolledRunnable = pollIntervalForPolledRunnable;
    }

    /**
     * Reads an {@code int} configuration value.
     *
     * @param props        configuration to read from; {@code null} is treated as empty
     * @param propName     key to look up
     * @param defaultValue value returned when the key is absent
     * @return the parsed value, or {@code defaultValue} when there is nothing to parse
     * @throws NumberFormatException if the key is present but is not a valid integer
     */
    private int getIntProperty(Properties props, String propName, int defaultValue) {
        if (props == null) {
            return defaultValue;
        }
        String raw = props.getProperty(propName);
        if (raw == null) {
            return defaultValue;
        }
        // Values loaded from a .properties file keep trailing whitespace, which
        // Integer.parseInt rejects; strip it before parsing.
        return Integer.parseInt(raw.trim());
    }

    /**
     * Reads a {@code long} configuration value.
     *
     * @param props        configuration to read from; {@code null} is treated as empty
     * @param propName     key to look up
     * @param defaultValue value returned when the key is absent
     * @return the parsed value, or {@code defaultValue} when there is nothing to parse
     * @throws NumberFormatException if the key is present but is not a valid long
     */
    private long getLongProperty(Properties props, String propName, long defaultValue) {
        if (props == null) {
            return defaultValue;
        }
        String raw = props.getProperty(propName);
        if (raw == null) {
            return defaultValue;
        }
        // Values loaded from a .properties file keep trailing whitespace, which
        // Long.parseLong rejects; strip it before parsing.
        return Long.parseLong(raw.trim());
    }

    /**
     * Replaces the identifier this scheduler uses for itself when talking to the
     * database delegate.
     *
     * @param nodeId new node identifier
     */
    public void setNodeId(String nodeId) {
        _nodeId = nodeId;
    }

    /**
     * Sets how long, in milliseconds, another node may go without a heartbeat
     * before the stale-node check recovers its jobs.
     *
     * @param staleInterval interval in milliseconds
     */
    public void setStaleInterval(long staleInterval) {
        _staleInterval = staleInterval;
    }

    /**
     * Sets the look-ahead window, in milliseconds, within which a job counts as
     * "immediate" when it is scheduled or loaded from the database.
     *
     * @param immediateInterval window length in milliseconds
     */
    public void setImmediateInterval(long immediateInterval) {
        _immediateInterval = immediateInterval;
    }

    /**
     * Sets the look-ahead window, in milliseconds, within which a job counts as
     * "near future" and is assigned to a node.
     *
     * @param nearFutureInterval window length in milliseconds
     */
    public void setNearFutureInterval(long nearFutureInterval) {
        _nearFutureInterval = nearFutureInterval;
    }

    /**
     * Sets the transactions-per-second figure used to size the batch of jobs
     * fetched by each immediate load.
     *
     * @param tps transactions per second
     */
    public void setTransactionsPerSecond(int tps) {
        _tps = tps;
    }

    /**
     * Supplies the transaction manager used to begin, commit and roll back the
     * transactions this scheduler runs work in.
     *
     * @param txm transaction manager to use
     */
    public void setTransactionManager(TransactionManager txm) {
        _txm = txm;
    }

    /**
     * Replaces the delegate used for job persistence.
     *
     * @param dbd database delegate to use
     */
    public void setDatabaseDelegate(DatabaseDelegate dbd) {
        _db = dbd;
    }

    /**
     * Supplies the executor on which jobs and isolated transactions run. If none
     * is set before {@link #start()}, a cached thread pool is created there.
     *
     * @param executorService executor to use
     */
    public void setExecutorService(ExecutorService executorService) {
        _exec = executorService;
    }

    /**
     * Sets the processor invoked for jobs whose detail map carries a
     * {@code "runnable"} entry.
     *
     * @param polledRunnableProcessor processor for polled runnables
     */
    public void setPolledRunnableProcesser(Scheduler.JobProcessor polledRunnableProcessor) {
        _polledRunnableProcessor = polledRunnableProcessor;
    }

    /**
     * Cancels a job: removes it from the in-memory queue, then deletes it from
     * the database for this node.
     *
     * @param jobId identifier of the job to cancel
     * @throws ContextException if the database delete fails
     */
    public void cancelJob(String jobId) throws ContextException {
        // A placeholder Job carrying only the id is enough to dequeue the real one.
        Job placeholder = new Job(0L, jobId, false, null);
        _todo.dequeue(placeholder);

        try {
            _db.deleteJob(jobId, _nodeId);
        } catch (DatabaseException dbFailure) {
            __log.debug("Job removal failed.", dbFailure);
            throw new ContextException("Job removal failed.", dbFailure);
        }
    }

    /**
     * Runs {@code transaction} in its own transaction on the executor.
     *
     * <p>A failure is logged and abandoned rather than propagated: in that case the
     * returned future completes with {@code null}.
     *
     * @param transaction work to run inside a new transaction
     * @return future holding the work's result, or {@code null} if it failed
     */
    public <T> Future<T> execIsolatedTransaction(final Callable<T> transaction) throws Exception, ContextException {
        Callable<T> isolated = new Callable<T>() {

            @Override
            public T call() throws Exception {
                T outcome = null;
                try {
                    outcome = execTransaction(transaction);
                } catch (Exception e) {
                    __log.error("An exception occured while executing an isolated transaction, the transaction is going to be abandoned.", e);
                }
                return outcome;
            }
        };
        return _exec.submit(isolated);
    }

    /**
     * Runs {@code transaction} inside a new transaction on the calling thread.
     *
     * <p>The transaction is committed when the callable returns normally and rolled
     * back when it throws. If the rollback itself fails, that failure is logged and
     * the callable's original exception is the one that propagates, so the root
     * cause is never hidden by a secondary rollback error.
     *
     * @param transaction work to run
     * @return whatever the callable returned
     * @throws ContextException if the transaction could not be begun
     * @throws Exception        whatever the callable threw, or a failure raised by commit
     */
    public <T> T execTransaction(Callable<T> transaction) throws Exception, ContextException {
        try {
            if (__log.isDebugEnabled()) {
                __log.debug("Beginning a new transaction");
            }
            _txm.begin();
        } catch (Exception ex) {
            String errmsg = "Internal Error, could not begin transaction.";
            throw new ContextException(errmsg, ex);
        }

        boolean success = false;
        try {
            T retval = transaction.call();
            success = true;
            return retval;
        } finally {
            if (success) {
                if (__log.isDebugEnabled()) {
                    __log.debug("Commiting on " + _txm + "...");
                }
                // A commit failure must reach the caller: the work did not take effect.
                _txm.commit();
            } else {
                if (__log.isDebugEnabled()) {
                    __log.debug("Rollbacking on " + _txm + "...");
                }
                try {
                    _txm.rollback();
                } catch (Exception rollbackFailure) {
                    // Throwing from finally would replace the exception already in
                    // flight; log instead so the original failure is what callers see.
                    __log.error("Rollback failed on " + _txm + "; propagating the original failure.", rollbackFailure);
                }
            }
        }
    }

    /**
     * Marks the transaction associated with the calling thread as rollback-only
     * by delegating to the transaction manager.
     *
     * @throws Exception whatever the transaction manager throws
     */
    public void setRollbackOnly() throws Exception {
        _txm.setRollbackOnly();
    }

    /**
     * Registers {@code synch} with the current transaction so that it is called
     * back before and after the transaction completes.
     *
     * @param synch callback to register
     * @throws ContextException if the registration fails for any reason
     */
    public void registerSynchronizer(final Scheduler.Synchronizer synch) throws ContextException {
        // Adapts the scheduler's callback interface to the JTA one.
        Synchronization adapter = new Synchronization() {

            public void beforeCompletion() {
                synch.beforeCompletion();
            }

            public void afterCompletion(int status) {
                // 3 is javax.transaction.Status.STATUS_COMMITTED.
                boolean committed = status == 3;
                synch.afterCompletion(committed);
            }
        };

        try {
            _txm.getTransaction().registerSynchronization(adapter);
        } catch (Exception e) {
            throw new ContextException("Unable to register synchronizer.", e);
        }
    }

    /**
     * Schedules a persisted, transacted job.
     *
     * @param jobDetail job payload
     * @param when      when the job should run; {@code null} means "now"
     * @return identifier of the scheduled job
     * @throws ContextException if the job could not be written to the database
     */
    public String schedulePersistedJob(Map<String, Object> jobDetail, Date when) throws ContextException {
        final long now = System.currentTimeMillis();
        final Date due = (when != null) ? when : new Date(now);

        if (__log.isDebugEnabled()) {
            __log.debug("scheduling " + jobDetail + " for " + due);
        }

        Job job = new Job(due.getTime(), true, jobDetail);
        return schedulePersistedJob(job, due, now);
    }

    /**
     * Schedules a runnable as a persisted job. The runnable is stored in the job's
     * detail map under {@code "runnable"} and is asked to add its own state to the
     * same map.
     *
     * @param runnable runnable to schedule
     * @param when     when the job should run; {@code null} means "now"
     * @return identifier of the scheduled job
     * @throws ContextException if the job could not be written to the database
     */
    public String scheduleMapSerializableRunnable(Scheduler.MapSerializableRunnable runnable, Date when) throws ContextException {
        final long now = System.currentTimeMillis();
        final Date due = (when != null) ? when : new Date(now);

        Map<String, Object> details = new HashMap<String, Object>();
        details.put("runnable", runnable);
        runnable.storeToDetailsMap(details);

        if (__log.isDebugEnabled()) {
            __log.debug("scheduling " + details + " for " + due);
        }

        Job job = new Job(due.getTime(), true, details);
        return schedulePersistedJob(job, due, now);
    }

    /**
     * Writes {@code job} to the database, choosing how it is stored from how soon
     * it is due relative to {@code ctime}:
     * <ul>
     *   <li>within the immediate interval: inserted for this node as loaded, and
     *       queued in memory on commit if the queue has room;</li>
     *   <li>within the near-future interval: inserted for this node, not loaded;</li>
     *   <li>otherwise: inserted without a node.</li>
     * </ul>
     *
     * @param job   job to persist
     * @param when  due date of the job
     * @param ctime the "current time" the due date is compared against
     * @return the job's identifier
     * @throws ContextException if the insert fails
     */
    private String schedulePersistedJob(Job job, Date when, long ctime) throws ContextException {
        final long due = when.getTime();
        final boolean immediate = due <= ctime + _immediateInterval;
        final boolean nearfuture = !immediate && due <= ctime + _nearFutureInterval;

        try {
            if (immediate) {
                _db.insertJob(job, _nodeId, true);
                // Only queue in memory while there is room; otherwise a later
                // immediate load picks the job up from the database.
                if (_todo.size() < _todoLimit) {
                    addTodoOnCommit(job);
                }
                __log.debug("scheduled immediate job: " + job.jobId);
            } else if (nearfuture) {
                _db.insertJob(job, _nodeId, false);
                __log.debug("scheduled near-future job: " + job.jobId);
            } else {
                _db.insertJob(job, null, false);
                __log.debug("scheduled far-future job: " + job.jobId);
            }
        } catch (DatabaseException dbe) {
            __log.error("Database error.", dbe);
            throw new ContextException("Database error.", dbe);
        }

        return job.jobId;
    }

    /**
     * Schedules a job that is never written to the database. It is queued in
     * memory once the current transaction commits.
     *
     * @param transacted whether the job should later run inside a transaction
     * @param jobDetail  job payload
     * @return the string form of the created job
     * @throws ContextException if the commit callback cannot be registered
     */
    public String scheduleVolatileJob(boolean transacted, Map<String, Object> jobDetail) throws ContextException {
        Job volatileJob = new Job(System.currentTimeMillis(), transacted, jobDetail);
        volatileJob.persisted = false;
        addTodoOnCommit(volatileJob);
        return volatileJob.toString();
    }

    /**
     * Sets the processor invoked for ordinary (non-polled) jobs.
     *
     * @param processor job processor to use
     */
    public void setJobProcessor(Scheduler.JobProcessor processor) throws ContextException {
        _jobProcessor = processor;
    }

    /**
     * Stops the scheduler and then drops its references to the job processor,
     * the transaction manager and the scheduler thread.
     */
    public void shutdown() {
        // stop() still needs _todo, so it must run before the references are cleared.
        stop();
        _jobProcessor = null;
        _txm = null;
        _todo = null;
    }

    /**
     * Starts the scheduler; does nothing if it is already running.
     *
     * <p>Creates a cached thread pool when no executor was supplied, discards any
     * leftover housekeeping tasks, reloads the set of known nodes from the
     * database, seeds their heartbeats with the current time, queues the three
     * housekeeping tasks and finally starts the scheduler thread.
     *
     * @throws ContextException if the node list cannot be read
     */
    public synchronized void start() {
        if (_running) {
            return;
        }
        if (_exec == null) {
            _exec = Executors.newCachedThreadPool();
        }

        // Start from a clean slate: no stale housekeeping tasks, no stale node list.
        _todo.clearTasks(UpgradeJobsTask.class);
        _todo.clearTasks(LoadImmediateTask.class);
        _todo.clearTasks(CheckStaleNodes.class);
        _knownNodes.clear();

        Callable<Void> loadNodeIds = new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                _knownNodes.addAll(_db.getNodeIds());
                return null;
            }
        };
        try {
            execTransaction(loadNodeIds);
        } catch (Exception ex) {
            __log.error("Error retrieving node list.", ex);
            throw new ContextException("Error retrieving node list.", ex);
        }

        final long now = System.currentTimeMillis();
        for (String knownNode : _knownNodes) {
            _lastHeartBeat.put(knownNode, now);
        }

        _todo.enqueue(new LoadImmediateTask(now));
        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
        _todo.enqueue(new UpgradeJobsTask(now + randomMean(_immediateInterval)));
        _todo.start();
        _running = true;
    }

    /**
     * Returns a random delay whose expected value is {@code mean}: a uniformly
     * distributed value in {@code [mean/2, mean/2 + mean)} for a positive mean.
     * It is used to stagger the first run of the periodic housekeeping tasks.
     *
     * @param mean desired average delay
     * @return randomised delay around {@code mean}
     */
    private long randomMean(long mean) {
        // The cast must cover the whole product: casting nextDouble() alone always
        // yields 0 (its range is [0, 1)), which would make the result a constant mean/2.
        return (long) (_random.nextDouble() * mean) + mean / 2L;
    }

    /**
     * Stops the scheduler thread and discards the queued housekeeping tasks;
     * does nothing if the scheduler is not running.
     */
    public synchronized void stop() {
        if (_running) {
            _todo.stop();
            _todo.clearTasks(UpgradeJobsTask.class);
            _todo.clearTasks(LoadImmediateTask.class);
            _todo.clearTasks(CheckStaleNodes.class);
            _running = false;
        }
    }

    /**
     * Submits {@code job} to the executor for processing by the job processor.
     *
     * <p>A transacted job runs inside {@link #execTransaction}: a persisted job is
     * first deleted from the database (if it is already gone the work is abandoned),
     * then handed to the processor. When the processor asks for a retry and fewer
     * than eleven attempts have been made, a retry job is inserted; the processor's
     * exception is rethrown in every case so the transaction is rolled back.
     * A non-transacted job is handed to the processor directly.
     *
     * @param job job to run
     */
    protected void runJob(final Job job) {
        Object storedRetry = job.detail.get("retry");
        int retryCount = storedRetry != null ? ((Integer) storedRetry).intValue() : 0;
        final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, retryCount);

        // Work performed inside the transaction for transacted jobs.
        final Callable<Void> transactedWork = new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                if (job.persisted && !_db.deleteJob(job.jobId, _nodeId)) {
                    throw new JobNoLongerInDbException(job.jobId, _nodeId);
                }
                try {
                    _jobProcessor.onScheduledJob(jobInfo);
                } catch (Scheduler.JobProcessorException jpe) {
                    if (!jpe.retry) {
                        __log.error("Error while processing transaction, no retry.", jpe);
                    } else {
                        Object previous = job.detail.get("retry");
                        int attempt = previous != null ? ((Integer) previous).intValue() + 1 : 0;
                        if (attempt > 10) {
                            __log.error("Error while processing transaction after 10 retries, no more retries:" + job);
                        } else {
                            long delay = doRetry(job);
                            __log.error("Error while processing transaction, retrying in " + delay + "s");
                        }
                    }
                    // Rethrow so that execTransaction rolls the transaction back.
                    throw jpe;
                }
                return null;
            }
        };

        _exec.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                if (!job.transacted) {
                    _jobProcessor.onScheduledJob(jobInfo);
                    return null;
                }
                try {
                    execTransaction(transactedWork);
                } catch (JobNoLongerInDbException jde) {
                    __log.debug("job no longer in db forced rollback.");
                } catch (Exception ex) {
                    __log.error("Error while executing transaction", ex);
                }
                return null;
            }
        });
    }

    /**
     * Submits a polled-runnable job to the executor.
     *
     * <p>Inside a transaction the job is deleted from the database (if it is already
     * gone the work is abandoned) and handed to the polled-runnable processor. Unless
     * the job detail then reports {@code runnable_status} as {@code COMPLETED}, the
     * job is re-inserted, due one poll interval from now; a negative poll interval
     * is replaced by 1000 ms. Retry handling on processor failure matches
     * {@link #runJob}.
     *
     * @param job job to run
     */
    protected void runPolledRunnable(final Job job) {
        Object storedRetry = job.detail.get("retry");
        int retryCount = storedRetry != null ? ((Integer) storedRetry).intValue() : 0;
        final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, job.detail, retryCount);

        // Work performed inside the transaction.
        final Callable<Void> pollOnce = new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                if (!_db.deleteJob(job.jobId, _nodeId)) {
                    throw new JobNoLongerInDbException(job.jobId, _nodeId);
                }
                try {
                    _polledRunnableProcessor.onScheduledJob(jobInfo);

                    String status = String.valueOf(jobInfo.jobDetail.get("runnable_status"));
                    boolean completed = "COMPLETED".equals(status);
                    if (!completed) {
                        if (_pollIntervalForPolledRunnable < 0L) {
                            if (__log.isWarnEnabled()) {
                                __log.warn("The poll interval for polled runnables is negative; setting it to 1000ms");
                            }
                            _pollIntervalForPolledRunnable = 1000L;
                        }
                        // Not finished yet: put the job back, due one poll interval from now.
                        job.schedDate = System.currentTimeMillis() + _pollIntervalForPolledRunnable;
                        _db.insertJob(job, _nodeId, false);
                    }
                } catch (Scheduler.JobProcessorException jpe) {
                    if (!jpe.retry) {
                        __log.error("Error while processing transaction, no retry.", jpe);
                    } else {
                        Object previous = job.detail.get("retry");
                        int attempt = previous != null ? ((Integer) previous).intValue() + 1 : 0;
                        if (attempt > 10) {
                            __log.error("Error while processing transaction after 10 retries, no more retries:" + job);
                        } else {
                            long delay = doRetry(job);
                            __log.error("Error while processing transaction, retrying in " + delay + "s");
                        }
                    }
                    // Rethrow so that execTransaction rolls the transaction back.
                    throw jpe;
                }
                return null;
            }
        };

        _exec.submit(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
                try {
                    execTransaction(pollOnce);
                } catch (JobNoLongerInDbException jde) {
                    __log.debug("job no longer in db forced rollback.");
                } catch (Exception ex) {
                    __log.error("Error while executing transaction", ex);
                }
                return null;
            }
        });
    }

    /**
     * Arranges for {@code job} to be added to the in-memory queue once the current
     * transaction completes successfully; on an unsuccessful completion the job is
     * not queued.
     *
     * @param job job to queue after commit
     */
    private void addTodoOnCommit(final Job job) {
        Scheduler.Synchronizer enqueueOnSuccess = new Scheduler.Synchronizer() {

            public void afterCompletion(boolean success) {
                if (!success) {
                    return;
                }
                SimpleScheduler.this._todo.enqueue(job);
            }

            public void beforeCompletion() {
                // Nothing to do before completion.
            }
        };
        registerSynchronizer(enqueueOnSuccess);
    }

    /**
     * Reports whether the calling thread currently has a transaction.
     *
     * @return {@code true} if the transaction manager returns a transaction whose
     *         status is not "no transaction"
     * @throws ContextException if the transaction status cannot be obtained; the
     *                          underlying {@link SystemException} is attached as the cause
     */
    public boolean isTransacted() {
        try {
            Transaction tx = _txm.getTransaction();
            // 6 is javax.transaction.Status.STATUS_NO_TRANSACTION.
            return tx != null && tx.getStatus() != 6;
        } catch (SystemException e) {
            // Keep the cause so the real failure is visible in logs and stack traces.
            throw new ContextException("Internal Error: Could not obtain transaction status.", e);
        }
    }

    /**
     * Dispatches a task taken from the queue: a {@link Job} whose detail map has a
     * {@code "runnable"} entry is run as a polled runnable, any other job is run
     * as a regular job, and an internal housekeeping task is run directly on the
     * calling thread. Any other kind of task is ignored.
     *
     * @param task task to run
     */
    @Override
    public void runTask(Task task) {
        if (task instanceof Job) {
            Job job = (Job) task;
            boolean polledRunnable = job.detail.get("runnable") != null;
            if (polledRunnable) {
                runPolledRunnable(job);
            } else {
                runJob(job);
            }
            return;
        }
        if (task instanceof SchedulerTask) {
            ((SchedulerTask) task).run();
        }
    }

    /**
     * Records that another node was just seen alive: stamps its heartbeat with the
     * current time and adds it to the set of known nodes. A {@code null} id and
     * this node's own id are ignored.
     *
     * @param nodeId identifier of the node that was seen
     */
    public void updateHeartBeat(String nodeId) {
        if (nodeId == null || _nodeId.equals(nodeId)) {
            return;
        }
        _lastHeartBeat.put(nodeId, System.currentTimeMillis());
        _knownNodes.add(nodeId);
    }

    /**
     * Loads jobs that are due within the immediate interval from the database into
     * the in-memory queue.
     *
     * <p>Nothing is loaded while the queue is more than half full. Otherwise up to
     * {@code _immediateInterval * _tps / 1000} jobs are dequeued in one transaction
     * and queued until the queue limit is reached.
     *
     * @return {@code true} if the load was skipped or succeeded, {@code false} if
     *         it failed
     */
    boolean doLoadImmediate() {
        __log.debug("LOAD IMMEDIATE started");

        // Queue already well stocked: report success without touching the database.
        if (_todo.size() > _todoLimit / 2) {
            return true;
        }

        try {
            final int batch = (int) (_immediateInterval * (long) _tps / 1000L);
            Callable<List<Job>> dequeue = new Callable<List<Job>>() {

                @Override
                public List<Job> call() throws Exception {
                    long horizon = System.currentTimeMillis() + _immediateInterval;
                    return _db.dequeueImmediate(_nodeId, horizon, batch);
                }
            };

            List<Job> loaded = execTransaction(dequeue);
            for (Job loadedJob : loaded) {
                if (__log.isDebugEnabled()) {
                    __log.debug("todo.enqueue job from db: " + loadedJob.jobId + " for " + loadedJob.schedDate);
                }
                if (_todo.size() < _todoLimit) {
                    _todo.enqueue(loadedJob);
                }
            }
            return true;
        } catch (Exception ex) {
            __log.error("Error loading immediate jobs from database.", ex);
            return false;
        } finally {
            __log.debug("LOAD IMMEDIATE complete");
        }
    }

    /**
     * Assigns near-future jobs to nodes. The known nodes plus this node are sorted,
     * and for each one, in a single transaction, the database delegate is asked to
     * assign jobs due before {@code now + _nearFutureInterval}, given the node's
     * position in the sorted list and the total node count.
     *
     * @return {@code true} on success, {@code false} if the transaction failed
     */
    boolean doUpgrade() {
        __log.debug("UPGRADE started");

        final List<String> sortedNodes = new ArrayList<String>(_knownNodes);
        sortedNodes.add(_nodeId);
        Collections.sort(sortedNodes);
        final long maxtime = System.currentTimeMillis() + _nearFutureInterval;

        try {
            Boolean upgraded = execTransaction(new Callable<Boolean>() {

                @Override
                public Boolean call() throws Exception {
                    final int total = sortedNodes.size();
                    int position = 0;
                    for (String node : sortedNodes) {
                        _db.updateAssignToNode(node, position, total, maxtime);
                        position++;
                    }
                    return Boolean.TRUE;
                }
            });
            return upgraded.booleanValue();
        } catch (Exception ex) {
            __log.error("Database error upgrading jobs.", ex);
            return false;
        } finally {
            __log.debug("UPGRADE complete");
        }
    }

    /**
     * Takes over the jobs of a node considered stale: reassigns them to this node
     * in one transaction, forgets the node and its heartbeat, and triggers an
     * immediate load. A failure is logged and otherwise ignored.
     *
     * @param nodeId identifier of the stale node
     */
    void recoverStaleNode(final String nodeId) {
        __log.debug("recovering stale node " + nodeId);
        try {
            Callable<Integer> reassign = new Callable<Integer>() {

                @Override
                public Integer call() throws Exception {
                    return _db.updateReassign(nodeId, _nodeId);
                }
            };
            int numrows = execTransaction(reassign);
            __log.debug("reassigned " + numrows + " jobs to self. ");

            _knownNodes.remove(nodeId);
            _lastHeartBeat.remove(nodeId);

            // Pick up the newly acquired jobs right away.
            doLoadImmediate();
        } catch (Exception ex) {
            __log.error("Database error reassigning node.", ex);
        } finally {
            __log.debug("node recovery complete");
        }
    }

    /**
     * Schedules another attempt at {@code job}. The attempt counter stored under
     * {@code "retry"} in the job detail starts at 0 and is incremented on each
     * call; the new job is due {@code 5^attempt} seconds from now and is inserted
     * for this node.
     *
     * @param job job that failed
     * @return the delay before the retry, in seconds
     * @throws DatabaseException if the retry job cannot be inserted
     */
    private long doRetry(Job job) throws DatabaseException {
        Object previous = job.detail.get("retry");
        int attempt = previous == null ? 0 : ((Integer) previous).intValue() + 1;
        job.detail.put("retry", attempt);

        long delaySeconds = (long) Math.pow(5.0, attempt);
        long dueAt = System.currentTimeMillis() + delaySeconds * 1000L;
        _db.insertJob(new Job(dueAt, true, job.detail), _nodeId, false);
        return delaySeconds;
    }

    /**
     * Periodic housekeeping task that recovers the jobs of nodes whose heartbeat
     * is missing or older than the stale interval.
     */
    private class CheckStaleNodes extends SchedulerTask {
        CheckStaleNodes(long schedDate) {
            super(schedDate);
        }

        /**
         * Queues the next check first, then inspects every known node other than
         * this one and recovers those that look stale.
         */
        public void run() {
            final SimpleScheduler scheduler = SimpleScheduler.this;

            long nextCheck = System.currentTimeMillis() + scheduler._staleInterval;
            scheduler._todo.enqueue(new CheckStaleNodes(nextCheck));
            __log.debug("CHECK STALE NODES started");

            for (String nodeId : scheduler._knownNodes) {
                Long lastSeen = scheduler._lastHeartBeat.get(nodeId);
                boolean fresh = lastSeen != null
                        && System.currentTimeMillis() - lastSeen <= scheduler._staleInterval;
                if (!fresh && !scheduler._nodeId.equals(nodeId)) {
                    scheduler.recoverStaleNode(nodeId);
                }
            }
        }
    }

    /**
     * Periodic housekeeping task that runs {@link SimpleScheduler#doUpgrade()} and
     * then queues its own next run.
     */
    private class UpgradeJobsTask
    extends SchedulerTask {
        UpgradeJobsTask(long schedDate) {
            super(schedDate);
        }

        /**
         * Runs the upgrade unless the recorded next-upgrade time is still in the
         * future, in which case the task is simply queued again for that time.
         *
         * <p>After an upgrade attempt the next run is always queued, whether the
         * attempt returned or threw: half the near-future interval from now on
         * success, one second from now otherwise.
         */
        public void run() {
            long ctime = System.currentTimeMillis();
            long ntime = SimpleScheduler.this._nextUpgrade.get();
            __log.debug((Object)("UPGRADE task for " + this.schedDate + " fired at " + ctime));

            // Decide on the same two snapshots that are logged and re-queued below,
            // so the check, the message and the new schedule date cannot disagree.
            if (ntime > ctime) {
                __log.debug((Object)("UPGRADE skipped -- wait another " + (ntime - ctime) + "ms"));
                SimpleScheduler.this._todo.enqueue(new UpgradeJobsTask(ntime));
                return;
            }

            boolean success = false;
            try {
                success = SimpleScheduler.this.doUpgrade();
            }
            finally {
                // Runs on both the normal and the exceptional path, so the task
                // always reschedules itself.
                long future = System.currentTimeMillis() + (success ? (long)((double)SimpleScheduler.this._nearFutureInterval * 0.5) : 1000L);
                SimpleScheduler.this._nextUpgrade.set(future);
                SimpleScheduler.this._todo.enqueue(new UpgradeJobsTask(future));
                __log.debug((Object)("UPGRADE completed, success = " + success + "; next time in " + (future - ctime) + "ms"));
            }
        }
    }

    /**
     * Periodic housekeeping task that runs {@link SimpleScheduler#doLoadImmediate()}
     * and then queues its own next run.
     */
    private class LoadImmediateTask extends SchedulerTask {
        LoadImmediateTask(long schedDate) {
            super(schedDate);
        }

        /**
         * Performs one load and always queues the next one: after 90% of the
         * immediate interval when the load reported success, after one second
         * otherwise (including when the load threw).
         */
        public void run() {
            boolean loaded = false;
            try {
                loaded = SimpleScheduler.this.doLoadImmediate();
            } finally {
                long delay = loaded
                        ? (long) ((double) SimpleScheduler.this._immediateInterval * 0.9)
                        : 1000L;
                SimpleScheduler.this._todo.enqueue(new LoadImmediateTask(System.currentTimeMillis() + delay));
            }
        }
    }

    /**
     * Base class of the scheduler's internal housekeeping tasks: a {@link Task}
     * with a schedule date that is also a {@link Runnable}, so that
     * {@link SimpleScheduler#runTask(Task)} can execute it directly.
     */
    private abstract class SchedulerTask extends Task implements Runnable {
        SchedulerTask(long schedDate) {
            super(schedDate);
        }
    }
}

