/*
 * Decompiled with CFR 0.152.
 */
package org.fisco.bcos.sdk.eventsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.fisco.bcos.sdk.channel.Channel;
import org.fisco.bcos.sdk.channel.ResponseCallback;
import org.fisco.bcos.sdk.eventsub.EventCallback;
import org.fisco.bcos.sdk.eventsub.EventLogParams;
import org.fisco.bcos.sdk.eventsub.EventMsg;
import org.fisco.bcos.sdk.eventsub.EventResource;
import org.fisco.bcos.sdk.eventsub.EventSubscribe;
import org.fisco.bcos.sdk.eventsub.filter.EventLogFilter;
import org.fisco.bcos.sdk.eventsub.filter.EventLogFilterStatus;
import org.fisco.bcos.sdk.eventsub.filter.EventLogResponse;
import org.fisco.bcos.sdk.eventsub.filter.EventPushMsgHandler;
import org.fisco.bcos.sdk.eventsub.filter.EventSubNodeRespStatus;
import org.fisco.bcos.sdk.eventsub.filter.FilterManager;
import org.fisco.bcos.sdk.eventsub.filter.ScheduleTimeConfig;
import org.fisco.bcos.sdk.model.Message;
import org.fisco.bcos.sdk.model.MsgType;
import org.fisco.bcos.sdk.model.Response;
import org.fisco.bcos.sdk.service.GroupManagerService;
import org.fisco.bcos.sdk.utils.ObjectMapperFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link EventSubscribe} implementation bound to a single group id.
 *
 * <p>Filters are tracked in the {@link FilterManager} obtained from the supplied
 * {@link EventResource}. Register and unregister requests are sent through
 * {@link GroupManagerService#asyncSendMessageToGroup}, and a single-threaded scheduler
 * periodically re-sends the filters returned by {@code FilterManager.getWaitingReqFilters()}.
 */
public class EventSubscribeImp
implements EventSubscribe {
    private static final Logger logger = LoggerFactory.getLogger(EventSubscribeImp.class);
    private Channel channel;
    private GroupManagerService groupManagerService;
    private Integer groupId;
    private FilterManager filterManager;
    private EventPushMsgHandler msgHander;
    private EventResource eventResource;
    private boolean running = false;
    // Not final: start() replaces it when a previous stop() has shut it down.
    ScheduledThreadPoolExecutor resendSchedule = new ScheduledThreadPoolExecutor(1);

    /**
     * Creates the subscriber and registers the push-message handler of {@code eventResource}
     * on the channel, both for {@code EVENT_LOG_PUSH} messages and for disconnect notifications.
     *
     * @param groupManagerService service used to query block numbers and send group messages
     * @param eventResource holder of the filter manager and push-message handler
     * @param groupId id of the group all requests of this instance are sent to
     */
    public EventSubscribeImp(GroupManagerService groupManagerService, EventResource eventResource, Integer groupId) {
        this.channel = groupManagerService.getChannel();
        this.groupManagerService = groupManagerService;
        this.groupId = groupId;
        this.eventResource = eventResource;
        this.filterManager = eventResource.getFilterManager();
        this.msgHander = eventResource.getMsgHander();
        this.channel.addMessageHandler(MsgType.EVENT_LOG_PUSH, this.msgHander);
        this.channel.addDisconnectHandler(this.msgHander);
    }

    /** Returns the {@link EventResource} passed to the constructor. */
    @Override
    public EventResource getEventResource() {
        return this.eventResource;
    }

    /**
     * Validates {@code params} against the latest block number of the group, registers a new
     * filter and sends the register request.
     *
     * @param params subscription parameters
     * @param callback receiver of the register result (and stored on the filter)
     * @return the register id of the new filter, or {@code null} when {@code params} fail
     *     validation (in which case {@code callback} is invoked with {@code INVALID_PARAMS})
     */
    @Override
    public String subscribeEvent(EventLogParams params, EventCallback callback) {
        BigInteger number = this.groupManagerService.getLatestBlockNumberByGroup(this.groupId);
        logger.info(" subscribe event at block num:{}", number);
        if (!params.checkParams(number)) {
            callback.onReceiveLog(EventSubNodeRespStatus.INVALID_PARAMS.getStatus(), null);
            return null;
        }
        EventLogFilter filter = new EventLogFilter();
        filter.setRegisterID(EventSubscribe.newSeq());
        filter.setParams(params);
        filter.setCallback(callback);
        this.filterManager.addFilter(filter);
        this.sendFilter(filter);
        logger.info(" subscribe event, registerID: {}, filterID : {}", filter.getRegisterID(), filter.getFilterID());
        return filter.getRegisterID();
    }

    /**
     * Sends an unregister request for the filter identified by {@code registerID}.
     *
     * <p>Does nothing (besides logging) when no such filter exists. If the request payload
     * cannot be serialized, no message is sent and {@code callback}, when non-null, is invoked
     * with {@code OTHER_ERROR}.
     *
     * @param registerID register id returned by {@link #subscribeEvent}
     * @param callback receiver of the unregister result; it replaces the filter's callback
     */
    @Override
    public void unsubscribeEvent(String registerID, EventCallback callback) {
        EventLogFilter filter = this.filterManager.getFilter(registerID);
        if (filter == null) {
            logger.info(" try to unsubscribe an nonexistent event");
            return;
        }
        filter.setCallback(callback);
        this.filterManager.addCallback(filter.getFilterID(), callback);
        Message msg = new Message();
        msg.setSeq(EventSubscribe.newSeq());
        msg.setType((short)MsgType.CLIENT_UNREGISTER_EVENT_LOG.getType());
        msg.setResult(0);
        try {
            String content = filter.getParamJsonString(String.valueOf(this.groupId), filter.getFilterID());
            logger.info(" unsubscribe event, registerID: {}, filterID : {}", filter.getRegisterID(), filter.getFilterID());
            msg.setData(content.getBytes(StandardCharsets.UTF_8));
        }
        catch (JsonProcessingException e) {
            logger.error(" unsubscribe event error: {}", e.getMessage(), e);
            // Without a payload the request is meaningless: report the failure instead of
            // sending an empty unregister message.
            if (callback != null) {
                callback.onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null);
            }
            return;
        }
        this.groupManagerService.asyncSendMessageToGroup(this.groupId, EventSubscribeImp.toEventMsg(msg), new UnRegisterEventSubRespCallback(this.filterManager, filter));
    }

    /** Returns the subscribed filters as reported by the filter manager. */
    @Override
    public List<EventLogFilter> getAllSubscribedEvent() {
        return this.filterManager.getAllSubscribedEvent();
    }

    /**
     * Starts the periodic re-send of waiting filters; a no-op when already running.
     *
     * <p>The task runs immediately and then every {@code ScheduleTimeConfig.resendFrequency}
     * milliseconds.
     */
    @Override
    public void start() {
        if (this.running) {
            return;
        }
        // stop() shuts the executor down and a shut-down executor rejects new tasks,
        // so a fresh one is needed to support start() after stop().
        if (this.resendSchedule.isShutdown()) {
            this.resendSchedule = new ScheduledThreadPoolExecutor(1);
        }
        this.resendSchedule.scheduleAtFixedRate(this::resendWaitingFilters, 0L, ScheduleTimeConfig.resendFrequency, TimeUnit.MILLISECONDS);
        // Flag is set only once scheduling succeeded, so a failure does not leave it stuck.
        this.running = true;
    }

    /** Stops the periodic re-send by shutting the scheduler down; a no-op when not running. */
    @Override
    public void stop() {
        if (!this.running) {
            return;
        }
        this.running = false;
        this.resendSchedule.shutdown();
    }

    /**
     * Re-sends every filter returned by {@code FilterManager.getWaitingReqFilters()}.
     *
     * <p>Any exception is caught and logged, and the fetched filters are put back to
     * {@code WAITING_REQUEST}. Nothing may escape this method: an exception thrown by a
     * task scheduled with {@code scheduleAtFixedRate} suppresses all further executions.
     */
    private void resendWaitingFilters() {
        List<EventLogFilter> filters = null;
        try {
            filters = this.filterManager.getWaitingReqFilters();
            for (EventLogFilter filter : filters) {
                this.sendFilter(filter);
            }
            logger.info("Resend waiting filters, size: {}", filters.size());
        }
        catch (Exception e) {
            logger.error("resendWaitingFilters exception : {}", e.getMessage(), e);
            // filters is still null when fetching the list itself failed.
            if (filters != null) {
                for (EventLogFilter filter : filters) {
                    filter.setStatus(EventLogFilterStatus.WAITING_REQUEST);
                }
            }
        }
    }

    /**
     * Serializes {@code filter} into a register request and sends it to the group.
     *
     * <p>If the filter cannot be serialized it is removed from the filter manager and
     * nothing is sent.
     *
     * @param filter filter to register
     */
    private void sendFilter(EventLogFilter filter) {
        Message msg = new Message();
        msg.setSeq(EventSubscribe.newSeq());
        msg.setType((short)MsgType.CLIENT_REGISTER_EVENT_LOG.getType());
        msg.setResult(0);
        try {
            String content = filter.getNewParamJsonString(String.valueOf(this.groupId));
            msg.setData(content.getBytes(StandardCharsets.UTF_8));
        }
        catch (JsonProcessingException e) {
            logger.error("send filter error and remove bad filter, registerID: {},filterID : {}, error: {}", filter.getRegisterID(), filter.getFilterID(), e.getMessage(), e);
            this.filterManager.removeFilter(filter.getRegisterID());
            // The filter is gone and there is no payload: do not register a callback or
            // send an empty request for it.
            return;
        }
        this.filterManager.addCallback(filter.getFilterID(), filter.getCallback());
        this.groupManagerService.asyncSendMessageToGroup(this.groupId, EventSubscribeImp.toEventMsg(msg), new RegisterEventSubRespCallback(this.filterManager, filter, filter.getFilterID(), filter.getRegisterID()));
    }

    /** Wraps {@code msg} into an {@link EventMsg} with an empty topic and the same payload. */
    private static EventMsg toEventMsg(Message msg) {
        EventMsg eventMsg = new EventMsg(msg);
        eventMsg.setTopic("");
        eventMsg.setData(msg.getData());
        return eventMsg;
    }

    /**
     * Handles the response to an unregister request.
     *
     * <p>For a response with error code 0 the content is parsed as {@link EventLogResponse};
     * on result 0 the filter and its callback are removed from the filter manager, and in
     * either case the filter's callback receives the result. A response with a non-zero error
     * code is only logged. Any exception is reported to the callback as {@code OTHER_ERROR}.
     */
    class UnRegisterEventSubRespCallback
    extends ResponseCallback {
        FilterManager filterManager;
        EventLogFilter filter;

        public UnRegisterEventSubRespCallback(FilterManager filterManager, EventLogFilter filter) {
            this.filterManager = filterManager;
            this.filter = filter;
        }

        @Override
        public void onResponse(Response response) {
            String registerId = this.filter.getRegisterID();
            logger.info(" unregister event callback response, registerID: {}, seq: {}, error code: {}, content: {}", registerId, response.getMessageID(), response.getErrorCode(), response.getContent());
            try {
                if (0 == response.getErrorCode()) {
                    EventLogResponse resp = (EventLogResponse)ObjectMapperFactory.getObjectMapper().readValue(response.getContent().trim(), EventLogResponse.class);
                    if (resp.getResult() == 0) {
                        logger.info(" unregister event success");
                        this.filterManager.removeFilter(this.filter.getRegisterID());
                        this.filterManager.removeCallback(this.filter.getFilterID());
                    } else {
                        logger.warn(" unregister event fail");
                    }
                    this.filter.getCallback().onReceiveLog(resp.getResult(), null);
                }
            }
            catch (Exception e) {
                logger.error(" unregister event response message exception, registerID: {}, exception message: {}", registerId, e.getMessage(), e);
                this.filter.getCallback().onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null);
            }
        }
    }

    /**
     * Handles the response to a register request.
     *
     * <p>Error code 0: the content is parsed as {@link EventLogResponse}; result 0 moves the
     * filter to {@code EVENT_LOG_PUSHING}, any other result removes the filter and its
     * callback; the filter's callback receives the result either way. Non-zero error code:
     * the filter is set back to {@code WAITING_REQUEST} and its callback is removed from the
     * filter manager. Any exception removes the filter and its callback and is reported as
     * {@code OTHER_ERROR}.
     */
    class RegisterEventSubRespCallback
    extends ResponseCallback {
        FilterManager filterManager;
        EventLogFilter filter;
        String filterID;
        String registerID;

        public RegisterEventSubRespCallback(FilterManager filterManager, EventLogFilter filter, String filterID, String registerID) {
            this.filterManager = filterManager;
            this.filter = filter;
            this.filterID = filterID;
            this.registerID = registerID;
        }

        @Override
        public void onResponse(Response response) {
            logger.info(" event filter callback response, registerID: {}, filterID: {}, seq: {}, error code: {},  content: {}", this.registerID, this.filterID, response.getMessageID(), response.getErrorCode(), response.getContent());
            try {
                if (0 == response.getErrorCode()) {
                    EventLogResponse resp = (EventLogResponse)ObjectMapperFactory.getObjectMapper().readValue(response.getContent().trim(), EventLogResponse.class);
                    if (resp.getResult() == 0) {
                        this.filterManager.updateFilterStatus(this.filter, EventLogFilterStatus.EVENT_LOG_PUSHING, response.getCtx());
                        logger.info(" filter {} status changed to EVENT_LOG_PUSHING", this.filter.getFilterID());
                    } else {
                        this.filterManager.removeFilter(this.registerID);
                        this.filterManager.removeCallback(this.filterID);
                    }
                    this.filter.getCallback().onReceiveLog(resp.getResult(), null);
                } else {
                    // NOTE(review): presumably this makes the periodic resend pick the filter
                    // up again - confirm against FilterManager.getWaitingReqFilters().
                    this.filterManager.updateFilterStatus(this.filter, EventLogFilterStatus.WAITING_REQUEST, null);
                    this.filterManager.removeCallback(this.filterID);
                }
            }
            catch (Exception e) {
                logger.error(" event filter response message exception, filterID: {}, registerID: {}, exception message: {}", this.filterID, this.registerID, e.getMessage(), e);
                this.filter.getCallback().onReceiveLog(EventSubNodeRespStatus.OTHER_ERROR.getStatus(), null);
                this.filterManager.removeFilter(this.registerID);
                this.filterManager.removeCallback(this.filterID);
            }
        }
    }
}

