package com.mulesoft.mule.runtime.module.batch.internal.engine;

import com.google.common.collect.ImmutableList;
import com.mulesoft.mule.runtime.module.batch.BatchProcessingListener;
import com.mulesoft.mule.runtime.module.batch.BatchProperties;
import com.mulesoft.mule.runtime.module.batch.api.BatchJob;
import com.mulesoft.mule.runtime.module.batch.api.BatchStep;
import com.mulesoft.mule.runtime.module.batch.api.extension.structure.BatchJobInstance;
import com.mulesoft.mule.runtime.module.batch.api.extension.structure.BatchJobInstanceStatus;
import com.mulesoft.mule.runtime.module.batch.api.extension.structure.BatchJobResult;
import com.mulesoft.mule.runtime.module.batch.api.notification.BatchNotification;
import com.mulesoft.mule.runtime.module.batch.api.record.Record;
import com.mulesoft.mule.runtime.module.batch.engine.BatchEngine;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.BatchJobInstanceStore;
import com.mulesoft.mule.runtime.module.batch.engine.BatchRecordDispatcher;
import com.mulesoft.mule.runtime.module.batch.engine.BatchStepAdapter;
import com.mulesoft.mule.runtime.module.batch.engine.queue.BatchQueueManager;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContext;
import com.mulesoft.mule.runtime.module.batch.engine.transaction.BatchTransactionContextFactory;
import com.mulesoft.mule.runtime.module.batch.exception.BatchException;
import com.mulesoft.mule.runtime.module.batch.internal.BaseBatchProcessingListenerOwner;
import com.mulesoft.mule.runtime.module.batch.internal.BatchJobResultAdapter;
import com.mulesoft.mule.runtime.module.batch.internal.DefaultBatchJobInstance;
import com.mulesoft.mule.runtime.module.batch.internal.ImmutableBatchJobResult;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.RecordBuffer;
import com.mulesoft.mule.runtime.module.batch.internal.engine.buffer.SteppingQueueBuffer;
import com.mulesoft.mule.runtime.module.batch.internal.engine.history.DefaultHistoryExpirationMonitor;
import com.mulesoft.mule.runtime.module.batch.internal.engine.history.HistoryExpirationMonitor;
import com.mulesoft.mule.runtime.module.batch.internal.engine.queue.BatchQueueLoader;
import com.mulesoft.mule.runtime.module.batch.internal.engine.threading.BatchWorkManager;
import com.mulesoft.mule.runtime.module.batch.internal.engine.transaction.DefaultBatchTransactionContextFactory;
import com.mulesoft.mule.runtime.module.batch.internal.reporting.ExceptionsInTextBatchResultReporter;
import com.mulesoft.mule.runtime.module.batch.util.BatchUtils;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import javax.inject.Inject;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.mule.runtime.api.component.ConfigurationProperties;
import org.mule.runtime.api.component.location.ConfigurationComponentLocator;
import org.mule.runtime.api.exception.ExceptionHelper;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.message.Message;
import org.mule.runtime.api.metadata.TypedValue;
import org.mule.runtime.api.notification.NotificationDispatcher;
import org.mule.runtime.api.notification.NotificationListenerRegistry;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.streaming.HasSize;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.context.MuleContextAware;
import org.mule.runtime.core.api.context.notification.MuleContextNotification;
import org.mule.runtime.core.api.context.notification.MuleContextNotificationListener;
import org.mule.runtime.core.api.el.ExpressionManager;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.util.ExceptionUtils;
import org.mule.runtime.core.internal.component.ComponentUtils;
import org.mule.runtime.core.internal.profiling.InternalProfilingService;
import org.mule.runtime.core.internal.routing.split.EventToMessageSequenceSplittingStrategy;
import org.mule.runtime.core.internal.routing.split.ExpressionSplittingStrategy;
import org.mule.runtime.core.internal.routing.split.MessageSequence;
import org.mule.runtime.core.internal.routing.split.SplittingStrategy;
import org.mule.runtime.tracer.api.context.getter.MapDistributedTraceContextGetter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/mulesoft/mule/runtime/module/batch/internal/engine/DefaultBatchEngine.class */
public class DefaultBatchEngine extends BaseBatchProcessingListenerOwner implements BatchEngine, MuleContextAware, Initialisable, Stoppable, Startable {

    // ---- Injected collaborators -------------------------------------------------------------

