/*
 * Decompiled with CFR 0.152.
 */
package org.ikasan.framework.initiator.eventdriven;

import org.apache.log4j.Logger;
import org.ikasan.framework.component.Event;
import org.ikasan.framework.component.IkasanExceptionHandler;
import org.ikasan.framework.exception.IkasanExceptionAction;
import org.ikasan.framework.flow.Flow;
import org.ikasan.framework.initiator.AbortTransactionException;
import org.ikasan.framework.initiator.AbstractInitiator;
import org.ikasan.framework.initiator.eventdriven.EventDrivenInitiator;
import org.ikasan.framework.initiator.eventdriven.MessageEndpointManager;
import org.ikasan.framework.monitor.MonitorSubject;

/**
 * Event driven initiator which invokes its flow when an {@link Event} is pushed to it via
 * {@link #onEvent(Event)}, and which starts/stops the underlying
 * {@link MessageEndpointManager} to control delivery.
 *
 * <p>Two helper threads are used:
 * <ul>
 *   <li>{@code Anesthetist} - stops the endpoint manager, sleeps for the retry delay and then
 *       restarts the endpoint manager (unless cancelled);</li>
 *   <li>{@code Halt} - stops the endpoint manager asynchronously when stopping in error.</li>
 * </ul>
 *
 * <p>The fields {@code stopping}, {@code error}, {@code retryCount}, {@code name} and
 * {@code exceptionHandler} used below are not declared in this class; they are presumably
 * inherited from {@link AbstractInitiator}.
 */
