package org.mule.runtime.core.internal.routing;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.Predicate;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.component.location.Location;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.construct.Pipeline;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.processor.AbstractMuleObjectOwner;
import org.mule.runtime.core.api.processor.MessageProcessorChain;
import org.mule.runtime.core.api.processor.MessageProcessors;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.Router;
import org.mule.runtime.core.api.retry.policy.RetryPolicyExhaustedException;
import org.mule.runtime.core.api.retry.policy.SimpleRetryPolicyTemplate;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/core/internal/routing/UntilSuccessful.class */
/**
 * Router that runs its nested processors and, when the resulting failure carries an event with
 * an error, retries them through a {@link SimpleRetryPolicyTemplate} configured with
 * {@code maxRetries} and {@code millisBetweenRetries}.
 *
 * <p>Once the policy gives up, the failure is surfaced as a {@link MessagingException} that wraps
 * a {@link RetryPolicyExhaustedException} and carries the event that originally entered this
 * router.
 */
public class UntilSuccessful extends AbstractMuleObjectOwner implements Router {
    private static final Logger LOGGER = LoggerFactory.getLogger(UntilSuccessful.class);
    private static final String UNTIL_SUCCESSFUL_MSG_PREFIX = "'until-successful' retries exhausted. Last exception message was: %s";
    private static final long DEFAULT_MILLIS_BETWEEN_RETRIES = 60000;
    private static final int DEFAULT_RETRIES = 5;

    @Inject
    private ConfigurationComponentLocator componentLocator;
    private int maxRetries = DEFAULT_RETRIES;
    private Long millisBetweenRetries = DEFAULT_MILLIS_BETWEEN_RETRIES;
    private MessageProcessorChain nestedChain;
    private Predicate<InternalEvent> shouldRetry;
    private SimpleRetryPolicyTemplate policyTemplate;
    private Scheduler timer;
    private FlowConstruct flowConstruct;
    private List<Processor> processors;

    /**
     * Builds the nested chain from the configured processors, resolves the root container and
     * creates the retry policy.
     *
     * @throws InitialisationException if no processors were configured, or if the root container
     *         cannot be found through the component locator
     */
    @Override
    public void initialise() throws InitialisationException {
        if (this.processors == null) {
            throw new InitialisationException(I18nMessageFactory.createStaticMessage("One message processor must be configured within 'until-successful'."), this);
        }
        final String rootContainerName = getRootContainerName();
        this.nestedChain = MessageProcessors.newChain(MessageProcessors.getProcessingStrategy(this.muleContext, rootContainerName), this.processors);
        super.initialise();
        this.shouldRetry = event -> event.getError().isPresent();
        // Resolve the root container before obtaining the scheduler, so that a failed lookup
        // fails initialisation with a descriptive error and does not leave a scheduler behind.
        this.flowConstruct = (FlowConstruct) this.componentLocator
                .find(Location.builder().globalName(rootContainerName).build())
                .orElseThrow(() -> new InitialisationException(
                        I18nMessageFactory.createStaticMessage("Could not find root container '%s' for 'until-successful'.", rootContainerName), this));
        this.timer = this.muleContext.getSchedulerService().cpuLightScheduler();
        this.policyTemplate = new SimpleRetryPolicyTemplate(this.millisBetweenRetries.longValue(), this.maxRetries, this.timer);
    }

    /**
     * Disposes the owned objects and stops the retry scheduler, if one was obtained.
     */
    @Override
    public void dispose() {
        try {
            super.dispose();
        } finally {
            // The timer is only assigned at the end of initialise(); guard against disposal of an
            // instance that was never (or only partially) initialised.
            if (this.timer != null) {
                this.timer.stop();
                this.timer = null;
            }
        }
    }

    /**
     * Processes a single event by delegating to {@link #apply(Publisher)}.
     *
     * @param internalEvent the event to route
     * @return the event produced by the nested processors
     * @throws MuleException if processing fails
     */
    @Override
    public InternalEvent process(InternalEvent internalEvent) throws MuleException {
        return MessageProcessors.processToApply(internalEvent, this);
    }

