/*
 * Decompiled with CFR 0.152.
 */
package org.ovirt.vdsm.jsonrpc.client.internal;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonFactoryBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.NullNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import org.ovirt.vdsm.jsonrpc.client.JsonRpcClient;
import org.ovirt.vdsm.jsonrpc.client.JsonRpcEvent;
import org.ovirt.vdsm.jsonrpc.client.JsonRpcResponse;
import org.ovirt.vdsm.jsonrpc.client.events.EventPublisher;
import org.ovirt.vdsm.jsonrpc.client.internal.MessageContext;
import org.ovirt.vdsm.jsonrpc.client.internal.ResponseTracker;
import org.ovirt.vdsm.jsonrpc.client.reactors.ReactorClient;
import org.ovirt.vdsm.jsonrpc.client.utils.JsonUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ResponseWorker
extends Thread {
    private final LinkedBlockingQueue<MessageContext> queue = new LinkedBlockingQueue();
    private static final ObjectMapper MAPPER;
    private final ResponseTracker tracker = new ResponseTracker();
    private final EventPublisher publisher;
    private static final Logger log;

    public ResponseWorker(int parallelism, int eventTimeoutInHours) {
        this.publisher = new EventPublisher(new ForkJoinPool(parallelism, ResponseForkJoinWorkerThread::new, null, true), eventTimeoutInHours);
        Thread trackerThread = new Thread(this.tracker);
        trackerThread.setName("Response tracker");
        trackerThread.setDaemon(true);
        trackerThread.start();
        this.setName("ResponseWorker");
        this.setDaemon(true);
        this.start();
    }

    public JsonRpcClient register(ReactorClient client) {
        JsonRpcClient jsonRpcClient = new JsonRpcClient(client, this.tracker);
        client.addEventListener(message -> this.queue.add(new MessageContext(jsonRpcClient, message)));
        return jsonRpcClient;
    }

    @Override
    public void run() {
        AtomicReference<MessageContext> contextRef = new AtomicReference<MessageContext>();
        block2: while (true) {
            try {
                while (true) {
                    JsonNode rootNode;
                    contextRef.set(this.queue.take());
                    if (((MessageContext)contextRef.get()).getClient() == null) break block2;
                    if (log.isDebugEnabled()) {
                        log.debug("Message received: " + new String(((MessageContext)contextRef.get()).getMessage(), JsonUtils.UTF8));
                    }
                    if (!(rootNode = MAPPER.readTree(((MessageContext)contextRef.get()).getMessage())).isArray()) {
                        this.processIncomingObject(((MessageContext)contextRef.get()).getClient(), rootNode);
                        continue;
                    }
                    rootNode.elements().forEachRemaining(node -> this.processIncomingObject(((MessageContext)contextRef.get()).getClient(), (JsonNode)node));
                }
            }
            catch (Exception e) {
                log.warn("Exception thrown during message processing");
                if (!log.isDebugEnabled()) continue;
                log.debug(e.getMessage(), (Throwable)e);
                continue;
            }
            break;
        }
    }

    private void processIncomingObject(JsonRpcClient client, JsonNode node) {
        JsonNode id = node.get("id");
        JsonNode error = node.get("error");
        if (error != null && !NullNode.class.isInstance(error)) {
            String hostId;
            JsonRpcResponse response = JsonRpcResponse.fromJsonNode(node);
            Map<String, Object> map = JsonUtils.mapValues(response.getError());
            Object code = map.get("code");
            if (String.class.isInstance(code) && (hostId = (String)code).contains(":")) {
                String host = hostId.substring(0, hostId.indexOf(":"));
                ObjectNode params = MAPPER.createObjectNode();
                params.put("communicationError", (String)map.get("message"));
                JsonRpcEvent event = new JsonRpcEvent(host + "|*|*|*", (JsonNode)params);
                this.processNotifications(event);
            }
            client.processResponse(response);
            return;
        }
        if (id == null || NullNode.class.isInstance(id)) {
            JsonRpcEvent event = JsonRpcEvent.fromJsonNode(node);
            String method = client.getHostname() + event.getMethod();
            event.setMethod(method);
            if (log.isDebugEnabled()) {
                log.debug("Event arrived from " + client.getHostname() + " containing " + event.getParams());
            }
            this.processNotifications(event);
            return;
        }
        try {
            client.processResponse(JsonRpcResponse.fromJsonNode(node));
        }
        catch (IllegalArgumentException e) {
            JsonUtils.logException(log, "Received response is not correct", e);
        }
    }

    private void processNotifications(JsonRpcEvent notification) {
        this.publisher.process(notification);
    }

    public void close() {
        this.queue.add(new MessageContext(null, null));
        this.tracker.close();
        this.publisher.close();
    }

    public EventPublisher getPublisher() {
        return this.publisher;
    }

    static {
        log = LoggerFactory.getLogger(ResponseWorker.class);
        MAPPER = new ObjectMapper(((JsonFactoryBuilder)((JsonFactoryBuilder)new JsonFactoryBuilder().configure(JsonFactory.Feature.INTERN_FIELD_NAMES, false)).configure(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES, false)).build());
    }

    static class ResponseForkJoinWorkerThread
    extends ForkJoinWorkerThread {
        protected ResponseForkJoinWorkerThread(ForkJoinPool pool) {
            super(pool);
        }
    }
}

