package org.mule.runtime.core.internal.source.scheduler;

import java.util.concurrent.ScheduledFuture;
import org.mule.runtime.api.component.AbstractComponent;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.CreateException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.notification.ConnectorMessageNotification;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.source.SchedulerConfiguration;
import org.mule.runtime.api.source.SchedulerMessageSource;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.context.MuleContextAware;
import org.mule.runtime.core.api.context.notification.NotificationHelper;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.event.EventContextFactory;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.source.scheduler.PeriodicScheduler;
import org.mule.runtime.core.api.util.ClassUtils;
import org.mule.runtime.core.internal.component.ComponentUtils;
import org.mule.runtime.core.internal.exception.MessagingException;
import org.mule.runtime.core.internal.message.InternalEvent;
import org.mule.runtime.core.internal.util.rx.Operators;
import org.mule.runtime.core.privileged.event.BaseEventContext;
import org.mule.runtime.core.privileged.event.PrivilegedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/core/internal/source/scheduler/DefaultSchedulerMessageSource.class */
public class DefaultSchedulerMessageSource extends AbstractComponent implements MessageSource, SchedulerMessageSource, MuleContextAware, Initialisable, Disposable {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) DefaultSchedulerMessageSource.class);
    private final PeriodicScheduler scheduler;
    private final NotificationHelper notificationHelper;
    private final boolean disallowConcurrentExecution;
    private Scheduler pollingExecutor;
    private ScheduledFuture<?> schedulingJob;
    private Processor listener;
    private FlowConstruct flowConstruct;
    private MuleContext muleContext;
    private boolean started;
    private volatile boolean executing = false;

    public DefaultSchedulerMessageSource(MuleContext muleContext, PeriodicScheduler periodicScheduler, boolean z) {
        this.muleContext = muleContext;
        this.scheduler = periodicScheduler;
        this.disallowConcurrentExecution = z;
        this.notificationHelper = new NotificationHelper(muleContext.getNotificationManager(), ConnectorMessageNotification.class, false);
    }

    @Override // org.mule.runtime.api.lifecycle.Startable
    public synchronized void start() throws MuleException {
        if (this.started) {
            return;
        }
        try {
            this.schedulingJob = (ScheduledFuture) ClassUtils.withContextClassLoader(this.muleContext.getExecutionClassLoader(), () -> {
                return this.scheduler.schedule(this.pollingExecutor, () -> {
                    run();
                });
            });
            this.started = true;
        } catch (Exception e) {
            stop();
            throw new CreateException(CoreMessages.failedToScheduleWork(), e, this);
        }
    }

    @Override // org.mule.runtime.api.lifecycle.Stoppable
    public synchronized void stop() throws MuleException {
        if (this.started) {
            if (this.schedulingJob != null) {
                this.schedulingJob.cancel(false);
                this.schedulingJob = null;
            }
            this.started = false;
        }
    }

    @Override // org.mule.runtime.api.source.SchedulerMessageSource
    public void trigger() {
        this.pollingExecutor.execute(() -> {
            ClassUtils.withContextClassLoader(this.muleContext.getExecutionClassLoader(), () -> {
                poll();
            });
        });
    }

    @Override // org.mule.runtime.api.source.SchedulerMessageSource
    public boolean isStarted() {
        return this.started;
    }

    @Override // org.mule.runtime.api.source.SchedulerMessageSource
    public SchedulerConfiguration getConfiguration() {
        return this.scheduler;
    }

    private final void run() {
        PrivilegedEvent.setCurrentEvent(null);
        if (this.muleContext.isPrimaryPollingInstance()) {
            poll();
        }
    }

    private void poll() {
        boolean z;
        synchronized (this) {
            if (this.disallowConcurrentExecution && this.executing) {
                z = false;
            } else {
                z = true;
                this.executing = true;
            }
        }
        if (z) {
            pollWith(Message.of(null));
        } else {
            LOGGER.info("Flow '{}' is already running and 'disallowConcurrentExecution' is set to 'true'. Execution skipped.", this.flowConstruct.getRootContainerLocation().getGlobalName());
        }
    }

    private void pollWith(Message message) {
        try {
            Mono.just(message).map(message2 -> {
                return InternalEvent.builder(EventContextFactory.create(this.flowConstruct, getLocation())).message(message).build();
            }).doOnNext(internalEvent -> {
                PrivilegedEvent.setCurrentEvent(internalEvent);
            }).doOnNext(internalEvent2 -> {
                this.notificationHelper.fireNotification(this, internalEvent2, getLocation(), ConnectorMessageNotification.MESSAGE_RECEIVED);
            }).cast(CoreEvent.class).transform(this.listener).doOnError(MessagingException.class, messagingException -> {
                ((BaseEventContext) messagingException.getEvent().getContext()).error(messagingException);
            }).doOnSuccess(coreEvent -> {
                ((BaseEventContext) coreEvent.getContext()).success();
            }).doFinally(signalType -> {
                synchronized (this) {
                    this.executing = false;
                }
            }).subscribe(Operators.requestUnbounded());
        } catch (Exception e) {
            this.muleContext.getExceptionListener().handleException(e);
        }
    }

    @Override // org.mule.runtime.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        this.flowConstruct = ComponentUtils.getFromAnnotatedObjectOrFail(this.muleContext.getConfigurationComponentLocator(), this);
        createScheduler();
    }

    @Override // org.mule.runtime.api.lifecycle.Disposable
    public void dispose() {
        disposeScheduler();
    }

    private void createScheduler() throws InitialisationException {
        this.pollingExecutor = this.muleContext.getSchedulerService().cpuLightScheduler();
    }

    private void disposeScheduler() {
        if (this.pollingExecutor != null) {
            this.pollingExecutor.stop();
            this.pollingExecutor = null;
        }
    }

    @Override // org.mule.runtime.core.api.context.MuleContextAware
    public void setMuleContext(MuleContext muleContext) {
        this.muleContext = muleContext;
    }

    @Override // org.mule.runtime.core.api.source.MessageSource
    public void setListener(Processor processor) {
        this.listener = processor;
    }
}