public class EventDrivenInitiatorImpl
extends AbstractInitiator
implements EventDrivenInitiator,
MonitorSubject {
    /** Message used when a delivery is rejected because a stop request is in progress. */
    private static final String INITIATOR_STOPPING = "Initiator cannot process message whilst managing a stop request.";

    /** Message used when a delivery is rejected because the anesthetist is still operating. */
    private static final String INITIATOR_ANESTHETIST_OPERATING = "Initiator cannot process message until anesthetist has completed.";

    /** Value returned by {@link #getType()}. */
    public static final String EVENT_DRIVEN_INITIATOR_TYPE = "EventDrivenInitiator";

    /** Class logger; also used by the inner helper threads. */
    static Logger logger = Logger.getLogger(EventDrivenInitiatorImpl.class);

    /** Controls the message endpoint(s) feeding this initiator; injected via the setter. */
    protected MessageEndpointManager messageEndpointManager;

    /**
     * Currently active anesthetist, or {@code null} if none.
     * Volatile because it is assigned and read from different threads.
     */
    protected volatile Anesthetist anesthetist = null;

    /**
     * Halt thread created by {@link #stopInError()}, or {@code null} if none.
     * Volatile because it is assigned and read from different threads.
     */
    protected volatile Halt halt = null;

    /**
     * Constructor.
     *
     * @param moduleName name of the owning module, must not be {@code null}
     * @param name name of this initiator, must not be {@code null}
     * @param flow flow to invoke, must not be {@code null}
     * @param exceptionHandler handler consulted on failures, must not be {@code null}
     * @throws IllegalArgumentException if any argument is {@code null}
     */
    public EventDrivenInitiatorImpl(String moduleName, String name, Flow flow, IkasanExceptionHandler exceptionHandler) {
        super(moduleName, name, flow, exceptionHandler);
        // Validation can only follow the mandatory super(...) call.
        if (moduleName == null) {
            throw new IllegalArgumentException("moduleName cannot be 'null'");
        }
        if (name == null) {
            throw new IllegalArgumentException("Initiator name cannot be 'null'");
        }
        if (flow == null) {
            throw new IllegalArgumentException("flow cannot be 'null'");
        }
        if (exceptionHandler == null) {
            throw new IllegalArgumentException("exceptionHandler cannot be 'null'");
        }
    }

    /**
     * @return the constant {@link #EVENT_DRIVEN_INITIATOR_TYPE}
     */
    @Override
    public String getType() {
        return EVENT_DRIVEN_INITIATOR_TYPE;
    }

    /**
     * Callback for a failure raised before an event could be delivered.
     * The throwable is resolved to an action by the exception handler, logged and actioned.
     *
     * @param throwable the failure being reported
     * @throws AbortTransactionException if the initiator is stopping or the anesthetist is operating
     */
    @Override
    public void onException(Throwable throwable) {
        this.rejectIfUnavailable();
        IkasanExceptionAction action = this.exceptionHandler.handleThrowable(this.name, throwable);
        this.logError(null, throwable, this.name, action);
        this.handleAction(action, null);
    }

    /**
     * Callback for a delivered event; passes the event to {@code invokeFlow}.
     *
     * @param event the event to process
     * @throws AbortTransactionException if the initiator is stopping or the anesthetist is operating
     */
    @Override
    public void onEvent(Event event) {
        this.rejectIfUnavailable();
        this.invokeFlow(event);
    }

    /**
     * Guard shared by the delivery callbacks: aborts the delivery while a stop is being
     * managed or while the anesthetist is operating (stop is checked first).
     */
    private void rejectIfUnavailable() {
        if (this.stopping) {
            throw new AbortTransactionException(INITIATOR_STOPPING);
        }
        if (this.anesthetistOperating()) {
            throw new AbortTransactionException(INITIATOR_ANESTHETIST_OPERATING);
        }
    }

    /**
     * Ends the retry cycle by clearing the retry count.
     */
    @Override
    protected void completeRetryCycle() {
        this.retryCount = null;
    }

    /**
     * Starts a retry cycle by launching a new anesthetist for the given delay.
     *
     * @param maxAttempts not used by this implementation
     * @param delay time in milliseconds the anesthetist sleeps before restarting delivery
     */
    @Override
    protected void startRetryCycle(Integer maxAttempts, long delay) {
        this.anesthetist = new Anesthetist(delay);
        this.anesthetist.start();
    }

    /**
     * Continues a retry cycle by launching a new anesthetist for the given delay.
     *
     * @param delay time in milliseconds the anesthetist sleeps before restarting delivery
     */
    @Override
    protected void continueRetryCycle(long delay) {
        this.anesthetist = new Anesthetist(delay);
        this.anesthetist.start();
    }

    /**
     * Cancels the retry cycle: marks any active anesthetist as cancelled (so that it will not
     * restart the endpoint manager), discards it and clears the retry count.
     */
    @Override
    protected void cancelRetryCycle() {
        // Read the volatile field once so the null check and the call see the same instance.
        Anesthetist current = this.anesthetist;
        if (current != null) {
            current.cancel();
            this.anesthetist = null;
        }
        this.retryCount = null;
    }

    /**
     * @return {@code true} if a retry count is currently set
     */
    @Override
    public boolean isRecovering() {
        return this.retryCount != null;
    }

    /**
     * @return {@code false} if a halt has been issued; {@code true} if the anesthetist is
     *         operating; otherwise whatever the endpoint manager reports
     */
    @Override
    public boolean isRunning() {
        if (this.halt != null) {
            return false;
        }
        if (this.anesthetistOperating()) {
            return true;
        }
        return this.messageEndpointManager.isRunning();
    }

    /**
     * @return {@code true} if there is an anesthetist and it reports that it is operating
     */
    protected boolean anesthetistOperating() {
        // Read the volatile field once; cancelRetryCycle() may null it concurrently.
        Anesthetist current = this.anesthetist;
        return current != null && current.isOperating();
    }

    /**
     * Clears any previous halt and starts the endpoint manager.
     */
    @Override
    protected void startInitiator() {
        this.halt = null;
        this.messageEndpointManager.start();
    }

    /**
     * Stops the endpoint manager on the calling thread.
     */
    @Override
    protected void stopInitiator() {
        this.messageEndpointManager.stop();
    }

    /**
     * Flags the initiator as errored and stopping, cancels any retry cycle, stops the endpoint
     * manager on a separate {@code Halt} thread and notifies the monitor listeners.
     */
    @Override
    protected void stopInError() {
        this.error = true;
        this.stopping = true;
        if (this.isRecovering()) {
            this.cancelRetryCycle();
        }
        this.halt = new Halt();
        this.halt.start();
        this.notifyMonitorListeners();
    }

    /**
     * @param messageEndpointManager the endpoint manager this initiator controls
     */
    public void setMessageEndpointManager(MessageEndpointManager messageEndpointManager) {
        this.messageEndpointManager = messageEndpointManager;
    }

    /**
     * @return the endpoint manager this initiator controls
     */
    @Override
    public MessageEndpointManager getMessageEndpointManager() {
        return this.messageEndpointManager;
    }

    /**
     * @return the class logger
     */
    @Override
    protected Logger getLogger() {
        return logger;
    }

    /**
     * Thread which stops the endpoint manager, so that the stop does not run on the
     * thread that called {@link EventDrivenInitiatorImpl#stopInError()}.
     */
    private class Halt
    extends Thread {
        private Halt() {
        }

        @Override
        public void run() {
            logger.info("stopping messageEndpointManager...");
            EventDrivenInitiatorImpl.this.messageEndpointManager.stop();
            logger.info("stopped messageEndpointManager successfully.");
        }
    }

    /**
     * Thread which stops the endpoint manager, sleeps for a fixed period and then restarts
     * the endpoint manager unless it has been cancelled in the meantime.
     */
    private class Anesthetist
    extends Thread {
        /** Sleep period in milliseconds. */
        long sleepPeriod;

        /** Whether this anesthetist is currently operating; read by other threads. */
        volatile boolean operating = false;

        /** Whether this anesthetist has been cancelled; written by other threads. */
        volatile boolean cancelled = false;

        /**
         * @param sleepPeriod time in milliseconds to sleep between stopping and restarting
         */
        public Anesthetist(long sleepPeriod) {
            this.sleepPeriod = sleepPeriod;
            logger.info("Created anesthetist with a sleep time of " + sleepPeriod + "ms");
        }

        @Override
        public void run() {
            try {
                logger.info("Anesthetist invoked");
                this.putToSleep();
                logger.info("Anesthetist sleeping for [" + this.sleepPeriod + "]ms.");
                Thread.sleep(this.sleepPeriod);
                logger.info("Anesthetist woken from sleep.");
                this.reawaken();
            }
            catch (InterruptedException e) {
                logger.info("Anesthetist sleep interrupted", e);
                this.reawaken();
                // Restore the interrupt status only after the restart attempt so that the
                // restart itself is not affected by it.
                Thread.currentThread().interrupt();
            }
            finally {
                // Never leave the flag set once this thread is finished, even if the
                // stop, sleep or restart failed with a runtime exception; otherwise the
                // initiator would reject every delivery indefinitely.
                this.operating = false;
            }
        }

        /**
         * Marks this anesthetist as operating and stops the endpoint manager.
         */
        private void putToSleep() {
            this.operating = true;
            logger.info("Anesthetist invoking the messageListenerConatiner stop...");
            EventDrivenInitiatorImpl.this.messageEndpointManager.stop();
            logger.info("Anesthetist invoked the messageListenerConatiner stop successfully.");
        }

        /**
         * Restarts the endpoint manager unless cancelled, then marks this anesthetist as
         * no longer operating (also when the restart fails).
         */
        private void reawaken() {
            try {
                if (!this.cancelled) {
                    logger.info("Anesthetist restarting messageEndpointManager...");
                    EventDrivenInitiatorImpl.this.messageEndpointManager.start();
                    logger.info("Anesthetist restarted messageEndpointManager successfully.");
                }
            }
            finally {
                this.operating = false;
            }
        }

        /**
         * @return {@code true} from the moment the endpoint manager is being stopped until
         *         the restart attempt has completed
         */
        public boolean isOperating() {
            return this.operating;
        }

        /**
         * Prevents this anesthetist from restarting the endpoint manager when it wakes.
         * The sleeping thread is not interrupted.
         */
        public void cancel() {
            logger.info("cancelling any anesthetist operation");
            this.cancelled = true;
        }
    }
}

