/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.client.concurrent;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.jppf.client.JPPFJob;
import org.jppf.client.concurrent.JPPFExecutorService;
import org.jppf.client.concurrent.JPPFTaskFuture;
import org.jppf.client.event.JobEvent;
import org.jppf.client.event.JobListenerAdapter;
import org.jppf.node.protocol.Task;
import org.jppf.utils.LoggingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link CompletionService} that submits tasks through a {@link JPPFExecutorService}
 * and makes their futures available, via {@link #take()} and {@link #poll()}, once the
 * owning job reports them as returned or ended.
 *
 * <p>Submitted futures are tracked per job uuid and per task position. A single job
 * listener, registered on each job the first time one of its futures is seen, moves the
 * futures from the tracking map to the completion queue.
 *
 * <p>All accesses to the tracking map, including the nested per-job maps, are guarded by
 * synchronizing on {@code futureMap}.
 *
 * @param <V> the type of results returned by the submitted tasks.
 */
public class JPPFCompletionService<V>
implements CompletionService<V> {
    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(JPPFCompletionService.class);
    /** Whether debug-level logging is enabled; evaluated once at class initialization. */
    private static final boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
    /** The executor to which task submission is delegated. */
    private final JPPFExecutorService executor;
    /** Futures not yet queued, keyed by job uuid, then by task position. Guarded by itself. */
    private final Map<String, Map<Integer, JPPFTaskFuture<V>>> futureMap = new HashMap<String, Map<Integer, JPPFTaskFuture<V>>>();
    /** Listener registered on each job to collect its futures as results arrive. */
    private final ResultCollectorListener listener = new ResultCollectorListener();
    /** Futures that have been handed over by the listener and are ready to be consumed. */
    private final BlockingQueue<Future<V>> queue = new LinkedBlockingDeque<Future<V>>();

    /**
     * Creates a completion service that submits its tasks to the given executor.
     *
     * @param executor the executor used to submit the tasks.
     */
    public JPPFCompletionService(JPPFExecutorService executor) {
        this.executor = executor;
    }

    /**
     * Submits a value-returning task and registers its future for completion tracking.
     *
     * @param task the task to submit.
     * @return the future representing the task.
     */
    @Override
    public Future<V> submit(Callable<V> task) {
        return this.processFuture((JPPFTaskFuture<V>) this.executor.submit(task));
    }

    /**
     * Submits a runnable task with a predefined result and registers its future for
     * completion tracking.
     *
     * @param task the task to submit.
     * @param result the result passed to the executor along with the task.
     * @return the future representing the task.
     */
    @Override
    public Future<V> submit(Runnable task, V result) {
        return this.processFuture((JPPFTaskFuture<V>) this.executor.submit(task, result));
    }

    /**
     * Retrieves and removes the next queued future, waiting if none is available yet.
     *
     * @return the next queued future.
     * @throws InterruptedException if interrupted while waiting.
     */
    @Override
    public Future<V> take() throws InterruptedException {
        return this.queue.take();
    }

    /**
     * Retrieves and removes the next queued future without waiting.
     *
     * @return the next queued future, or {@code null} if the queue is empty.
     */
    @Override
    public Future<V> poll() {
        return this.queue.poll();
    }

    /**
     * Retrieves and removes the next queued future, waiting up to the given time.
     *
     * @param timeout the maximum time to wait.
     * @param unit the unit of {@code timeout}.
     * @return the next queued future, or {@code null} if the wait time elapsed first.
     * @throws InterruptedException if interrupted while waiting.
     */
    @Override
    public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
        return this.queue.poll(timeout, unit);
    }

    /**
     * Records a newly submitted future under its job uuid and task position. The first
     * time a job is seen, the result collector listener is registered on it.
     *
     * @param future the future to track.
     * @return the same future, for call chaining.
     */
    private JPPFTaskFuture<V> processFuture(JPPFTaskFuture<V> future) {
        JPPFJob job = future.getJob();
        String uuid = job.getUuid();
        synchronized (this.futureMap) {
            Map<Integer, JPPFTaskFuture<V>> jobFutures = this.futureMap.get(uuid);
            if (jobFutures == null) {
                // First future seen for this job: start listening for its results.
                job.addJobListener(this.listener);
                jobFutures = new HashMap<Integer, JPPFTaskFuture<V>>();
                this.futureMap.put(uuid, jobFutures);
            }
            jobFutures.put(future.getPosition(), future);
        }
        return future;
    }

    /**
     * Hands a future over to the completion queue.
     *
     * @param future the future to queue.
     * @throws IllegalArgumentException if {@code future} is {@code null}.
     */
    private void processFutureCompletion(JPPFTaskFuture<V> future) {
        if (future == null) {
            throw new IllegalArgumentException("future should not be null");
        }
        try {
            // NOTE(review): presumably a zero timeout makes the future pick up its result
            // without blocking - confirm against JPPFTaskFuture.getResult.
            future.getResult(0L);
        }
        catch (TimeoutException e) {
            // The future is still queued below; the timeout is only reported.
            log.error(e.getMessage(), e);
        }
        this.queue.offer(future);
    }

    /**
     * Job listener that moves tracked futures to the completion queue when their job
     * returns tasks or ends.
     */
    private class ResultCollectorListener
    extends JobListenerAdapter {
        private ResultCollectorListener() {
        }

        /**
         * Queues the futures matching the tasks carried by the event, then drops the
         * job's tracking entry if no future remains for it.
         *
         * @param event the event holding the job and its returned tasks.
         */
        @Override
        public void jobReturned(JobEvent event) {
            List<Task<?>> tasks = event.getJobTasks();
            if (tasks == null) {
                return;
            }
            String uuid = event.getJob().getUuid();
            Map<Integer, JPPFTaskFuture<V>> jobFutures;
            synchronized (futureMap) {
                jobFutures = futureMap.get(uuid);
            }
            if (jobFutures == null) {
                return;
            }
            for (Task<?> task : tasks) {
                JPPFTaskFuture<V> future;
                synchronized (futureMap) {
                    future = jobFutures.remove(task.getPosition());
                }
                if (future == null) {
                    // No future tracked at this position: nothing was queued, so nothing to log.
                    continue;
                }
                // Queued outside the lock so the map is not held while the future is processed.
                processFutureCompletion(future);
                if (debugEnabled) {
                    log.debug("added future[job uuid=" + uuid + ", position=" + task.getPosition() + "] to the queue");
                }
            }
            synchronized (futureMap) {
                // Only drop the entry if it is still the map drained above.
                if (jobFutures.isEmpty() && futureMap.get(uuid) == jobFutures) {
                    futureMap.remove(uuid);
                }
            }
        }

        /**
         * Removes the job's tracking entry and queues every future still registered for it.
         *
         * @param event the event holding the job that ended.
         */
        @Override
        public void jobEnded(JobEvent event) {
            String uuid = event.getJob().getUuid();
            Map<Integer, JPPFTaskFuture<V>> jobFutures;
            synchronized (futureMap) {
                // Detaching the whole entry makes the nested map private to this thread.
                jobFutures = futureMap.remove(uuid);
            }
            if (jobFutures == null) {
                return;
            }
            for (JPPFTaskFuture<V> future : jobFutures.values()) {
                processFutureCompletion(future);
                if (debugEnabled) {
                    log.debug("added future[job uuid=" + uuid + ", position=" + future.getPosition() + "] to the queue");
                }
            }
        }
    }
}