    @Inject
    private ConfigurationProperties configurationProperties;
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultBatchEngine.class);
    // Name of the system property read below to initialise historyExpirationFrequency.
    private static final String MULE_BATCH_HISTORY_EXPIRATION_FREQUENCY = "mule.batch.historyExpirationFrequency";
    // Sentinel compared against BatchJobAdapter#getMaxFailedRecords() in routeError: no failed-record limit.
    private static final int UNLIMITED = -1;
    // Listener registered in initialise(); it unregisters itself and nulls this field when it fires.
    private MuleContextNotificationListener<MuleContextNotification> contextStartListener;
    // Built in initialise() from the injected expression manager; handed to the BatchQueueLoader in load().
    private SplittingStrategy<CoreEvent, MessageSequence<?>> splittingStrategy;

    @Inject
    private BatchWorkManager workManager;

    @Inject
    private BatchQueueManager batchQueueManager;

    @Inject
    private BatchJobInstanceStore jobInstanceStore;

    @Inject
    private BatchLockFactory lockFactory;

    @Inject
    private SchedulerService schedulerService;

    @Inject
    private ExpressionManager expressionManager;

    @Inject
    private NotificationDispatcher notificationDispatcher;

    @Inject
    private NotificationListenerRegistry notificationListenerRegistry;

    @Inject
    private ConfigurationComponentLocator componentLocator;

    @Inject
    private InternalProfilingService profilingService;

    // ---- State created by initializeEngine() (null until the engine has been initialised) ---

    private BatchTransactionContextFactory batchTransactionContextFactory;
    private BatchRecordDispatcher recordDispatcher;
    private HistoryExpirationMonitor historyExpirationMonitor;
    private MuleContext muleContext;
    private RecordBuffer queueBuffer;
    // Also used as the monitor object guarding the lazy engine initialisation in initialise().
    private final Map<String, BatchJobAdapter> jobs = new HashMap();
    // Taken from the system property above; defaults to one hour expressed in milliseconds.
    private long historyExpirationFrequency = Long.parseLong(System.getProperty(MULE_BATCH_HISTORY_EXPIRATION_FREQUENCY, String.valueOf(TimeUnit.HOURS.toMillis(1))));
    // Set to true by the context listener registered in initialise(); reset to false by stop().
    private final AtomicBoolean contextStarted = new AtomicBoolean(false);
    // Set to true at the end of initializeEngine(); reset to false by stop().
    private final AtomicBoolean engineInitialized = new AtomicBoolean(false);
    // Set to true at the start of stop() and back to false by initializeEngine(); consulted by
    // finallyOnFinishJobInstance to decide whether an IllegalStateException may be ignored.
    private final AtomicBoolean stopping = new AtomicBoolean(false);

    /**
     * Builds the splitting strategy and registers a one-shot context listener.
     *
     * <p>When a notification with action id {@code 104} arrives, the listener unregisters itself,
     * marks the context as started and, while holding the monitor of {@code jobs}, initialises
     * the engine if that has not happened yet and jobs are already registered.
     * NOTE(review): 104 presumably is the "context started" action id; confirm against
     * {@code MuleContextNotification}.
     *
     * @throws InitialisationException declared by the lifecycle contract
     */
    public void initialise() throws InitialisationException {
        this.splittingStrategy = new EventToMessageSequenceSplittingStrategy(new ExpressionSplittingStrategy(this.expressionManager));
        this.contextStartListener = new MuleContextNotificationListener<MuleContextNotification>() {
            public void onNotification(MuleContextNotification notification) {
                if (notification.getAction().getActionId() != 104) {
                    return;
                }
                // One-shot listener: drop the registration before doing anything else.
                DefaultBatchEngine.this.notificationListenerRegistry.unregisterListener(this);
                DefaultBatchEngine.this.contextStartListener = null;
                synchronized (DefaultBatchEngine.this.jobs) {
                    DefaultBatchEngine.this.contextStarted.set(true);
                    boolean pendingInitialisation = !DefaultBatchEngine.this.engineInitialized.get();
                    if (pendingInitialisation && DefaultBatchEngine.this.areJobsRegistered()) {
                        DefaultBatchEngine.this.initializeEngine();
                    }
                }
            }
        };
        this.notificationListenerRegistry.registerListener(this.contextStartListener);
    }

    /**
     * Creates and starts the runtime pieces of the engine: the transaction context factory, the
     * stepping queue buffer, the record dispatcher (whose listener is attached to both the queue
     * manager and the job instance store) and the history expiration monitor. The
     * {@code engineInitialized} flag is raised only after all of that succeeded.
     *
     * <p>Decompiler note (JADX): access modifier was changed from {@code private}.
     */
    public void initializeEngine() {
        // A fresh initialisation clears the flag raised by a previous stop().
        this.stopping.set(false);
        this.batchTransactionContextFactory = new DefaultBatchTransactionContextFactory(this);
        this.queueBuffer = new SteppingQueueBuffer(this);
        this.recordDispatcher = createRecordDispatcher();
        this.recordDispatcher.start();
        this.batchQueueManager.addBatchProcessingListener(this.recordDispatcher.getListener());
        this.jobInstanceStore.addBatchProcessingListener(this.recordDispatcher.getListener());
        createAndStartHistoryExpirationMonitor();
        this.engineInitialized.set(true);
    }

    /**
     * Stops the engine: halts the record dispatcher, closes and rolls back open transaction
     * contexts, stops the history expiration monitor and detaches the dispatcher's listener
     * from the queue manager and the job instance store. Finally resets the
     * {@code contextStarted} and {@code engineInitialized} flags.
     *
     * <p>Every step is null-safe, so this method can be called on an engine whose
     * {@link #initializeEngine()} never ran. In that state {@code recordDispatcher} is
     * {@code null} and there is no listener to detach.
     *
     * @throws MuleException if any of the stopped collaborators fails
     */
    public void stop() throws MuleException {
        this.stopping.set(true);
        if (this.recordDispatcher != null) {
            this.recordDispatcher.stop();
        }
        if (this.batchTransactionContextFactory != null) {
            this.batchTransactionContextFactory.closeAndRollback();
        }
        if (this.historyExpirationMonitor != null) {
            this.historyExpirationMonitor.stopMonitoring();
        }
        // The listener belongs to the dispatcher; without a dispatcher nothing was ever registered
        // and dereferencing it would throw a NullPointerException.
        if (this.recordDispatcher != null) {
            if (this.batchQueueManager != null) {
                this.batchQueueManager.removeBatchProcessingListener(this.recordDispatcher.getListener());
            }
            if (this.jobInstanceStore != null) {
                this.jobInstanceStore.removeBatchProcessingListener(this.recordDispatcher.getListener());
            }
        }
        this.contextStarted.set(false);
        this.engineInitialized.set(false);
    }

    /**
     * Flushes the stepping queue buffer for the given instance and returns the result of
     * {@code refresh} for that same instance.
     *
     * @param batchJobInstanceAdapter the instance whose buffered records are flushed
     * @return whatever {@code refresh} returns for the instance after the flush
     */
    private BatchJobInstanceAdapter flushSteppingQueueBuffer(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        this.queueBuffer.flush(batchJobInstanceAdapter);
        return refresh(batchJobInstanceAdapter);
    }

    /**
     * Creates a job instance in {@code LOADING} status for the given job, assigns it the name of
     * its stepping queue and persists it in the job instance store.
     *
     * @param batchJobAdapter the job the instance belongs to
     * @param coreEvent       the event used to generate the instance id and to build the instance
     * @return the newly stored instance
     * @throws MuleRuntimeException if the store rejects the instance with an
     *                              {@link IllegalStateException} (reported as a duplicated id)
     * @throws MuleException        declared by the engine contract
     */
    @Override
    public BatchJobInstanceAdapter createNewJobInstance(BatchJobAdapter batchJobAdapter, CoreEvent coreEvent) throws MuleException {
        final String instanceId = batchJobAdapter.generateJobInstanceId(coreEvent);
        final DefaultBatchJobInstance instance = new DefaultBatchJobInstance(instanceId, batchJobAdapter.getName(), coreEvent);
        instance.setStatus(BatchJobInstanceStatus.LOADING);
        instance.setQueueName(batchQueueManager.steppingQueue(instance).getQueueName());
        try {
            jobInstanceStore.store(instance);
            LOGGER.info("Created instance '{}' for batch job '{}'", instance.getId(), batchJobAdapter.getName());
            return instance;
        } catch (IllegalStateException duplicated) {
            final String detail = String.format(
                    "Batch Job '%s' already has an instance with id '%s'. Please upgrade the %s expression to produce unique values",
                    batchJobAdapter.getName(), instanceId, BatchProperties.BATCH_JOB_INSTANCE_ID_ATTRIBUTE_NAME);
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(detail), duplicated);
        }
    }

    /**
     * Delegates to the transaction context factory created by {@code initializeEngine()}.
     * NOTE(review): the factory field is only assigned in {@code initializeEngine()}, so calling
     * this before the engine is initialised would presumably fail with a NullPointerException.
     *
     * @param batchJobInstanceAdapter the instance the context is created for
     * @return the context produced by the factory
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public BatchTransactionContext createTransactionContext(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        return this.batchTransactionContextFactory.createTransactionContext(batchJobInstanceAdapter);
    }

    /**
     * Runs the load phase of an instance: splits the event into records, loads them into the
     * queues through a {@link BatchQueueLoader}, stores the resulting record count on the
     * instance and then starts its execution.
     *
     * <p>Notifications: {@code LOAD_PHASE_BEGIN} before loading, {@code LOAD_PHASE_END} after
     * execution was started, {@code LOAD_PHASE_FAILED} on failure.
     *
     * @param batchJobInstanceAdapter the instance being loaded
     * @param coreEvent               the event holding the payload to split into records
     * @return the same instance that was passed in
     * @throws BatchException wrapping any exception raised while loading or starting execution
     * @throws MuleException  declared by the engine contract
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public BatchJobInstanceAdapter load(BatchJobInstanceAdapter batchJobInstanceAdapter, CoreEvent coreEvent) throws MuleException {
        BatchJobAdapter jobFor = getJobFor(batchJobInstanceAdapter);
        // Preliminary count; it is overwritten below with the value returned by splitAndLoad.
        batchJobInstanceAdapter.setRecordCount(getRecordCountIfPossible(coreEvent));
        this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, BatchNotification.LOAD_PHASE_BEGIN));
        try {
            batchJobInstanceAdapter.setRecordCount(new BatchQueueLoader(this, jobFor.getBlockSize(), this.splittingStrategy, this.notificationDispatcher, this.profilingService.getCoreEventTracer()).splitAndLoad(jobFor, batchJobInstanceAdapter, coreEvent));
            try {
                startExecution(batchJobInstanceAdapter);
                this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, BatchNotification.LOAD_PHASE_END));
                return batchJobInstanceAdapter;
            } catch (Exception e) {
                try {
                    releaseResources(batchJobInstanceAdapter, true);
                } catch (Exception e2) {
                    // Best effort: a failure to release must not mask the original exception.
                    LOGGER.error(String.format("Exception found while trying to release resources of instance %s of job %s. Original exception will be re-thrown", batchJobInstanceAdapter.getId(), jobFor.getName()), e2);
                }
                this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, BatchNotification.LOAD_PHASE_FAILED));
                // NOTE(review): this throw sits inside the outer try, so it appears to be caught by the
                // outer handler below, which wraps it again, marks the instance FAILED_LOADING and
                // dispatches LOAD_PHASE_FAILED a second time. Confirm whether that is intended.
                throw new BatchException(e, batchJobInstanceAdapter);
            }
        } catch (Exception e3) {
            BatchJobResult result = batchJobInstanceAdapter.getResult();
            if (result instanceof BatchJobResultAdapter) {
                ((BatchJobResultAdapter) result).setLoadingPhaseException(e3);
            }
            batchJobInstanceAdapter.setStatus(BatchJobInstanceStatus.FAILED_LOADING);
            this.jobInstanceStore.update(batchJobInstanceAdapter);
            this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, e3, BatchNotification.LOAD_PHASE_FAILED));
            throw new BatchException(e3, batchJobInstanceAdapter);
        }
    }

    /**
     * Moves the instance to {@code EXECUTING}. An instance with a positive record count gets its
     * clock started, is handed to the work manager, is persisted and a
     * {@code JOB_PROCESS_RECORDS_BEGIN} notification is dispatched. An instance with no records is
     * persisted and finished right away.
     *
     * @param batchJobInstanceAdapter the instance to start
     * @throws MuleException if finishing an empty instance fails
     */
    @Override
    public void startExecution(BatchJobInstanceAdapter batchJobInstanceAdapter) throws MuleException {
        batchJobInstanceAdapter.setStatus(BatchJobInstanceStatus.EXECUTING);

        if (batchJobInstanceAdapter.getRecordCount() > 0) {
            final BatchJobResult jobResult = batchJobInstanceAdapter.getResult();
            if (jobResult instanceof BatchJobResultAdapter) {
                ((BatchJobResultAdapter) jobResult).startClock();
            }
            workManager.executable(batchJobInstanceAdapter);
            jobInstanceStore.update(batchJobInstanceAdapter);
            if (LOGGER.isInfoEnabled()) {
                final String started = String.format("Started execution of instance '%s' of job '%s'",
                        batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName());
                LOGGER.info(started);
            }
            notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, BatchNotification.JOB_PROCESS_RECORDS_BEGIN));
            return;
        }

        // Nothing to process: persist the EXECUTING state and finish immediately.
        if (LOGGER.isInfoEnabled()) {
            final String empty = String.format("Instance '%s' of job '%s' has no records to process. It's execution will be finished now",
                    batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName());
            LOGGER.info(empty);
        }
        jobInstanceStore.update(batchJobInstanceAdapter);
        finishExecution(batchJobInstanceAdapter, true);
    }

    /**
     * Sets the final status of an instance ({@code FAILED_PROCESS_RECORDS} when it has at least
     * one failed record, {@code SUCCESSFUL} otherwise), stops its clock and persists it.
     *
     * @param batchJobInstanceAdapter the instance to update
     * @return the same instance
     */
    private BatchJobInstanceAdapter processJobInstanceStatus(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        final boolean hasFailures = batchJobInstanceAdapter.getResult().getFailedRecords() > 0;
        batchJobInstanceAdapter.setStatus(hasFailures
                ? BatchJobInstanceStatus.FAILED_PROCESS_RECORDS
                : BatchJobInstanceStatus.SUCCESSFUL);
        stopClock(batchJobInstanceAdapter);
        jobInstanceStore.update(batchJobInstanceAdapter);
        return batchJobInstanceAdapter;
    }

    /**
     * Finishes the execution of an instance. The final status is computed under the instance
     * lock; if the instance was no longer {@code EXECUTING} nothing else happens. Otherwise a
     * management task is scheduled that waits for the instance's work to stop, runs the
     * on-complete phase and finally performs the end-of-job bookkeeping.
     *
     * @param batchJobInstanceAdapter the instance to finish
     * @param z                       whether the instance must be refreshed before its status is evaluated
     * @return the instance whose status was evaluated (the refreshed one when {@code z} is true)
     * @throws MuleException declared by the engine contract
     */
    @Override
    public BatchJobInstanceAdapter finishExecution(BatchJobInstanceAdapter batchJobInstanceAdapter, boolean z) throws MuleException {
        // Tracks the most recent view of the instance for the failure path below.
        BatchJobInstanceAdapter current = batchJobInstanceAdapter;
        try {
            Pair<BatchJobInstanceAdapter, Boolean> outcome = processJobStatusBeforeOnComplete(batchJobInstanceAdapter, z);
            // Dedicated final local: lambdas may only capture effectively final variables, and the
            // method parameter must not be reassigned for that reason.
            final BatchJobInstanceAdapter instance = outcome.getLeft();
            current = instance;
            if (outcome.getRight()) {
                // The instance was not EXECUTING any more; there is nothing to complete.
                return instance;
            }
            this.workManager.scheduleManagementWork(() -> {
                try {
                    stopAndAwait(instance);
                    processOnComplete(instance);
                } catch (MuleException e) {
                    LOGGER.error(String.format("Error invoking on-complete phase for instance '%s' or job '%s'", instance.getId(), instance.getOwnerJobName()), e);
                } finally {
                    finallyOnFinishJobInstance(instance, true);
                }
            });
            return instance;
        } catch (RuntimeException e) {
            // Nothing was (successfully) scheduled, so the bookkeeping has to happen here.
            finallyOnFinishJobInstance(current, true);
            throw e;
        }
    }

    /**
     * Delegates to {@code workManager.awaitStop} for the given instance.
     *
     * @param batchJobInstanceAdapter the instance whose work is awaited
     */
    private void stopAndAwait(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        this.workManager.awaitStop(batchJobInstanceAdapter);
    }

    /**
     * Evaluates, under the instance lock, whether the instance still has to go through the
     * on-complete phase.
     *
     * <p>The lock is released in a {@code finally} block so that a failure while reading the
     * status or while computing the final status cannot leave it held.
     *
     * @param batchJobInstanceAdapter the instance to evaluate
     * @param z                       whether the instance must be refreshed first
     * @return a pair whose left side is the evaluated instance and whose right side is
     *         {@code true} when the instance was not {@code EXECUTING} (nothing left to do) and
     *         {@code false} when its final status was just computed and stored
     */
    private Pair<BatchJobInstanceAdapter, Boolean> processJobStatusBeforeOnComplete(BatchJobInstanceAdapter batchJobInstanceAdapter, boolean z) {
        Lock lock = getLock(batchJobInstanceAdapter);
        lock.lock();
        try {
            BatchJobInstanceAdapter instance = z ? refresh(batchJobInstanceAdapter) : batchJobInstanceAdapter;
            if (instance.getStatus() != BatchJobInstanceStatus.EXECUTING) {
                return new ImmutablePair<>(instance, Boolean.TRUE);
            }
            return new ImmutablePair<>(processJobInstanceStatus(instance), Boolean.FALSE);
        } finally {
            lock.unlock();
        }
    }

    /**
     * End-of-job bookkeeping: persists the instance and releases its resources, then, whatever
     * the outcome of that, logs the job result and dispatches the final notification.
     *
     * <p>An {@link IllegalStateException} raised while persisting or releasing is ignored (and
     * logged at debug level) only when the engine is stopping; otherwise it is rethrown after
     * the result was logged and the notification dispatched.
     *
     * <p>The reporting step lives in a single {@code finally} block so it runs exactly once per
     * call. A failure inside the reporting step itself propagates as is instead of triggering a
     * second round of logging and dispatching.
     *
     * @param batchJobInstanceAdapter the finished instance
     * @param z                       whether the final notification must be dispatched
     */
    private void finallyOnFinishJobInstance(BatchJobInstanceAdapter batchJobInstanceAdapter, boolean z) {
        try {
            this.jobInstanceStore.update(batchJobInstanceAdapter);
            releaseResources(batchJobInstanceAdapter, false);
        } catch (IllegalStateException e) {
            if (!this.stopping.get()) {
                throw e;
            }
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Error on releasing resources probably because batch engine is stopping", e);
            }
        } finally {
            int notificationAction = getNotificationAction(batchJobInstanceAdapter);
            logJobInstanceResult(batchJobInstanceAdapter);
            // getNotificationAction returns 0 when there is nothing to notify.
            if (notificationAction > 0 && z) {
                this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, notificationAction));
            }
        }
    }

    /**
     * Chooses the notification action that describes how the instance ended.
     *
     * @param batchJobInstanceAdapter the finished instance
     * @return {@code JOB_PROCESS_RECORDS_FAILED} when it has failed records; otherwise {@code 0}
     *         when its status is a failure, or {@code JOB_SUCCESSFUL}
     */
    private int getNotificationAction(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        final boolean hasFailedRecords = batchJobInstanceAdapter.getResult().getFailedRecords() > 0;
        if (hasFailedRecords) {
            return BatchNotification.JOB_PROCESS_RECORDS_FAILED;
        }
        return batchJobInstanceAdapter.getStatus().isFailure() ? 0 : BatchNotification.JOB_SUCCESSFUL;
    }

    /**
     * Runs the on-complete phase of an instance, tells every processing listener that the job
     * finished and logs the exceptions summary.
     *
     * @param batchJobInstanceAdapter the instance being completed
     * @throws MuleException if the on-complete phase fails
     */
    private void processOnComplete(BatchJobInstanceAdapter batchJobInstanceAdapter) throws MuleException {
        // Captured before the on-complete phase runs; used for the summary at the end.
        final BatchJobResult jobResult = batchJobInstanceAdapter.getResult();
        doOnComplete(batchJobInstanceAdapter);
        if (LOGGER.isInfoEnabled()) {
            final String finishing = String.format("Finishing execution for instance '%s' of job '%s'.",
                    batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName());
            LOGGER.info(finishing);
        }
        for (BatchProcessingListener listener : getListeners(batchJobInstanceAdapter)) {
            listener.onJobFinished(batchJobInstanceAdapter);
        }
        logExceptionsSummary(getJobFor(batchJobInstanceAdapter), jobResult);
    }

    /**
     * Logs, at info level, the processed/successful/failed record totals of an instance.
     *
     * @param batchJobInstanceAdapter the instance whose result is logged
     */
    private void logJobInstanceResult(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        final BatchJobResult jobResult = batchJobInstanceAdapter.getResult();
        if (!LOGGER.isInfoEnabled()) {
            return;
        }
        final long processed = jobResult.getProcessedRecords();
        final long successful = jobResult.getSuccessfulRecords();
        final long failed = jobResult.getFailedRecords();
        LOGGER.info(String.format(
                "Finished execution for instance '%s' of job '%s'. Total Records processed: %d. Successful records: %d. Failed Records: %d",
                batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName(), processed, successful, failed));
    }

    /**
     * Releases what the engine holds for an instance: optionally its failed records, then its
     * queues, then its entry in the stepping queue buffer.
     *
     * @param batchJobInstanceAdapter the instance to release
     * @param z                       whether the stored failed records must be cleared as well
     */
    @Override
    public void releaseResources(BatchJobInstanceAdapter batchJobInstanceAdapter, boolean z) {
        if (z) {
            jobInstanceStore.clearFailedRecords(batchJobInstanceAdapter);
        }
        final BatchTransactionContext disposalContext = createTransactionContext(batchJobInstanceAdapter);
        batchQueueManager.disposeQueues(disposalContext);
        queueBuffer.forget(batchJobInstanceAdapter);
    }

    /**
     * Convenience overload: same as the three-argument variant with a {@code null} future.
     *
     * @param record                  the record to expose in the event, may be {@code null}
     * @param batchJobInstanceAdapter the instance the record belongs to
     * @return the event builder produced by the three-argument variant
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public CoreEvent.Builder createEventBuilder(Record record, BatchJobInstanceAdapter batchJobInstanceAdapter) {
        return createEventBuilder(record, batchJobInstanceAdapter, null);
    }

    /**
     * Builds the event used to process a record. The builder is derived from the instance's
     * batch event; its message carries the record payload (plus attributes when present) or a
     * {@code null} payload when there is no record or no payload. The job instance id is always
     * added as a variable; for a non-null record the record itself and all of its variables are
     * added too.
     *
     * @param record                  the record to expose, may be {@code null}
     * @param batchJobInstanceAdapter the instance the record belongs to
     * @param completableFuture       passed through to {@code asEventBuilder}, may be {@code null}
     * @return the populated event builder
     */
    @Override
    public CoreEvent.Builder createEventBuilder(Record record, BatchJobInstanceAdapter batchJobInstanceAdapter, CompletableFuture<Void> completableFuture) {
        final CoreEvent.Builder eventBuilder = batchJobInstanceAdapter.getBatchEvent().asEventBuilder(getJobFor(batchJobInstanceAdapter), completableFuture);

        Message.Builder messageBuilder = null;
        if (record != null) {
            final TypedValue<Object> recordPayload = record.getPayload();
            final TypedValue<Object> recordAttributes = record.getAttributes();
            if (recordPayload != null) {
                messageBuilder = Message.builder()
                        .value(recordPayload.getValue())
                        .mediaType(recordPayload.getDataType().getMediaType());
                if (recordAttributes != null) {
                    messageBuilder
                            .attributesValue(recordAttributes.getValue())
                            .attributesMediaType(recordAttributes.getDataType().getMediaType());
                }
            }
        }

        eventBuilder.securityContext(batchJobInstanceAdapter.getBatchEvent().getSecurityContext().orElse(null));

        final Message message;
        if (messageBuilder == null) {
            message = Message.of((Object) null);
        } else {
            message = messageBuilder.build();
        }
        eventBuilder.message(message).addVariable(BatchProperties.BATCH_JOB_INSTANCE_ID_VARIABLE, batchJobInstanceAdapter.getId());

        if (record == null) {
            return eventBuilder;
        }
        eventBuilder.addVariable(BatchProperties.RECORD_VARIABLE_NAME, record);
        final Map<String, TypedValue<?>> recordVariables = record.getAllVariables();
        recordVariables.forEach(eventBuilder::addVariable);
        return eventBuilder;
    }

    /**
     * Decides what happens to a failed record. While the job has no failed-record limit, or the
     * instance's failed count does not exceed it, the record is routed to the next step.
     * Otherwise the transaction is acknowledged/committed and the instance is stopped as failed.
     *
     * @param batchTransactionContext the context the record is being processed in
     * @param batchStepAdapter        the step in which the record failed
     * @param record                  the failed record
     * @throws MuleException if routing, committing or stopping fails
     */
    private void routeError(BatchTransactionContext batchTransactionContext, BatchStepAdapter batchStepAdapter, Record record) throws MuleException {
        final BatchJobAdapter ownerJob = batchTransactionContext.getJob();
        final BatchJobInstanceAdapter instance = batchTransactionContext.getJobInstance();

        final boolean failureTolerated = ownerJob.getMaxFailedRecords() == UNLIMITED
                || instance.getResult().getFailedRecords() <= ownerJob.getMaxFailedRecords();

        if (!failureTolerated) {
            LOGGER.info(String.format(
                    "instance '%s' of job '%s' has reached the max allowed number of failed records. Record will be added to failed list and the instance will be removed from execution pool",
                    instance.getId(), instance.getOwnerJobName()));
            BatchUtils.singleAckAndCommitIfNecessary(batchTransactionContext);
            stopExecution(instance, true);
            return;
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format(
                    "Failed record number %d is still acceptable for instance %s of job %s. Routing to next step",
                    Long.valueOf(instance.getResult().getFailedRecords()), instance.getId(), instance.getOwnerJobName()));
        }
        routeNext(batchTransactionContext, batchStepAdapter, record);
    }

    /**
     * Stops an executing instance. Under the instance lock the instance is refreshed; if it is
     * not {@code EXECUTING} the call is a no-op. Otherwise its clock is stopped, its status is
     * set ({@code FAILED_PROCESS_RECORDS} when {@code z} is true, {@code STOPPED} otherwise), it
     * is persisted and the processing listeners are told the job stopped. A management task is
     * then scheduled to wait for the instance's work, log the exceptions summary, dispatch the
     * matching notification and run the on-complete phase.
     *
     * <p>The lock is released exactly once, in the {@code finally} block.
     *
     * @param batchJobInstanceAdapter the instance to stop
     * @param z                       {@code true} when the stop is caused by a failure
     * @throws MuleException declared by the engine contract
     */
    @Override
    public void stopExecution(BatchJobInstanceAdapter batchJobInstanceAdapter, boolean z) throws MuleException {
        Lock lock = getLock(batchJobInstanceAdapter);
        lock.lock();
        try {
            final BatchJobInstanceAdapter refresh = refresh(batchJobInstanceAdapter);
            if (refresh.getStatus() != BatchJobInstanceStatus.EXECUTING) {
                return;
            }
            stopClock(refresh);
            final int notificationAction;
            if (z) {
                refresh.setStatus(BatchJobInstanceStatus.FAILED_PROCESS_RECORDS);
                notificationAction = BatchNotification.JOB_PROCESS_RECORDS_FAILED;
            } else {
                refresh.setStatus(BatchJobInstanceStatus.STOPPED);
                notificationAction = BatchNotification.JOB_STOPPED;
            }
            this.jobInstanceStore.update(refresh);
            for (BatchProcessingListener listener : getListeners(refresh)) {
                listener.onJobStopped(refresh);
            }
            this.workManager.scheduleManagementWork(() -> {
                stopAndAwait(refresh);
                logExceptionsSummary(getJobFor(refresh), refresh.getResult());
                this.notificationDispatcher.dispatch(new BatchNotification(refresh, notificationAction));
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info(String.format("instance %s of job %s has been stopped. Instance status is %s", refresh.getId(), refresh.getOwnerJobName(), refresh.getStatus()));
                }
                try {
                    doOnComplete(refresh);
                } catch (MuleException e) {
                    LOGGER.error(String.format("Error invoking on-complete phase for instance '%s' or job '%s'", refresh.getId(), refresh.getOwnerJobName()), e);
                }
            });
        } finally {
            // Single release point; unlocking anywhere else as well would unlock twice.
            lock.unlock();
        }
    }

    /**
     * Resumes a stopped instance. Under the instance lock the instance is refreshed and must be
     * in {@code STOPPED} status. It is then set to {@code EXECUTING}, its clock is started, it is
     * handed to the work manager and persisted, the processing listeners are told about the
     * transition and a {@code JOB_PROCESS_RECORDS_BEGIN} notification is dispatched.
     *
     * @param batchJobInstanceAdapter the instance to resume
     * @throws IllegalStateException if the refreshed instance is not {@code STOPPED}
     * @throws MuleException         declared by the engine contract
     */
    @Override
    public void resumeExecution(BatchJobInstanceAdapter batchJobInstanceAdapter) throws MuleException {
        final Lock instanceLock = getLock(batchJobInstanceAdapter);
        instanceLock.lock();
        try {
            final BatchJobInstanceAdapter instance = refresh(batchJobInstanceAdapter);
            final boolean resumable = instance.getStatus() == BatchJobInstanceStatus.STOPPED;
            if (!resumable) {
                throw new IllegalStateException(String.format(
                        "Instance '%s' of job '%s' has been requested to resume but it's on '%s' state. Only instances in state '%s' can be resumed",
                        instance.getId(), instance.getOwnerJobName(), instance.getStatus(), BatchJobInstanceStatus.STOPPED));
            }

            instance.setStatus(BatchJobInstanceStatus.EXECUTING);
            startClock(instance);
            workManager.executable(instance);
            jobInstanceStore.update(instance);
            for (BatchProcessingListener listener : getListeners(instance)) {
                listener.onExecutableStateTransition(instance);
            }
            notificationDispatcher.dispatch(new BatchNotification(instance, BatchNotification.JOB_PROCESS_RECORDS_BEGIN));
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info(String.format("instance '%s' of job '%s' has been resumed", instance.getId(), instance.getOwnerJobName()));
            }
        } finally {
            instanceLock.unlock();
        }
    }

    /**
     * Stops the clock of the instance's result when the result is a {@link BatchJobResultAdapter};
     * does nothing for any other kind of result.
     *
     * @param batchJobInstanceAdapter the instance whose clock is stopped
     */
    private void stopClock(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        final BatchJobResult jobResult = batchJobInstanceAdapter.getResult();
        if (!(jobResult instanceof BatchJobResultAdapter)) {
            return;
        }
        ((BatchJobResultAdapter) jobResult).stopClock();
    }

    /**
     * Starts the clock of the instance's result when the result is a {@link BatchJobResultAdapter};
     * does nothing for any other kind of result.
     *
     * @param batchJobInstanceAdapter the instance whose clock is started
     */
    private void startClock(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        final BatchJobResult jobResult = batchJobInstanceAdapter.getResult();
        if (!(jobResult instanceof BatchJobResultAdapter)) {
            return;
        }
        ((BatchJobResultAdapter) jobResult).startClock();
    }

    /**
     * Logs, at info level, the text report that {@link ExceptionsInTextBatchResultReporter}
     * builds for the given job and result. The report is not built when info logging is off.
     *
     * @param batchJob       the job the report is about
     * @param batchJobResult the result the report is built from
     */
    private void logExceptionsSummary(BatchJob batchJob, BatchJobResult batchJobResult) {
        if (!LOGGER.isInfoEnabled()) {
            return;
        }
        final ExceptionsInTextBatchResultReporter reporter = new ExceptionsInTextBatchResultReporter();
        LOGGER.info("\n" + reporter.buildReport(batchJob, batchJobResult));
    }

    /**
     * Cancels an instance: stops its execution (not as a failure), marks it {@code CANCELLED},
     * releases its resources including failed records, dispatches a {@code JOB_CANCELLED}
     * notification and persists it.
     *
     * @param batchJobInstanceAdapter the instance to cancel
     * @throws MuleException if stopping the execution fails
     */
    @Override
    public void cancel(BatchJobInstanceAdapter batchJobInstanceAdapter) throws MuleException {
        stopExecution(batchJobInstanceAdapter, false);
        batchJobInstanceAdapter.setStatus(BatchJobInstanceStatus.CANCELLED);
        releaseResources(batchJobInstanceAdapter, true);

        final BatchNotification cancelled = new BatchNotification(batchJobInstanceAdapter, BatchNotification.JOB_CANCELLED);
        notificationDispatcher.dispatch(cancelled);
        jobInstanceStore.update(batchJobInstanceAdapter);

        if (!LOGGER.isInfoEnabled()) {
            return;
        }
        LOGGER.info(String.format("instance %s of job %s has been cancelled.",
                batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName()));
    }

    /**
     * Cancels, one after another, every instance the job instance store reports as executing.
     * The first cancellation that fails aborts the loop.
     *
     * @throws MuleException if cancelling an instance fails
     */
    @Override
    public void cancelAllRunningInstance() throws MuleException {
        for (Object executing : this.jobInstanceStore.getExecutingInstances()) {
            cancel((BatchJobInstanceAdapter) executing);
        }
    }

    /**
     * Returns the size of the instance's stepping queue, read under the instance lock.
     *
     * @param batchTransactionContext the context identifying the instance and used for the read
     * @return the queue size, or {@code 0} when the refreshed instance is {@code null}, is not
     *         {@code EXECUTING}, or the size could not be obtained (the error is logged)
     */
    @Override
    public long getSteppingQueueSize(BatchTransactionContext batchTransactionContext) {
        BatchJobInstanceAdapter instance = batchTransactionContext.getJobInstance();
        final Lock instanceLock = getLock(instance);
        instanceLock.lock();
        try {
            instance = refresh(instance);
            final boolean executing = instance != null && instance.getStatus() == BatchJobInstanceStatus.EXECUTING;
            if (!executing) {
                return 0L;
            }
            return getBatchQueueManager().steppingQueue(instance).size(batchTransactionContext);
        } catch (MuleException e) {
            LOGGER.error(String.format("Could not get stepping queue count for instance '%s' of batch job '%s'", instance.getId(), instance.getOwnerJobName()), e);
            return 0L;
        } finally {
            instanceLock.unlock();
        }
    }

    /**
     * Polls the stepping queue of the job instance bound to the given transaction context.
     *
     * <p>The instance is re-read from the store while its lock is held. {@code null} is returned when the
     * instance can no longer be found, when its status is not {@code EXECUTING}, or when a {@link MuleException}
     * is raised (the exception is logged and not propagated).
     *
     * @param batchTransactionContext the transaction context identifying the job instance
     * @return whatever the stepping queue's {@code poll} returns, or {@code null} in the cases described above
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public List<Record> getBlockFrom(BatchTransactionContext batchTransactionContext) {
        BatchJobInstanceAdapter instance = batchTransactionContext.getJobInstance();
        Lock instanceLock = getLock(instance);
        instanceLock.lock();
        try {
            instance = refresh(instance);
            boolean executing = instance != null && instance.getStatus() == BatchJobInstanceStatus.EXECUTING;
            if (!executing) {
                return null;
            }
            return getBatchQueueManager().steppingQueue(instance).poll(batchTransactionContext);
        } catch (MuleException e) {
            LOGGER.error(String.format("Could not get records for job instance id '%s' of batch job '%s'", instance.getId(), instance.getOwnerJobName()), e);
            return null;
        } finally {
            // Single release point covering every exit path of the block above.
            instanceLock.unlock();
        }
    }

    /**
     * Moves a record on to whatever follows the given step.
     *
     * <p>When the step has a successor, the record's current step id is set to the successor's name and the
     * record is added to the queue buffer. When there is no successor, the current step id is cleared and
     * {@code BatchUtils.singleAckAndCommitIfNecessary} is invoked on the transaction context.
     *
     * @param batchTransactionContext the transaction context the record belongs to
     * @param batchStepAdapter        the step the record has just gone through
     * @param record                  the record to route
     * @throws MuleException if the delegated operations fail
     */
    private void routeNext(BatchTransactionContext batchTransactionContext, BatchStepAdapter batchStepAdapter, Record record) throws MuleException {
        BatchStep following = batchStepAdapter.getNextStep();
        if (following != null) {
            record.setCurrentStepId(following.getName());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("Queueing record back for step %s in instance %s of job %s",
                        following.getName(),
                        batchTransactionContext.getJobInstance().getId(),
                        batchTransactionContext.getJobInstance().getOwnerJobName()));
            }
            this.queueBuffer.add(batchTransactionContext.getJobInstance(), batchTransactionContext, record);
            return;
        }

        // No further step: the record has no current step any more.
        record.setCurrentStepId(null);
        BatchUtils.singleAckAndCommitIfNecessary(batchTransactionContext);
    }

    /**
     * Refreshes the job instance of the given transaction context from the store, folds the given records into
     * its result (when the result is a {@link BatchJobResultAdapter}) and writes the instance back to the store.
     *
     * <p>The refresh/update/store sequence runs while holding the instance lock. After the lock has been
     * released, the transaction context is pointed at the refreshed instance and a {@code PROGRESS_UPDATE}
     * notification is dispatched.
     *
     * <p>The lock is released exactly once, in a {@code finally} block. The post-unlock calls are deliberately
     * outside the guarded region so that a failure in them cannot trigger a second {@code unlock()}.
     *
     * @param batchTransactionContext the transaction context whose job instance is updated
     * @param list                    the records to account for
     * @return the refreshed job instance
     */
    private BatchJobInstanceAdapter updateStatistics(BatchTransactionContext batchTransactionContext, List<Record> list) {
        BatchJobAdapter job = batchTransactionContext.getJob();
        BatchJobInstanceAdapter jobInstance = batchTransactionContext.getJobInstance();
        BatchJobInstanceAdapter refreshed;
        Lock lock = getLock(jobInstance);
        lock.lock();
        try {
            refreshed = refresh(jobInstance);
            BatchJobResult result = refreshed.getResult();
            if (result instanceof BatchJobResultAdapter) {
                ((BatchJobResultAdapter) result).updateFor(job, list);
            }
            this.jobInstanceStore.update(refreshed);
        } finally {
            lock.unlock();
        }
        // Outside the lock on purpose: an exception here must not cause the lock to be released again.
        batchTransactionContext.updateJobInstance(refreshed);
        this.notificationDispatcher.dispatch(new BatchNotification(refreshed, BatchNotification.PROGRESS_UPDATE));
        return refreshed;
    }

    /**
     * Looks the given instance up again in the job instance store, using its owner job name and id.
     *
     * @param batchJobInstanceAdapter the instance to look up
     * @return whatever the store returns for that job name and id; callers in this class check it for {@code null}
     */
    private BatchJobInstanceAdapter refresh(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        return this.jobInstanceStore.getJobInstance(
                batchJobInstanceAdapter.getOwnerJobName(),
                batchJobInstanceAdapter.getId());
    }

    /**
     * Updates the statistics of the job instance for the given records and then routes each record.
     *
     * <p>While holding the instance lock:
     * <ol>
     *   <li>statistics are updated through {@code updateStatistics};</li>
     *   <li>each record is routed: records that failed for their current step go through {@code routeError} and
     *       are collected, all others go through {@code routeNext};</li>
     *   <li>if the refreshed instance is in a failure status, the collected failed records are stored and the
     *       transaction context is committed;</li>
     *   <li>execution is finished when the processed record count has reached the instance record count,
     *       otherwise the stepping queue buffer is flushed.</li>
     * </ol>
     *
     * <p>The lock is acquired before entering the {@code try} block so that {@code unlock()} in {@code finally}
     * only ever runs when the lock is actually held.
     *
     * @param batchTransactionContext the transaction context the records belong to
     * @param list                    the records that were just processed
     * @return the instance returned by {@code finishExecution} or {@code flushSteppingQueueBuffer}
     * @throws MuleException if routing, storing or finishing fails
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public BatchJobInstanceAdapter updateStatisticsAndRoute(BatchTransactionContext batchTransactionContext, List<Record> list) throws MuleException {
        Lock lock = getLock(batchTransactionContext.getJobInstance());
        lock.lock();
        try {
            BatchJobInstanceAdapter updated = updateStatistics(batchTransactionContext, list);
            BatchJobAdapter job = batchTransactionContext.getJob();
            List<Record> failedRecords = new ArrayList<>(job.getBlockSize());
            for (Record record : list) {
                BatchStepAdapter step = job.getStepById(record.getCurrentStepId());
                if (record.isFailedFor(step)) {
                    failedRecords.add(record);
                    routeError(batchTransactionContext, step, record);
                } else {
                    routeNext(batchTransactionContext, step, record);
                }
            }
            if (updated.getStatus().isFailure()) {
                this.jobInstanceStore.storeFailedRecords(updated, failedRecords);
                BatchUtils.commit(batchTransactionContext);
            }
            if (updated.getResult().getProcessedRecords() >= updated.getRecordCount()) {
                return finishExecution(updated, true);
            }
            return flushSteppingQueueBuffer(updated);
        } finally {
            lock.unlock();
        }
    }

    /**
     * Obtains, from the lock factory, the lock named after the given job instance.
     *
     * <p>The lock name has the form {@code BATCH-JOB-<owner job name>-INSTANCE-<instance id>}.
     *
     * @param batchJobInstanceAdapter the instance to obtain a lock for
     * @return the lock created by the lock factory for that name
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public Lock getLock(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        String lockName = String.format("BATCH-JOB-%s-INSTANCE-%s",
                batchJobInstanceAdapter.getOwnerJobName(),
                batchJobInstanceAdapter.getId());
        return this.lockFactory.createLock(lockName);
    }

    /**
     * Resolves the registered batch job that owns the given instance.
     *
     * @param batchJobInstance the instance whose owner job is looked up by name
     * @return the registered job, never {@code null}
     * @throws MuleRuntimeException if no job is registered under the instance's owner job name
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public BatchJobAdapter getJobFor(BatchJobInstance batchJobInstance) {
        BatchJobAdapter owner = this.jobs.get(batchJobInstance.getOwnerJobName());
        if (owner != null) {
            return owner;
        }
        String detail = String.format("Consistency error: job instance with id '%s' has owner job '%s' but such batch job couldn't be located",
                batchJobInstance.getId(),
                batchJobInstance.getOwnerJobName());
        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(detail));
    }

    /**
     * Registers a batch job with this engine. The whole operation is synchronized on the jobs map.
     *
     * <p>If the context has already started, the job is put into the map; when the map was empty beforehand the
     * engine is initialized before the insertion and the work manager is started after it.
     *
     * <p>If the context has not started yet, the job is inserted only if no job with the same name exists;
     * otherwise a {@link MuleRuntimeException} naming both owning flows is thrown.
     *
     * @param batchJobAdapter the job to register
     * @throws MuleRuntimeException on a duplicate name before context start, or if the work manager cannot be started
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public void registerBatchJob(BatchJobAdapter batchJobAdapter) {
        synchronized (this.jobs) {
            if (this.contextStarted.get()) {
                boolean firstJob = this.jobs.isEmpty();
                if (firstJob) {
                    initializeEngine();
                }
                this.jobs.put(batchJobAdapter.getName(), batchJobAdapter);
                if (firstJob) {
                    try {
                        this.workManager.start();
                    } catch (MuleException e) {
                        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("Unable to start work manager when registering batch job with name '%s'", new Object[]{batchJobAdapter.getName()}), e);
                    }
                }
                return;
            }

            // Resolved before touching the map: the lookup is of the "or fail" kind.
            FlowConstruct owningFlow = ComponentUtils.getFromAnnotatedObjectOrFail(this.componentLocator, batchJobAdapter);
            BatchJobAdapter existing = this.jobs.putIfAbsent(batchJobAdapter.getName(), batchJobAdapter);
            if (existing != null) {
                throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(String.format("Flow '%s' contains a batch job with name '%s', but Flow '%s' has already defined such job", owningFlow.getName(), batchJobAdapter.getName(), ComponentUtils.getFromAnnotatedObjectOrFail(this.componentLocator, existing).getName())));
            }
        }
    }

    /**
     * Returns the currently registered batch jobs.
     *
     * @return an immutable copy of the values of the jobs map, taken at call time
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public Collection<BatchJob> getBatchJobs() {
        return ImmutableList.copyOf(jobs.values());
    }

    /**
     * Looks up a registered batch job by name.
     *
     * @param str the job name used as key in the jobs map
     * @return the job wrapped in an {@link Optional}, or an empty one when the map has no value for that name
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public Optional<BatchJob> getJob(String str) {
        BatchJob registered = this.jobs.get(str);
        return Optional.ofNullable(registered);
    }

    /**
     * Runs the on-complete phase of the given job instance.
     *
     * <p>An {@code ON_COMPLETE_BEGIN} notification is always dispatched first. If the owner job declares no
     * on-complete block, the {@code ON_COMPLETE_END} notification is fired right away. Otherwise an event
     * carrying an {@link ImmutableBatchJobResult} of the instance result is built, the distributed trace
     * context is injected into it, and the block is executed through a {@link BatchProcessingTemplate} whose
     * callbacks fire {@code ON_COMPLETE_END} on success or delegate to {@code onCompleteException} on failure.
     *
     * @param batchJobInstanceAdapter the instance whose on-complete phase is executed
     * @throws MuleException if the processing template propagates one
     */
    private void doOnComplete(BatchJobInstanceAdapter batchJobInstanceAdapter) throws MuleException {
        this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, BatchNotification.ON_COMPLETE_BEGIN));
        BatchJobAdapter ownerJob = getJobFor(batchJobInstanceAdapter);
        Processor onCompleteBlock = ownerJob.getOnCompleteBlock().orElse(null);
        if (onCompleteBlock == null) {
            // Nothing to execute: the phase ends immediately.
            fireOnCompleteEndNotification(batchJobInstanceAdapter);
            return;
        }

        if (LOGGER.isInfoEnabled()) {
            LOGGER.info(String.format("Starting execution of onComplete phase for instance %s of job %s",
                    batchJobInstanceAdapter.getId(),
                    batchJobInstanceAdapter.getOwnerJobName()));
        }

        CoreEvent onCompleteEvent = createEventBuilder(null, batchJobInstanceAdapter)
                .message(Message.builder().value(new ImmutableBatchJobResult(batchJobInstanceAdapter.getResult())).build())
                .build();
        this.profilingService.getCoreEventTracer().injectDistributedTraceContext(
                onCompleteEvent.getContext(),
                new MapDistributedTraceContextGetter(batchJobInstanceAdapter.getBatchEvent().getSerializedBatchJobInstanceSpan()));

        BatchProcessingTemplate template = new BatchProcessingTemplate(onCompleteBlock, ownerJob.getLocation(), this.muleContext.getFlowTraceManager(), this.muleContext.getStreamCloserService()) { // from class: com.mulesoft.mule.runtime.module.batch.internal.engine.DefaultBatchEngine.2
            @Override // com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate
            protected void onSuccess(BatchJobInstanceAdapter batchJobInstanceAdapter2, CoreEvent coreEvent) throws MuleException {
                if (DefaultBatchEngine.LOGGER.isInfoEnabled()) {
                    DefaultBatchEngine.LOGGER.info(String.format("Finished execution of onComplete phase for instance %s of job %s", batchJobInstanceAdapter2.getId(), batchJobInstanceAdapter2.getOwnerJobName()));
                }
                DefaultBatchEngine.this.fireOnCompleteEndNotification(batchJobInstanceAdapter2);
            }

            @Override // com.mulesoft.mule.runtime.module.batch.internal.engine.BatchProcessingTemplate
            protected void onException(BatchJobInstanceAdapter batchJobInstanceAdapter2, Exception exc, CoreEvent coreEvent) throws MuleException {
                DefaultBatchEngine.this.onCompleteException(batchJobInstanceAdapter2, BatchUtils.toBatchException(exc, batchJobInstanceAdapter2));
            }
        };
        template.process(batchJobInstanceAdapter, onCompleteEvent, this.profilingService.getCoreEventTracer());
    }

    /**
     * Dispatches an {@code ON_COMPLETE_END} notification for the given job instance.
     *
     * @param batchJobInstanceAdapter the instance the notification refers to
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void fireOnCompleteEndNotification(BatchJobInstanceAdapter batchJobInstanceAdapter) {
        BatchNotification onCompleteEnd = new BatchNotification(batchJobInstanceAdapter, BatchNotification.ON_COMPLETE_END);
        this.notificationDispatcher.dispatch(onCompleteEnd);
    }

    /**
     * Handles an exception raised during the on-complete phase of a job instance.
     *
     * <p>While holding the instance lock: the failure is logged, the exception is recorded on the result (when
     * it is a {@link BatchJobResultAdapter}), the status is switched to {@code FAILED_ON_COMPLETE} unless the
     * instance is already in a failure status, and an {@code ON_COMPLETE_FAILED} notification is dispatched.
     *
     * @param batchJobInstanceAdapter the instance whose on-complete phase failed
     * @param exc                     the exception that was raised
     * @throws MuleException declared for callers; see the delegated calls
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void onCompleteException(BatchJobInstanceAdapter batchJobInstanceAdapter, Exception exc) throws MuleException {
        Lock instanceLock = getLock(batchJobInstanceAdapter);
        instanceLock.lock();
        try {
            LOGGER.error(String.format("Exception was found during on-complete step for instance %s of job %s", batchJobInstanceAdapter.getId(), batchJobInstanceAdapter.getOwnerJobName()));
            LOGGER.error(buildExceptionLogMessage(exc));

            BatchJobResult jobResult = batchJobInstanceAdapter.getResult();
            if (jobResult instanceof BatchJobResultAdapter) {
                ((BatchJobResultAdapter) jobResult).setOnCompletePhaseException(exc);
            }

            boolean alreadyFailed = batchJobInstanceAdapter.getStatus().isFailure();
            if (!alreadyFailed) {
                batchJobInstanceAdapter.setStatus(BatchJobInstanceStatus.FAILED_ON_COMPLETE);
            }

            this.notificationDispatcher.dispatch(new BatchNotification(batchJobInstanceAdapter, exc, BatchNotification.ON_COMPLETE_FAILED));
        } finally {
            instanceLock.unlock();
        }
    }

    /**
     * Tries to determine the record count up front from the payload of the given event.
     *
     * <p>The count is taken from {@code HasSize#getSize()} when the payload value implements {@code HasSize},
     * otherwise from {@link Collection#size()} when it is a collection. In every other case, or if reading the
     * size throws, {@code -1} is returned (a throwable is logged at warn level).
     *
     * @param coreEvent the event whose payload value is inspected
     * @return the record count, or {@code -1} when it cannot be determined
     */
    private long getRecordCountIfPossible(CoreEvent coreEvent) {
        Object payload = coreEvent.getMessage().getPayload().getValue();
        long count = -1L;
        try {
            if (payload instanceof HasSize) {
                count = ((HasSize) payload).getSize();
            } else if (payload instanceof Collection) {
                count = ((Collection) payload).size();
            }
        } catch (Throwable th) {
            LOGGER.warn("Exception found while trying to get the record count in advanced. Processing will continue and record count will be determined at the end of the loading phase", th);
            return -1L;
        }
        return count;
    }

    /**
     * Builds a new {@link DefaultBatchRecordDispatcher} wired to this engine, its work manager, the scheduler
     * service, the Mule context's scheduler base config and the notification listener registry.
     *
     * @return the newly created dispatcher
     */
    private BatchRecordDispatcher createRecordDispatcher() {
        return new DefaultBatchRecordDispatcher(
                this,
                this.workManager,
                this.schedulerService,
                this.muleContext.getSchedulerBaseConfig(),
                this.notificationListenerRegistry);
    }

    /**
     * Creates a {@link DefaultHistoryExpirationMonitor} using the configured expiration frequency (passed with
     * {@link TimeUnit#MILLISECONDS}), keeps it in the {@code historyExpirationMonitor} field and starts monitoring.
     */
    private void createAndStartHistoryExpirationMonitor() {
        DefaultHistoryExpirationMonitor monitor = new DefaultHistoryExpirationMonitor(
                this,
                this.jobInstanceStore,
                this.historyExpirationFrequency,
                TimeUnit.MILLISECONDS,
                this.muleContext);
        this.historyExpirationMonitor = monitor;
        monitor.beginMonitoring();
    }

    /**
     * Tells whether at least one batch job is registered.
     *
     * <p>{@code getBatchJobs()} is invoked once and the same snapshot is used for both the {@code null} check and
     * the emptiness check, so the answer is consistent and no second copy of the job collection is built.
     *
     * @return {@code true} if {@code getBatchJobs()} returns a non-null, non-empty collection
     */
    /* JADX INFO: Access modifiers changed from: private */
    public boolean areJobsRegistered() {
        Collection<BatchJob> registered = getBatchJobs();
        return registered != null && !registered.isEmpty();
    }

    /**
     * @return the job instance store currently held by this engine
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public BatchJobInstanceStore getJobInstanceStore() {
        return jobInstanceStore;
    }

    /**
     * Replaces the job instance store held by this engine.
     *
     * @param batchJobInstanceStore the store to use from now on
     */
    public void setJobInstanceStore(BatchJobInstanceStore batchJobInstanceStore) {
        this.jobInstanceStore = batchJobInstanceStore;
    }

    /**
     * @return the batch queue manager currently held by this engine
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public BatchQueueManager getBatchQueueManager() {
        return batchQueueManager;
    }

    /**
     * Replaces the batch queue manager held by this engine.
     *
     * @param batchQueueManager the queue manager to use from now on
     */
    public void setBatchQueueManager(BatchQueueManager batchQueueManager) {
        this.batchQueueManager = batchQueueManager;
    }

    /**
     * Stores the Mule context this engine reads from (e.g. for the scheduler base config and flow trace manager).
     *
     * @param muleContext the context to use from now on
     */
    public void setMuleContext(MuleContext muleContext) {
        this.muleContext = muleContext;
    }

    /**
     * Returns the block size configured on the job that owns the given instance.
     *
     * @param batchJobInstance the instance whose owner job is consulted
     * @return the owner job's block size
     * @throws MuleRuntimeException if the owner job cannot be located (see {@code getJobFor})
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public int getBlockSize(BatchJobInstance batchJobInstance) {
        BatchJobAdapter ownerJob = getJobFor(batchJobInstance);
        return ownerJob.getBlockSize();
    }

    /**
     * @return the batch transaction context factory currently held by this engine
     */
    @Override // com.mulesoft.mule.runtime.module.batch.engine.BatchEngine
    public BatchTransactionContextFactory getBatchTransactionContextFactory() {
        return batchTransactionContextFactory;
    }

    /**
     * Sets the history expiration frequency. The value is handed to the history expiration monitor together
     * with {@code TimeUnit.MILLISECONDS} when that monitor is created.
     *
     * @param j the frequency value to store
     */
    public void setHistoryExpirationFrequency(long j) {
        this.historyExpirationFrequency = j;
    }

    /**
     * Builds the text used to log the given throwable.
     *
     * @param th the throwable to describe
     * @return the detailed message of the root {@link MuleException} when {@code ExceptionHelper} finds one,
     *         otherwise the output of {@code ExceptionUtils.getFullStackTraceWithoutMessages}
     */
    public static String buildExceptionLogMessage(Throwable th) {
        MuleException rootCause = ExceptionHelper.getRootMuleException(th);
        if (rootCause == null) {
            return ExceptionUtils.getFullStackTraceWithoutMessages(th);
        }
        return rootCause.getDetailedMessage();
    }

    /**
     * Initializes the engine, unless it has already been initialized or no jobs are registered.
     *
     * @throws MuleException declared for callers; see {@code initializeEngine}
     */
    public void start() throws MuleException {
        boolean pendingInitialization = !this.engineInitialized.get() && areJobsRegistered();
        if (pendingInitialization) {
            initializeEngine();
        }
    }
}
