/*
 * Decompiled with CFR 0.152.
 */
package org.granite.client.tide.data;

import java.util.ArrayList;
import java.util.Locale;
import org.granite.client.messaging.Consumer;
import org.granite.client.messaging.ResponseListener;
import org.granite.client.messaging.ResultFaultIssuesResponseListener;
import org.granite.client.messaging.TopicMessageListener;
import org.granite.client.messaging.channel.ResponseMessageFuture;
import org.granite.client.messaging.events.FaultEvent;
import org.granite.client.messaging.events.IssueEvent;
import org.granite.client.messaging.events.ResultEvent;
import org.granite.client.messaging.events.TopicMessageEvent;
import org.granite.client.messaging.messages.push.TopicMessage;
import org.granite.client.tide.Context;
import org.granite.client.tide.ContextAware;
import org.granite.client.tide.NameAware;
import org.granite.client.tide.data.EntityManager;
import org.granite.client.tide.data.impl.ChangeEntity;
import org.granite.client.tide.data.impl.ChangeEntityRef;
import org.granite.client.tide.data.spi.MergeContext;
import org.granite.client.tide.server.ServerSession;
import org.granite.logging.Logger;
import org.granite.tide.data.Change;
import org.granite.tide.data.ChangeRef;

public class DataObserver
implements ContextAware,
NameAware {
    private static Logger log = Logger.getLogger(DataObserver.class);
    public static final String DATA_OBSERVER_TOPIC_NAME = "tideDataTopic";
    private Context context;
    private ServerSession serverSession = null;
    private EntityManager entityManager = null;
    private String channelType = null;
    private String destination = null;
    private Consumer consumer = null;
    private boolean subscribing = false;
    private boolean unsubscribing = false;
    private ResponseListener subscriptionListener = new ResultFaultIssuesResponseListener(){

        public void onResult(ResultEvent event) {
            log.info("Destination %s subscribed sid: %s", new Object[]{DataObserver.this.destination, DataObserver.this.consumer.getSubscriptionId()});
            DataObserver.this.subscribing = false;
        }

        public void onFault(FaultEvent event) {
            log.error("Destination %s could not be subscribed: %s", new Object[]{DataObserver.this.destination, event.getCode()});
            DataObserver.this.subscribing = false;
        }

        public void onIssue(IssueEvent event) {
            log.error("Destination %s could not be subscribed: %s", new Object[]{DataObserver.this.destination, event.getType()});
            DataObserver.this.subscribing = false;
        }
    };
    private ResponseListener unsubscriptionListener = new ResultFaultIssuesResponseListener(){

        public void onResult(ResultEvent event) {
            log.info("Destination %s unsubscribed", new Object[]{DataObserver.this.destination});
            DataObserver.this.unsubscribing = false;
        }

        public void onFault(FaultEvent event) {
            log.error("Destination %s could not be unsubscribed: %s", new Object[]{DataObserver.this.destination, event.getCode()});
            DataObserver.this.unsubscribing = false;
        }

        public void onIssue(IssueEvent event) {
            log.error("Destination %s could not be unsubscribed: %s", new Object[]{DataObserver.this.destination, event.getType()});
            DataObserver.this.unsubscribing = false;
        }
    };
    private TopicMessageListener messageListener = new TopicMessageListener(){

        public void onMessage(TopicMessageEvent event) {
            log.debug("Destination %s message event received %s", new Object[]{DataObserver.this.destination, event.toString()});
            final TopicMessage message = event.getMessage();
            DataObserver.this.context.callLater(new Runnable(){

                /*
                 * WARNING - Removed try catching itself - possible behaviour change.
                 */
                @Override
                public void run() {
                    try {
                        String receivedSessionId = (String)message.getHeader("GDSSessionID");
                        if (receivedSessionId != null && receivedSessionId.equals(DataObserver.this.serverSession.getSessionId())) {
                            receivedSessionId = null;
                        }
                        MergeContext mergeContext = DataObserver.this.entityManager.initMerge(DataObserver.this.serverSession);
                        Object[] updates = (Object[])message.getData();
                        ArrayList<EntityManager.Update> upds = new ArrayList<EntityManager.Update>();
                        for (Object update : updates) {
                            String updateType = ((Object[])update)[0].toString().toUpperCase();
                            Object entity = ((Object[])update)[1];
                            if (EntityManager.UpdateKind.REFRESH.toString().toLowerCase().equals(updateType) && entity instanceof String) {
                                entity = DataObserver.this.serverSession.getAliasRegistry().getAliasForType((String)entity);
                            } else if (entity instanceof ChangeRef) {
                                entity = new ChangeEntityRef(entity, DataObserver.this.serverSession.getAliasRegistry());
                            } else if (entity instanceof Change) {
                                entity = new ChangeEntity((Change)entity, DataObserver.this.serverSession.getAliasRegistry());
                            }
                            upds.add(new EntityManager.Update(EntityManager.UpdateKind.forName(updateType), entity));
                        }
                        DataObserver.this.entityManager.handleUpdates(mergeContext, receivedSessionId, upds);
                        DataObserver.this.entityManager.raiseUpdateEvents(DataObserver.this.context, upds);
                    }
                    catch (Exception e) {
                        log.error((Throwable)e, "Error during received message processing", new Object[0]);
                    }
                    finally {
                        MergeContext.destroy(DataObserver.this.entityManager);
                    }
                }
            });
        }
    };

    protected DataObserver() {
    }

    public DataObserver(ServerSession serverSession) {
        this.serverSession = serverSession;
        if (serverSession.getContext() != null) {
            this.entityManager = serverSession.getContext().getEntityManager();
        }
    }

    public DataObserver(String channelType, ServerSession serverSession) {
        this.channelType = channelType;
        this.serverSession = serverSession;
        if (serverSession.getContext() != null) {
            this.entityManager = serverSession.getContext().getEntityManager();
        }
    }

    public DataObserver(ServerSession serverSession, EntityManager entityManager) {
        this.serverSession = serverSession;
        this.entityManager = entityManager;
    }

    public DataObserver(String destination, ServerSession serverSession, EntityManager entityManager) {
        this.destination = destination;
        this.serverSession = serverSession;
        this.entityManager = entityManager;
    }

    public DataObserver(String destination, String channelType, ServerSession serverSession, EntityManager entityManager) {
        this.destination = destination;
        this.channelType = channelType;
        this.serverSession = serverSession;
        this.entityManager = entityManager;
    }

    @Override
    public void setContext(Context context) {
        this.context = context;
        if (this.entityManager == null) {
            this.entityManager = context.getEntityManager();
        }
    }

    @Override
    public void setName(String name) {
        if (this.destination == null) {
            this.destination = name;
        }
    }

    public void start() {
        this.consumer = this.serverSession.getConsumer(this.destination, DATA_OBSERVER_TOPIC_NAME, this.channelType);
        this.unsubscribing = false;
    }

    public void stop() {
        if (this.consumer != null && this.consumer.isSubscribed()) {
            this.unsubscribe(true);
        }
        this.consumer = null;
    }

    public synchronized void subscribe() {
        if (this.consumer == null) {
            throw new IllegalStateException("Cannot subscribe, DataObserver " + this.destination + " not started");
        }
        if (this.subscribing) {
            return;
        }
        this.subscribing = true;
        this.consumer.addMessageListener(this.messageListener);
        this.consumer.subscribe(new ResponseListener[]{this.subscriptionListener});
    }

    public void unsubscribe() {
        this.unsubscribe(false);
    }

    public synchronized void unsubscribe(boolean onStop) {
        if (this.consumer == null) {
            throw new IllegalStateException("Cannot unsubscribe, DataObserver " + this.destination + " not started");
        }
        if (!this.consumer.isSubscribed() || this.unsubscribing) {
            return;
        }
        this.unsubscribing = true;
        this.consumer.removeMessageListener(this.messageListener);
        if (!onStop) {
            this.consumer.unsubscribe(new ResponseListener[]{this.unsubscriptionListener});
        } else {
            ResponseMessageFuture future = this.consumer.unsubscribe(new ResponseListener[]{this.unsubscriptionListener});
            try {
                future.get(2500L);
            }
            catch (Exception e) {
                log.error((Throwable)e, "Destination %s could not be unsubscribed on stop: %s", new Object[]{this.destination});
            }
        }
    }
}

