/*
 * Decompiled with CFR 0.152.
 */
package org.jppf.client.balancer;

import java.io.IOException;
import java.io.NotSerializableException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import org.jppf.JPPFException;
import org.jppf.client.JPPFClientConnection;
import org.jppf.client.JPPFClientConnectionImpl;
import org.jppf.client.JPPFClientConnectionStatus;
import org.jppf.client.balancer.AbstractChannelWrapperRemote;
import org.jppf.client.balancer.ClientTaskBundle;
import org.jppf.client.balancer.JobManagerClient;
import org.jppf.client.event.ClientConnectionStatusListener;
import org.jppf.load.balancer.Bundler;
import org.jppf.load.balancer.BundlerHelper;
import org.jppf.node.protocol.BundleParameter;
import org.jppf.node.protocol.Task;
import org.jppf.node.protocol.TaskBundle;
import org.jppf.serialization.ObjectSerializer;
import org.jppf.utils.ExceptionUtils;
import org.jppf.utils.LoggingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ChannelWrapperRemoteAsync
extends AbstractChannelWrapperRemote {
    /** Logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(ChannelWrapperRemoteAsync.class);
    /** Whether debug logging is enabled for {@link #log}; evaluated once when the class is initialized. */
    private static final boolean debugEnabled = LoggingUtils.isDebugEnabled(log);
    /** Bundles handed to {@code submit()} that the sender task has not taken yet. */
    private final BlockingQueue<ClientTaskBundle> bundleQueue = new LinkedBlockingQueue<>();
    /** Bundles that were sent and still expect results, keyed by bundle id. */
    private final Map<Long, RemoteResponse> responseMap = new ConcurrentHashMap<>();
    /** Set once the sender and receiver tasks have been submitted; reset by {@code close()}. */
    private boolean initDone;
    /** Futures of the sender and receiver tasks, kept so that {@code close()} can cancel them. */
    private final List<Future<?>> futures = new ArrayList<>();
    /** Monitor on which {@code awaitStatus()} waits for a connection status change. */
    private final Object statusLock = new Object();
    /** Wakes up every thread waiting on {@link #statusLock} whenever a connection status event is received. */
    private final ClientConnectionStatusListener listener = event -> {
        synchronized (this.statusLock) {
            this.statusLock.notifyAll();
        }
    };
    /** Serializes executions of {@code resubmitQueueBundles()}. */
    private final Object resubmitLock = new Object();

    /**
     * Wraps the given client connection and registers the status listener on it,
     * so that connection status events wake up threads waiting on the status lock.
     *
     * @param channel the client connection to wrap.
     */
    public ChannelWrapperRemoteAsync(JPPFClientConnection channel) {
        super(channel);
        channel.addClientConnectionStatusListener(listener);
    }

    /**
     * Delegates to the superclass, then, the first time only (or the first time
     * after {@code close()} reset the flag), submits one sender task and one
     * receiver task to the client's executor and remembers their futures.
     */
    @Override
    public void initChannelID() {
        super.initChannelID();
        if (initDone) {
            return;
        }
        initDone = true;
        ThreadPoolExecutor pool = channel.getConnectionPool().getClient().getExecutor();
        futures.add(pool.submit(new RemoteSender()));
        futures.add(pool.submit(new RemoteReceiver()));
    }

    /**
     * Queues a bundle for the sender task, or resubmits it when the channel is closed.
     * <p>When the channel is open, the job count is incremented first and the status
     * switches to {@code EXECUTING} once the count reaches {@link #getMaxJobs()}.
     *
     * @param bundle the bundle to dispatch through this channel.
     * @return always {@code null}.
     */
    public Future<?> submit(ClientTaskBundle bundle) {
        if (debugEnabled) {
            log.debug("submitting {} to {}", bundle, this);
        }
        if (channel.isClosed()) {
            if (debugEnabled) {
                log.debug("resubmitting {}", bundle);
            }
            resubmitBundle(bundle, null);
            return null;
        }
        jobCount.incrementAndGet();
        boolean saturated = getCurrentNbJobs() >= getMaxJobs();
        if (saturated) {
            setStatus(JPPFClientConnectionStatus.EXECUTING);
        }
        bundleQueue.offer(bundle);
        if (debugEnabled) {
            log.debug("submitted {} to {}", bundle, this);
        }
        return null;
    }

    /**
     * Reports that this wrapper is not a local channel.
     *
     * @return always {@code false}.
     */
    @Override
    public boolean isLocal() {
        return false;
    }

    /**
     * Handles a throwable raised by the sender or the receiver task.
     * <p>A {@link NotSerializableException} is reported to the bundle as its result.
     * Any other throwable triggers a reconnection, the resubmission of the given
     * bundle (if any) and of every bundle still queued or awaiting results.
     * In all cases, when a bundle is given, the job count is decremented if it is
     * positive, and the status goes back to {@code ACTIVE} if it was {@code EXECUTING}
     * and the job count is below {@link #getMaxJobs()}.
     *
     * @param clientBundle the bundle being processed when the throwable was raised, may be {@code null}.
     * @param t the throwable to handle.
     * @param fromSender {@code true} if raised in the sender task, {@code false} if in the receiver task; only used for logging.
     * @return {@code t} itself if it is an {@code Exception}, a {@code JPPFException} wrapping it otherwise, or {@code null} if {@code t} is {@code null}.
     */
    private Exception handleThrowable(ClientTaskBundle clientBundle, Throwable t, boolean fromSender) {
        if (debugEnabled) {
            String origin = fromSender ? "sender" : "receiver";
            log.debug("handling throwable from {} for {}:\nchannel = {}", new Object[]{origin, clientBundle, this, t});
        }
        boolean closed = channel.isClosed();
        if (debugEnabled) {
            log.debug("channelClosed={}, resetting={}", closed, this.resetting);
        }
        if (!closed) {
            String jobMsg = "";
            if (clientBundle != null) {
                jobMsg = " while handling job " + clientBundle;
            }
            log.warn("Throwable was raised{} on channel {}\n{}", new Object[]{jobMsg, this, ExceptionUtils.getStackTrace(t)});
        }
        Exception result;
        if (t == null) {
            result = null;
        } else if (t instanceof Exception) {
            result = (Exception) t;
        } else {
            result = new JPPFException(t);
        }
        try {
            if (t instanceof NotSerializableException) {
                // serialization failures are reported as the bundle's result, without reconnecting
                if (clientBundle != null) {
                    clientBundle.resultsReceived(t);
                }
            } else {
                reconnect();
                if (clientBundle != null) {
                    if (debugEnabled) {
                        log.debug("resubmitting {}", clientBundle);
                    }
                    resubmitBundle(clientBundle, null);
                }
                resubmitQueueBundles();
            }
        } finally {
            if (clientBundle != null) {
                if (jobCount.get() > 0) {
                    jobCount.decrementAndGet();
                }
                boolean executing = getStatus() == JPPFClientConnectionStatus.EXECUTING;
                if (executing && getCurrentNbJobs() < getMaxJobs()) {
                    setStatus(JPPFClientConnectionStatus.ACTIVE);
                }
            }
        }
        return result;
    }

    /**
     * Resubmits every bundle this channel still holds: those waiting in the send
     * queue and those already sent and awaiting results.
     * <p>The queue is drained and the response map is snapshotted then cleared; the
     * job count is reset to 0. A bundle found in both places is resubmitted only once.
     * Executions are serialized on {@code resubmitLock}.
     */
    private void resubmitQueueBundles() {
        synchronized (resubmitLock) {
            if (debugEnabled) {
                log.debug("resubmitting all queued jobs of {}", this);
            }
            // The job count is only a sizing hint. Clamp it because HashSet rejects a negative
            // capacity with an IllegalArgumentException.
            final int capacity = Math.max(0, jobCount.get());
            final Collection<ClientTaskBundle> queued = new HashSet<>(capacity);
            final Collection<Long> queuedIds = new HashSet<>(capacity);
            bundleQueue.drainTo(queued);
            final Map<Long, RemoteResponse> pending = new ConcurrentHashMap<>(responseMap);
            responseMap.clear();
            for (ClientTaskBundle bundle : queued) {
                final long id = bundle.getBundleId();
                queuedIds.add(id);
                if (debugEnabled) {
                    log.debug("resubmitting {}", bundle);
                }
                resubmitBundle(bundle, null);
            }
            jobCount.set(0);
            for (Map.Entry<Long, RemoteResponse> entry : pending.entrySet()) {
                // skip bundles that were already resubmitted from the queue
                if (queuedIds.contains(entry.getKey())) {
                    continue;
                }
                final ClientTaskBundle sent = entry.getValue().clientBundle;
                if (debugEnabled) {
                    log.debug("resubmitting {}", sent);
                }
                resubmitBundle(sent, null);
            }
        }
    }

    /**
     * Marks a bundle for resubmission, signals its completion without an exception,
     * and detaches this channel from the bundle's client job.
     *
     * @param clientBundle the bundle to resubmit; must not be {@code null}.
     * @param e an exception associated with the resubmission; only used in the debug log.
     */
    private void resubmitBundle(ClientTaskBundle clientBundle, Exception e) {
        if (debugEnabled) {
            log.debug("resubmitting {} with exception {}", clientBundle, e);
        }
        clientBundle.resubmit();
        clientBundle.taskCompleted(null);
        clientBundle.getClientJob().removeChannel(this);
    }

    /**
     * Finalizes the processing of a bundle on this channel.
     * <p>If the channel is not closed, or is resetting, the bundle is notified of its
     * completion; an {@link IOException} is not propagated to the bundle ({@code null}
     * is passed instead). The channel is then detached from the bundle's client job.
     * Whatever happens, the job count is decremented (never below zero), the status
     * goes back to {@code ACTIVE} if it was {@code EXECUTING} and capacity is available
     * again, and the job scheduler is woken up.
     *
     * @param clientBundle the completed bundle, may be {@code null}.
     * @param exception the exception raised while processing the bundle, or {@code null}.
     */
    private void handleBundleComplete(ClientTaskBundle clientBundle, Exception exception) {
        try {
            final boolean channelClosed = channel.isClosed();
            if (debugEnabled) {
                log.debug("channelClosed={}, resetting={}, bundle={}, exception={}", new Object[]{channelClosed, this.resetting, clientBundle, exception});
            }
            if ((!channelClosed || this.resetting) && clientBundle != null) {
                clientBundle.taskCompleted(exception instanceof IOException ? null : exception);
            }
            if (clientBundle != null) {
                clientBundle.getClientJob().removeChannel(this);
            }
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        } finally {
            // Same guard as in handleThrowable(): resubmitQueueBundles() may already have reset
            // the count to 0, and an unconditional decrement would then make it negative.
            if (jobCount.get() > 0) {
                jobCount.decrementAndGet();
            }
            if (getStatus() == JPPFClientConnectionStatus.EXECUTING && getCurrentNbJobs() < getMaxJobs()) {
                setStatus(JPPFClientConnectionStatus.ACTIVE);
            }
            ((JobManagerClient) channel.getConnectionPool().getClient().getJobManager()).getJobScheduler().wakeUp();
        }
    }

    /**
     * Closes this channel wrapper.
     * <p>All bundles still held by this channel are resubmitted, the status listener
     * is unregistered, threads blocked in {@code awaitStatus()} are woken up, the
     * superclass is closed, and the sender and receiver tasks are cancelled with
     * interruption. The initialization flag is reset so that {@code initChannelID()}
     * can start new tasks.
     */
    @Override
    public void close() {
        if (debugEnabled) {
            log.debug("closing {}, resetting={}", this, this.resetting);
        }
        resubmitQueueBundles();
        channel.removeClientConnectionStatusListener(listener);
        // awaitStatus() waits on statusLock, so that is the monitor to notify.
        // Nothing in this class ever waits on the listener object itself.
        synchronized (statusLock) {
            statusLock.notifyAll();
        }
        super.close();
        futures.forEach(future -> future.cancel(true));
        futures.clear();
        initDone = false;
    }

    /**
     * Blocks the calling thread until the connection status is either a working
     * status or a terminated status, waiting on {@code statusLock} in between checks.
     *
     * @throws Exception if the wait is interrupted or the status cannot be read.
     */
    private void awaitStatus() throws Exception {
        synchronized (statusLock) {
            while (true) {
                JPPFClientConnectionStatus current = getStatus();
                if (current.isWorkingStatus() || current.isTerminatedStatus()) {
                    return;
                }
                statusLock.wait();
            }
        }
    }

    /**
     * Reports that this wrapper is asynchronous.
     *
     * @return always {@code true}.
     */
    public boolean isAsynchronous() {
        return true;
    }

    /**
     * Returns the maximum number of jobs, as configured on the connection pool
     * that the wrapped channel belongs to.
     *
     * @return the value of {@code getMaxJobs()} of the channel's connection pool.
     */
    public int getMaxJobs() {
        return channel.getConnectionPool().getMaxJobs();
    }

    /**
     * Builds a one-line description with the queue size, the number of pending
     * responses, the job count, the resetting flag, the bundler algorithm and the
     * wrapped channel. If describing the channel throws, the exception message is
     * printed in its place.
     *
     * @return a string representation of this channel wrapper.
     */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder(getClass().getSimpleName());
        text.append('[');
        text.append("bundleQueue=").append(bundleQueue.size());
        text.append(", responseMap=").append(responseMap.size());
        text.append(", jobCount=").append(getCurrentNbJobs());
        text.append(", resetting=").append(this.resetting);
        text.append(", bundlerAlgorithm=").append(this.bundlerAlgorithm);
        text.append(", channel=");
        try {
            text.append(this.channel);
        } catch (Exception e) {
            text.append(ExceptionUtils.getMessage(e));
        }
        text.append(']');
        return text.toString();
    }

    /**
     * Tracks the results expected for one bundle that was sent through the channel.
     */
    private static class RemoteResponse {
        /** The bundle whose results are awaited. */
        final ClientTaskBundle clientBundle;
        /** Number of tasks in the bundle, captured at construction time. */
        final int taskCount;
        /** Class loader passed to the connection when receiving results, may be {@code null}. */
        final ClassLoader cl;
        /** Serializer passed to the connection when receiving results. */
        final ObjectSerializer ser;
        /** Value of {@code System.nanoTime()} supplied at construction. */
        final long start;
        /** Nanoseconds between {@link #start} and the latest call to {@code handleResults}. */
        long elapsed;
        /** Number of task results accounted for so far. */
        int currentCount;

        /**
         * @param clientBundle the bundle whose results are awaited.
         * @param notSerializableTasksCount initial value of the result counter.
         * @param cl the class loader to use when receiving results.
         * @param ser the serializer to use when receiving results.
         * @param start the start timestamp, in nanoseconds.
         */
        public RemoteResponse(ClientTaskBundle clientBundle, int notSerializableTasksCount, ClassLoader cl, ObjectSerializer ser, long start) {
            this.clientBundle = clientBundle;
            this.taskCount = clientBundle.getTasksL().size();
            this.currentCount = notSerializableTasksCount;
            this.cl = cl;
            this.ser = ser;
            this.start = start;
        }

        /**
         * Records a batch of results: updates the elapsed time and the result counter,
         * then forwards the results to the bundle.
         *
         * @param results the task results that were received.
         */
        void handleResults(List<Task<?>> results) {
            elapsed = System.nanoTime() - start;
            final int received = results.size();
            currentCount += received;
            if (debugEnabled) {
                String firstPosition = "";
                if (received > 0) {
                    firstPosition = ", first position=" + results.get(0).getPosition();
                }
                log.debug("received {} tasks from server{}", received, firstPosition);
            }
            clientBundle.resultsReceived(results);
        }
    }

    /**
     * Task that receives results from the remote peer for as long as the channel is open.
     */
    private class RemoteReceiver
    implements Runnable {
        /** Logger dedicated to the receiver task. */
        private final Logger thisLog = LoggerFactory.getLogger(RemoteReceiver.class);
        /** Whether debug logging is enabled for {@link #thisLog}. */
        private final boolean thisDebugEnabled = LoggingUtils.isDebugEnabled(this.thisLog);

        private RemoteReceiver() {
        }

        /**
         * Receiver loop. Each iteration waits for a usable connection status, reads a
         * bundle header, looks up the matching pending response by bundle id and, under
         * the response's monitor, reads the tasks and records them. A response that is
         * still incomplete is put back into the response map; a complete one updates and
         * stores the bundler, then the bundle is finalized.
         * <p>The whole iteration, including the status wait and the header read, runs
         * inside a single try block: any failure is routed to {@code handleThrowable()}
         * instead of terminating the receiver task, and the bundle is finalized in the
         * {@code finally} clause whenever it was marked complete.
         */
        @Override
        public void run() {
            if (debugEnabled) {
                log.debug("entering receiver loop for {}", ChannelWrapperRemoteAsync.this);
            }
            final JPPFClientConnectionImpl connection = (JPPFClientConnectionImpl) ChannelWrapperRemoteAsync.this.channel;
            while (!ChannelWrapperRemoteAsync.this.channel.isClosed()) {
                ClientTaskBundle clientBundle = null;
                Exception exception = null;
                boolean complete = false;
                try {
                    awaitStatus();
                    final TaskBundle bundle = connection.receiveHeader(null, null);
                    if (this.thisDebugEnabled) {
                        this.thisLog.debug("received bundle {}", bundle);
                    }
                    final long bundleId = (Long) bundle.getParameter(BundleParameter.CLIENT_BUNDLE_ID);
                    final RemoteResponse response = ChannelWrapperRemoteAsync.this.responseMap.remove(bundleId);
                    if (response == null) {
                        // the bundle was resubmitted or already finalized: nothing to record
                        this.thisLog.debug("response object no longer in queue for bundleId = {}", bundleId);
                        continue;
                    }
                    synchronized (response) {
                        clientBundle = response.clientBundle;
                        // NOTE(review): raw List kept because receiveTasks' declared return type is not visible from this file
                        final List tasks = connection.receiveTasks(bundle, response.ser, response.cl);
                        if (this.thisDebugEnabled) {
                            this.thisLog.debug("received {} tasks for {}", tasks.size(), clientBundle);
                        }
                        response.handleResults(tasks);
                        if (response.currentCount < response.taskCount) {
                            // more results are expected for this bundle
                            ChannelWrapperRemoteAsync.this.responseMap.put(bundleId, response);
                        } else {
                            complete = true;
                            BundlerHelper.updateBundler((Bundler) ChannelWrapperRemoteAsync.this.bundler, tasks.size(), (double) response.elapsed);
                            getLoadBalancerPersistenceManager().storeBundler(ChannelWrapperRemoteAsync.this.channelID, ChannelWrapperRemoteAsync.this.bundler, ChannelWrapperRemoteAsync.this.bundlerAlgorithm);
                        }
                    }
                } catch (Throwable t) {
                    exception = handleThrowable(clientBundle, t, false);
                } finally {
                    if (complete) {
                        handleBundleComplete(clientBundle, exception);
                    }
                }
            }
            if (debugEnabled) {
                log.debug("exiting receiver loop for {}", ChannelWrapperRemoteAsync.this);
            }
        }
    }

    /**
     * Task that takes bundles from the send queue and sends them to the remote peer
     * for as long as the channel is open.
     */
    private class RemoteSender
    implements Runnable {
        /** Logger dedicated to the sender task. */
        private Logger thisLog = LoggerFactory.getLogger(RemoteSender.class);
        /** Whether debug logging is enabled for {@link #thisLog}. */
        private boolean thisDebugEnabled = LoggingUtils.isDebugEnabled(this.thisLog);

        private RemoteSender() {
        }

        /**
         * Sender loop. Each iteration waits for a usable connection status, takes the
         * next bundle from the queue, registers a pending response for it, sends its
         * tasks and reports the tasks that could not be serialized as results. When
         * every task of the bundle is accounted for right after sending, the bundle is
         * finalized immediately. Any failure is routed to {@code handleThrowable()}.
         */
        @Override
        public void run() {
            if (debugEnabled) {
                log.debug("entering sender loop for {}", ChannelWrapperRemoteAsync.this);
            }
            final JPPFClientConnectionImpl conn = (JPPFClientConnectionImpl) ChannelWrapperRemoteAsync.this.channel;
            while (!ChannelWrapperRemoteAsync.this.channel.isClosed()) {
                ClientTaskBundle clientBundle = null;
                try {
                    awaitStatus();
                    clientBundle = ChannelWrapperRemoteAsync.this.bundleQueue.take();
                    final long id = clientBundle.getBundleId();
                    final List<Task<?>> tasks = clientBundle.getTasksL();
                    if (this.thisDebugEnabled) {
                        traceDispatch(clientBundle, id, tasks);
                    }
                    final Collection<ClassLoader> loaders = registerClassLoaders(clientBundle.getUuid(), tasks);
                    final TaskBundle header = AbstractChannelWrapperRemote.createBundle(clientBundle, id);
                    header.setUuid(ChannelWrapperRemoteAsync.this.uuid);
                    header.setInitialTaskCount(clientBundle.getClientJob().initialTaskCount);
                    ClassLoader cl = null;
                    if (!loaders.isEmpty()) {
                        cl = loaders.iterator().next();
                    }
                    final ObjectSerializer ser = conn.makeHelper(cl).getSerializer();
                    final long start = System.nanoTime();
                    final RemoteResponse pending = new RemoteResponse(clientBundle, 0, cl, ser, start);
                    synchronized (pending) {
                        // register before sending so the receiver can find the response for this id
                        if (pending.currentCount < pending.taskCount) {
                            ChannelWrapperRemoteAsync.this.responseMap.put(id, pending);
                        }
                        if (this.thisDebugEnabled) {
                            this.thisLog.debug("{} sending {}", ChannelWrapperRemoteAsync.this, clientBundle);
                        }
                        List notSerializableTasks = conn.sendTasks(ser, cl, header, clientBundle);
                        clientBundle.jobDispatched(ChannelWrapperRemoteAsync.this);
                        if (!notSerializableTasks.isEmpty()) {
                            if (this.thisDebugEnabled) {
                                this.thisLog.debug("got {} non-serializable tasks for {}", notSerializableTasks.size(), clientBundle);
                            }
                            pending.currentCount = notSerializableTasks.size();
                            clientBundle.resultsReceived(notSerializableTasks);
                        }
                        if (pending.currentCount >= pending.taskCount) {
                            handleBundleComplete(clientBundle, null);
                        }
                    }
                } catch (Throwable t) {
                    handleThrowable(clientBundle, t, true);
                }
            }
            if (debugEnabled) {
                log.debug("exiting sender loop for {}", ChannelWrapperRemoteAsync.this);
            }
        }

        /**
         * Logs, at debug level, the bundle about to be sent along with the positions of its tasks.
         *
         * @param clientBundle the bundle about to be sent.
         * @param bundleId the id of the bundle.
         * @param tasks the tasks of the bundle.
         */
        private void traceDispatch(ClientTaskBundle clientBundle, long bundleId, List<Task<?>> tasks) {
            final int count = tasks.size();
            final int[] positions = new int[count];
            int index = 0;
            while (index < count) {
                positions[index] = tasks.get(index).getPosition();
                index++;
            }
            this.thisLog.debug("{} executing {} tasks of job {} with bundleId = {}, positions={}", new Object[]{ChannelWrapperRemoteAsync.this, count, clientBundle, bundleId, Arrays.toString(positions)});
        }
    }
}

