package org.mule.runtime.core.routing;

import java.io.Serializable;
import java.util.concurrent.TimeUnit;
import org.mule.runtime.api.exception.ExceptionHelper;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.LifecycleException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.context.notification.MuleContextNotificationListener;
import org.mule.runtime.core.api.exception.MessagingExceptionHandler;
import org.mule.runtime.core.api.exception.MessagingExceptionHandlerAware;
import org.mule.runtime.core.api.util.StringUtils;
import org.mule.runtime.core.context.notification.MuleContextNotification;
import org.mule.runtime.core.context.notification.NotificationException;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.internal.message.DefaultExceptionPayload;
import org.mule.runtime.core.internal.message.InternalMessage;
import org.mule.runtime.core.message.ErrorBuilder;
import org.mule.runtime.core.retry.RetryPolicyExhaustedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/runtime/core/routing/AsynchronousUntilSuccessfulProcessingStrategy.class */
public class AsynchronousUntilSuccessfulProcessingStrategy extends AbstractUntilSuccessfulProcessingStrategy implements Initialisable, Disposable, Startable, Stoppable, MessagingExceptionHandlerAware {
    private static final String UNTIL_SUCCESSFUL_MSG_PREFIX = "until-successful retries exhausted. Last exception message was: %s";
    protected transient Logger logger = LoggerFactory.getLogger(getClass());
    private MessagingExceptionHandler messagingExceptionHandler;
    private Scheduler pool;
    private MuleContextNotificationListener<MuleContextNotification> contextStartListener;

    public void initialise() throws InitialisationException {
        if (getUntilSuccessfulConfiguration().getObjectStore() == null) {
            throw new InitialisationException(I18nMessageFactory.createStaticMessage("A ListableObjectStore must be configured on UntilSuccessful."), this);
        }
        this.contextStartListener = new MuleContextNotificationListener<MuleContextNotification>() { // from class: org.mule.runtime.core.routing.AsynchronousUntilSuccessfulProcessingStrategy.1
            @Override // org.mule.runtime.core.api.context.notification.ServerNotificationListener
            public void onNotification(MuleContextNotification muleContextNotification) {
                if (muleContextNotification.getAction() == 104) {
                    AsynchronousUntilSuccessfulProcessingStrategy.this.muleContext.unregisterListener(this);
                    AsynchronousUntilSuccessfulProcessingStrategy.this.contextStartListener = null;
                    AsynchronousUntilSuccessfulProcessingStrategy.this.scheduleAllPendingEventsForProcessing();
                }
            }
        };
        try {
            this.muleContext.registerListener(this.contextStartListener);
        } catch (NotificationException e) {
            throw new InitialisationException(e, this);
        }
    }

    public void start() {
        this.pool = this.muleContext.getSchedulerService().ioScheduler(this.muleContext.getSchedulerBaseConfig().withName(String.format("%s.%s", getUntilSuccessfulConfiguration().getFlowConstruct().getName(), "until-successful")));
    }

    public void stop() {
        this.pool.stop();
        this.pool = null;
    }

