/*
 * Decompiled with CFR 0.152.
 */
package org.ovirt.vdsm.jsonrpc.client.events;

import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.ovirt.vdsm.jsonrpc.client.ClientConnectionException;
import org.ovirt.vdsm.jsonrpc.client.EventDecomposer;
import org.ovirt.vdsm.jsonrpc.client.JsonRpcEvent;
import org.ovirt.vdsm.jsonrpc.client.events.EventSubscriber;
import org.ovirt.vdsm.jsonrpc.client.events.SubscriptionHolder;
import org.ovirt.vdsm.jsonrpc.client.events.SubscriptionMatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EventPublisher
implements Flow.Publisher<Map<String, Object>> {
    private static final Logger log = LoggerFactory.getLogger(EventPublisher.class);
    private final ExecutorService executorService;
    private final ScheduledExecutorService scheduledExecutorService;
    private final SubscriptionMatcher matcher;
    private final EventDecomposer decomposer;
    private final int eventTimeoutInHours;

    public EventPublisher(ExecutorService executorService, int eventTimeoutInHours) {
        this.executorService = executorService;
        this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
        this.matcher = new SubscriptionMatcher();
        this.decomposer = new EventDecomposer();
        this.eventTimeoutInHours = eventTimeoutInHours;
        this.scheduleCleanupTask();
    }

    private void scheduleCleanupTask() {
        try {
            this.scheduledExecutorService.scheduleWithFixedDelay(this::cleanupOldEvents, this.eventTimeoutInHours, this.eventTimeoutInHours, TimeUnit.HOURS);
        }
        catch (Throwable t) {
            log.error("Unable to schedule cleanup task : {}", (Object)ExceptionUtils.getRootCauseMessage((Throwable)t));
            log.debug("Exception", t);
        }
    }

    public void cleanupOldEvents() {
        try {
            for (SubscriptionHolder holder : this.matcher.getAllSubscriptions()) {
                holder.purgeOldEventsIfNotConsumed(this.eventTimeoutInHours);
            }
        }
        catch (Throwable t) {
            log.error("Error purging old events from SubscriptionHolder : {}", (Object)ExceptionUtils.getRootCauseMessage((Throwable)t));
            log.debug("Exception", t);
        }
    }

    @Override
    public void subscribe(final Flow.Subscriber<? super Map<String, Object>> subscriber) {
        final SubscriptionHolder holder = new SubscriptionHolder((EventSubscriber)subscriber);
        Flow.Subscription subscription = new Flow.Subscription(){

            @Override
            public void request(long n) {
                holder.incrementCount(n);
                EventPublisher.this.process(holder);
            }

            @Override
            public void cancel() {
                EventPublisher.this.clean(holder);
                subscriber.onComplete();
            }
        };
        subscriber.onSubscribe(subscription);
        this.matcher.add(holder);
    }

    public void publish(String subscriptionId, Map<String, Object> params) throws IOException {
        this.process(JsonRpcEvent.fromMethodAndParams(subscriptionId, params));
    }

    private void process(SubscriptionHolder holder) {
        this.executorService.submit(new EventCallable(holder, this.decomposer));
    }

    private void clean(SubscriptionHolder holder) {
        this.matcher.remove(holder);
        holder.clean();
    }

    public void process(JsonRpcEvent event) {
        Set<SubscriptionHolder> holders = this.matcher.match(event);
        holders.stream().peek(holder -> holder.putEvent(event)).filter(SubscriptionHolder::canProcess).forEach(holder -> this.executorService.submit(new EventCallable((SubscriptionHolder)holder, this.decomposer)));
    }

    public int countEvents(JsonRpcEvent event) {
        Set<SubscriptionHolder> holders = this.matcher.match(event);
        return holders.stream().mapToInt(SubscriptionHolder::getNumberOfEvents).sum();
    }

    public void close() {
        this.executorService.shutdown();
    }

    class EventCallable
    implements Callable<Void> {
        private final SubscriptionHolder holder;
        private final EventDecomposer decomposer;

        public EventCallable(SubscriptionHolder holder, EventDecomposer decomposer) {
            this.holder = holder;
            this.decomposer = decomposer;
        }

        @Override
        public Void call() {
            JsonRpcEvent event;
            EventSubscriber subscriber = this.holder.getSubscriber();
            while ((event = this.holder.canProcessMore()) != null) {
                this.handleEvent(subscriber, event);
            }
            return null;
        }

        private void handleEvent(Flow.Subscriber<Map<String, Object>> subscriber, JsonRpcEvent event) {
            try {
                Map<String, Object> map = this.decomposer.decompose(event);
                if (map.containsKey("communicationError")) {
                    subscriber.onError(new ClientConnectionException((String)map.get("communicationError")));
                } else {
                    subscriber.onNext(map);
                }
            }
            catch (Throwable t) {
                log.error("Error processing event '{}' for subscriber '{}' : {}.", new Object[]{event.toString(), subscriber.getClass().getCanonicalName(), ExceptionUtils.getRootCauseMessage((Throwable)t)});
                log.debug("Exception", t);
            }
        }
    }
}