    /**
     * For every incoming event, runs the nested chain in a child context and applies the retry
     * policy to the result.
     *
     * @param publisher the stream of incoming events
     * @return the stream of events produced by the nested chain
     */
    @Override
    public Publisher<InternalEvent> apply(Publisher<InternalEvent> publisher) {
        return Flux.from(publisher).flatMap(internalEvent -> {
            ReactiveProcessor route = scheduleRoute(routePublisher -> Mono.from(routePublisher).transform(this.nestedChain));
            return Mono.from(MessageProcessors.processWithChildContext(internalEvent, route, Optional.ofNullable(getLocation())))
                    .transform(mono -> this.policyTemplate.applyPolicy(mono, getRetryPredicate(), th -> {
                        // Nothing to do on an individual retry.
                    }, getThrowableFunction(internalEvent)));
        });
    }

    /**
     * @return a predicate accepting only {@link MessagingException}s whose event satisfies
     *         {@code shouldRetry} (i.e. carries an error)
     */
    private Predicate<Throwable> getRetryPredicate() {
        return th -> (th instanceof MessagingException) && this.shouldRetry.test(((MessagingException) th).getEvent());
    }

    /**
     * @param internalEvent the event that originally entered the router
     * @return a function mapping the last failure to a {@link MessagingException} that wraps a
     *         {@link RetryPolicyExhaustedException} and carries {@code internalEvent}
     */
    private Function<Throwable, Throwable> getThrowableFunction(InternalEvent internalEvent) {
        return th -> {
            Throwable cause = ExceptionUtils.getMessagingExceptionCause(th);
            RetryPolicyExhaustedException exhausted = new RetryPolicyExhaustedException(
                    I18nMessageFactory.createStaticMessage(UNTIL_SUCCESSFUL_MSG_PREFIX, cause.getMessage()), cause, this);
            return new MessagingException(internalEvent, exhausted, this);
        };
    }

    /**
     * Decorates the given route with the owning pipeline's processing strategy when there is one.
     *
     * @param reactiveProcessor the route to execute
     * @return the route wrapped by the pipeline's processing strategy, or the route itself when
     *         the flow construct is not a {@link Pipeline}
     */
    private ReactiveProcessor scheduleRoute(ReactiveProcessor reactiveProcessor) {
        if (this.flowConstruct instanceof Pipeline) {
            Pipeline pipeline = (Pipeline) this.flowConstruct;
            return publisher -> Flux.from(publisher).transform(pipeline.getProcessingStrategy().onPipeline(reactiveProcessor));
        }
        // Without a pipeline there is no processing strategy to apply, but the route must still
        // run; returning an identity function here would silently skip the nested processors.
        return reactiveProcessor;
    }

    /**
     * @return the configured maximum number of retries
     */
    public int getMaxRetries() {
        return this.maxRetries;
    }

    /**
     * @param i the maximum number of retries handed to the retry policy on initialisation
     */
    public void setMaxRetries(int i) {
        this.maxRetries = i;
    }

    /**
     * @return the configured delay between retries, in milliseconds
     */
    public long getMillisBetweenRetries() {
        return this.millisBetweenRetries.longValue();
    }

    /**
     * @param j the delay between retries, in milliseconds, handed to the retry policy on
     *        initialisation
     */
    public void setMillisBetweenRetries(long j) {
        this.millisBetweenRetries = Long.valueOf(j);
    }

    /**
     * @param list the processors that make up the nested chain; must be set before
     *        {@link #initialise()}
     */
    public void setMessageProcessors(List<Processor> list) {
        this.processors = list;
    }

    /**
     * @return a single-element list holding the nested chain
     */
    @Override
    protected List<Object> getOwnedObjects() {
        return Collections.singletonList(this.nestedChain);
    }
}