    @Override // org.mule.runtime.core.routing.AbstractUntilSuccessfulProcessingStrategy
    protected Event doRoute(Event event, FlowConstruct flowConstruct) throws MuleException {
        try {
            scheduleForProcessing(storeEvent(event, flowConstruct), true);
            return getUntilSuccessfulConfiguration().getAckExpression() == null ? event : processResponseThroughAckResponseExpression(event);
        } catch (Exception e) {
            throw new MessagingException(I18nMessageFactory.createStaticMessage("Failed to schedule the event for processing"), event, e, getUntilSuccessfulConfiguration().getRouter());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void scheduleAllPendingEventsForProcessing() {
        try {
            for (Serializable serializable : getUntilSuccessfulConfiguration().getObjectStore().allKeys()) {
                try {
                    scheduleForProcessing(serializable, true);
                } catch (Exception e) {
                    this.logger.error(I18nMessageFactory.createStaticMessage("Failed to schedule for processing event stored with key: " + serializable).toString(), e);
                }
            }
        } catch (Exception e2) {
            this.logger.warn("Failure during scheduling of until successful previous jobs " + e2.getMessage());
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Failure during scheduling of until successful previous jobs ", e2);
            }
        }
    }

    private void scheduleForProcessing(Serializable serializable, boolean z) {
        if (z) {
            submitForProcessing(serializable);
        } else {
            this.pool.schedule(() -> {
                doProcess(serializable);
            }, getUntilSuccessfulConfiguration().getMillisBetweenRetries(), TimeUnit.MILLISECONDS);
        }
    }

    protected void submitForProcessing(Serializable serializable) {
        this.pool.execute(() -> {
            doProcess(serializable);
        });
    }

    protected void doProcess(Serializable serializable) {
        try {
            retrieveAndProcessEvent(serializable);
        } catch (ObjectStoreException e) {
            throw new MuleRuntimeException(e);
        } catch (Exception e2) {
            incrementProcessAttemptCountAndRescheduleOrRemoveFromStore(serializable, e2);
        }
    }

    private void incrementProcessAttemptCountAndRescheduleOrRemoveFromStore(Serializable serializable, Exception exc) {
        try {
            Event event = (Event) getUntilSuccessfulConfiguration().getObjectStore().remove(serializable);
            Integer num = (Integer) Event.getVariableValueOrNull(UntilSuccessful.PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, event);
            Integer valueOf = Integer.valueOf(num != null ? num.intValue() : 1);
            if (valueOf.intValue() <= getUntilSuccessfulConfiguration().getMaxRetries()) {
                getUntilSuccessfulConfiguration().getObjectStore().store(serializable, Event.builder(event).addVariable(UntilSuccessful.PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, Integer.valueOf(valueOf.intValue() + 1)).build());
                scheduleForProcessing(serializable, false);
            } else {
                abandonRetries(event, event, exc);
            }
        } catch (ObjectStoreException e) {
            this.logger.error("Failed to increment failure count for event stored with key: " + serializable, e);
        }
    }

    private Serializable storeEvent(Event event, FlowConstruct flowConstruct) throws ObjectStoreException {
        Integer num = (Integer) Event.getVariableValueOrNull(UntilSuccessful.PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, event);
        return storeEvent(event, flowConstruct, Integer.valueOf(num != null ? num.intValue() : 1).intValue());
    }

    private Serializable storeEvent(Event event, FlowConstruct flowConstruct, int i) throws ObjectStoreException {
        Event build = Event.builder(event).addVariable(UntilSuccessful.PROCESS_ATTEMPT_COUNT_PROPERTY_NAME, Integer.valueOf(i)).build();
        Serializable buildQueueKey = buildQueueKey(build, flowConstruct, this.muleContext);
        getUntilSuccessfulConfiguration().getObjectStore().store(buildQueueKey, build);
        return buildQueueKey;
    }

    public static Serializable buildQueueKey(Event event, FlowConstruct flowConstruct, MuleContext muleContext) {
        StringBuilder sb = new StringBuilder();
        event.getGroupCorrelation().getSequence().ifPresent(num -> {
            sb.append(num + StringUtils.DASH);
        });
        sb.append(event.getContext().getId());
        sb.append(StringUtils.DASH);
        sb.append(muleContext.getClusterId());
        sb.append(StringUtils.DASH);
        sb.append(flowConstruct);
        return sb.toString();
    }

    private void abandonRetries(Event event, Event event2, Exception exc) {
        if (getUntilSuccessfulConfiguration().getDlqMP() == null) {
            this.logger.info("Retry attempts exhausted and no DLQ defined");
            this.messagingExceptionHandler.handleException(new MessagingException(event2, (Throwable) buildRetryPolicyExhaustedException(exc)), event2);
            return;
        }
        this.logger.info("Retry attempts exhausted, routing message to DLQ: " + getUntilSuccessfulConfiguration().getDlqMP());
        try {
            LifecycleException buildRetryPolicyExhaustedException = buildRetryPolicyExhaustedException(exc);
            getUntilSuccessfulConfiguration().getDlqMP().process(Event.builder(event2).message(InternalMessage.builder(event2.getMessage()).exceptionPayload(new DefaultExceptionPayload(buildRetryPolicyExhaustedException)).mo105build()).error(ErrorBuilder.builder(buildRetryPolicyExhaustedException).errorType(this.muleContext.getErrorTypeLocator().lookupErrorType((Throwable) buildRetryPolicyExhaustedException)).build()).build());
        } catch (Exception e) {
            this.messagingExceptionHandler.handleException(new MessagingException(event, e), event);
        } catch (MessagingException e2) {
            this.messagingExceptionHandler.handleException(e2, event);
        }
    }

    protected RetryPolicyExhaustedException buildRetryPolicyExhaustedException(Exception exc) {
        MuleException rootMuleException = ExceptionHelper.getRootMuleException(exc);
        if (rootMuleException == null) {
            return new RetryPolicyExhaustedException(I18nMessageFactory.createStaticMessage(UNTIL_SUCCESSFUL_MSG_PREFIX, new Object[]{exc.getMessage()}), exc, this);
        }
        if (rootMuleException.getCause() != null) {
            RetryPolicyExhaustedException retryPolicyExhaustedException = new RetryPolicyExhaustedException(I18nMessageFactory.createStaticMessage(UNTIL_SUCCESSFUL_MSG_PREFIX, new Object[]{rootMuleException.getMessage()}), rootMuleException.getCause());
            retryPolicyExhaustedException.getInfo().putAll(rootMuleException.getInfo());
            return retryPolicyExhaustedException;
        }
        RetryPolicyExhaustedException retryPolicyExhaustedException2 = new RetryPolicyExhaustedException(I18nMessageFactory.createStaticMessage(UNTIL_SUCCESSFUL_MSG_PREFIX, new Object[]{rootMuleException.getMessage()}), rootMuleException);
        retryPolicyExhaustedException2.getInfo().putAll(rootMuleException.getInfo());
        return retryPolicyExhaustedException2;
    }

    private void removeFromStore(Serializable serializable) {
        try {
            getUntilSuccessfulConfiguration().getObjectStore().remove(serializable);
        } catch (ObjectStoreException e) {
            this.logger.warn("Failed to remove following event from store with key: " + serializable);
        }
    }

    private void retrieveAndProcessEvent(Serializable serializable) throws ObjectStoreException {
        processEvent((Event) getUntilSuccessfulConfiguration().getObjectStore().retrieve(serializable));
        removeFromStore(serializable);
    }

    @Override // org.mule.runtime.core.api.exception.MessagingExceptionHandlerAware
    public void setMessagingExceptionHandler(MessagingExceptionHandler messagingExceptionHandler) {
        this.messagingExceptionHandler = messagingExceptionHandler;
    }

    public void dispose() {
        if (this.contextStartListener != null) {
            this.muleContext.unregisterListener(this.contextStartListener);
        }
    }
}
