package org.mule.runtime.core.routing.correlation;

import java.io.Serializable;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.Event;
import org.mule.runtime.core.api.MuleContext;
import org.mule.runtime.core.api.construct.FlowConstruct;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.routing.RoutingException;
import org.mule.runtime.core.api.store.ObjectAlreadyExistsException;
import org.mule.runtime.core.api.store.ObjectDoesNotExistException;
import org.mule.runtime.core.api.store.ObjectStore;
import org.mule.runtime.core.api.store.ObjectStoreException;
import org.mule.runtime.core.api.store.PartitionableObjectStore;
import org.mule.runtime.core.config.i18n.CoreMessages;
import org.mule.runtime.core.context.notification.RoutingNotification;
import org.mule.runtime.core.exception.MessagingException;
import org.mule.runtime.core.execution.ErrorHandlingExecutionTemplate;
import org.mule.runtime.core.message.GroupCorrelation;
import org.mule.runtime.core.routing.EventGroup;
import org.mule.runtime.core.routing.EventProcessingThread;
import org.mule.runtime.core.routing.requestreply.AbstractAsyncRequestReplyRequester;
import org.mule.runtime.core.util.StringMessageUtils;
import org.mule.runtime.core.util.concurrent.ThreadNameHelper;
import org.mule.runtime.core.util.monitor.Expirable;
import org.mule.runtime.core.util.monitor.ExpiryMonitor;
import org.mule.runtime.core.util.store.DeserializationPostInitialisable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/mule/runtime/core/routing/correlation/EventCorrelator.class */
/**
 * Collects incoming events into {@link EventGroup}s keyed by correlation id and asks an
 * {@link EventCorrelatorCallback} when a group is complete and how to aggregate it.
 *
 * <p>Groups live in the {@code <storePrefix>.eventGroups} partition of the correlator store. Ids of groups
 * that were aggregated or expired are recorded in {@link #processedGroups} so that late events for them are
 * dropped. When started with a non-zero timeout, a background {@link ExpiringGroupMonitoringThread} expires
 * groups whose creation time plus timeout has passed.
 *
 * <p>Mutations of the group stores are serialized on {@link #groupsLock}.
 */
public class EventCorrelator implements Startable, Stoppable, Disposable {
    /** Placeholder correlation id constant; it is not referenced inside this class. */
    public static final String NO_CORRELATION_ID = "no-id";

    /** 24 hours in milliseconds; default value of {@link #groupTimeToLive}. */
    private static final long ONE_DAY_IN_MILLI = 86400000;

    /** Ids of groups that were already aggregated or expired, mapped to the time they were recorded. */
    protected ObjectStore<Long> processedGroups;
    private final MuleContext muleContext;
    private final EventCorrelatorCallback callback;

    /** Receives the partial aggregate of an expired group when {@code failOnTimeout} is false; may be null. */
    private final Processor timeoutMessageProcessor;

    /** Raw-typed store holding both the event-group partition and the expired-and-dispatched partition. */
    private final PartitionableObjectStore correlatorStore;
    private final String storePrefix;
    private ExpiringGroupMonitoringThread expiringGroupMonitoringThread;
    private final String name;
    private final FlowConstruct flowConstruct;
    protected final transient Logger logger = LoggerFactory.getLogger(EventCorrelator.class);

    /** Age in milliseconds after which an expired group is no longer forwarded and its dispatch marker is purged. */
    protected long groupTimeToLive = ONE_DAY_IN_MILLI;

    /** Monitor guarding changes to the event-group partition and to {@link #processedGroups}. */
    protected final Object groupsLock = new Object();
    private long timeout = -1;
    private boolean failOnTimeout = true;

    /* loaded from: input_file:org/mule/runtime/core/routing/correlation/EventCorrelator$ExpiringGroupMonitoringThread.class */
    /**
     * Background worker that (a) on every {@link #doRun()} expires event groups older than the configured
     * timeout, and (b) through an {@link ExpiryMonitor} periodically purges stale entries from the
     * expired-and-dispatched partition.
     */
    private final class ExpiringGroupMonitoringThread extends EventProcessingThread implements Expirable, Disposable {
        /** Delay value handed to the {@link EventProcessingThread} constructor. */
        public static final long DELAY_TIME = 10;

        /** Interval, in milliseconds (30 minutes), at which {@link #expired()} is scheduled on the monitor. */
        private static final long EXPIRED_GROUPS_CLEANUP_INTERVAL_MILLIS = 1800000L;

        private final ExpiryMonitor expiryMonitor;

        public ExpiringGroupMonitoringThread() {
            super(EventCorrelator.this.name, DELAY_TIME);
            this.expiryMonitor = new ExpiryMonitor(EventCorrelator.this.name, AbstractAsyncRequestReplyRequester.UNCLAIMED_TIME_TO_LIVE, EventCorrelator.this.muleContext, true);
            this.expiryMonitor.addExpirable(EXPIRED_GROUPS_CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS, this);
        }

        /**
         * Removes every expired-and-dispatched marker whose stored creation timestamp is older than
         * {@code groupTimeToLive}. Store failures are logged and otherwise ignored.
         */
        @Override // org.mule.runtime.core.util.monitor.Expirable
        public void expired() {
            final String partition = EventCorrelator.this.getExpiredAndDispatchedPartitionKey();
            try {
                // The store is raw-typed, so keys arrive as Object and are cast explicitly.
                for (Object key : EventCorrelator.this.correlatorStore.allKeys(partition)) {
                    Serializable groupId = (Serializable) key;
                    long groupCreated = ((Long) EventCorrelator.this.correlatorStore.retrieve(groupId, partition)).longValue();
                    if (groupCreated + EventCorrelator.this.groupTimeToLive < System.currentTimeMillis()) {
                        EventCorrelator.this.correlatorStore.remove(groupId, partition);
                        this.logger.warn(MessageFormat.format("Discarding group {0}", groupId));
                    }
                }
            } catch (ObjectStoreException e) {
                this.logger.warn("Expiration of objects failed due to ObjectStoreException " + e + ".", e);
            }
        }

        /**
         * On the primary polling instance only: collects all groups whose creation time plus timeout lies in
         * the past, then expires each one inside an error-handling execution template.
         */
        @Override // org.mule.runtime.core.routing.EventProcessingThread
        public void doRun() {
            if (!EventCorrelator.this.muleContext.isPrimaryPollingInstance()) {
                return;
            }
            ArrayList<EventGroup> expiredGroups = new ArrayList<>(1);
            try {
                for (Object key : EventCorrelator.this.correlatorStore.allKeys(EventCorrelator.this.getEventGroupsPartitionKey())) {
                    EventGroup candidate = EventCorrelator.this.getEventGroup((Serializable) key);
                    if (candidate != null && candidate.getCreated() + EventCorrelator.this.getTimeout() < System.currentTimeMillis()) {
                        expiredGroups.add(candidate);
                    }
                }
            } catch (ObjectStoreException e) {
                // Groups collected before the failure are still expired below.
                this.logger.warn("expiry failed dues to ObjectStoreException " + e);
            }
            for (EventGroup expiredGroup : expiredGroups) {
                try {
                    ErrorHandlingExecutionTemplate.createErrorHandlingExecutionTemplate(EventCorrelator.this.muleContext, EventCorrelator.this.flowConstruct, EventCorrelator.this.flowConstruct.getExceptionListener()).execute(() -> {
                        EventCorrelator.this.handleGroupExpiry(expiredGroup);
                        return null;
                    });
                } catch (MessagingException ignored) {
                    // Must be caught before Exception (it is a subclass). Deliberately not re-reported here;
                    // NOTE(review): presumably the execution template already routed it to the flow's
                    // exception listener - confirm.
                } catch (Exception e) {
                    EventCorrelator.this.muleContext.getExceptionListener().handleException(e);
                }
            }
        }

        /** Disposes the expiry monitor created in the constructor. */
        @Override
        public void dispose() {
            this.expiryMonitor.dispose();
        }
    }

    /**
     * @param eventCorrelatorCallback decides how groups are created, completed and aggregated; must not be null
     * @param processor               processor that receives partial aggregates of expired groups; may be null
     * @param muleContext             owning context; must not be null
     * @param flowConstruct           flow whose name is used for the correlator name and whose exception
     *                                listener handles expiry failures
     * @param partitionableObjectStore store holding the event groups and the expired-and-dispatched markers
     * @param str                     prefix used to derive the partition names
     * @param objectStore             store recording ids of already processed groups
     * @throws IllegalArgumentException if {@code eventCorrelatorCallback} or {@code muleContext} is null
     */
    public EventCorrelator(EventCorrelatorCallback eventCorrelatorCallback, Processor processor, MuleContext muleContext, FlowConstruct flowConstruct, PartitionableObjectStore partitionableObjectStore, String str, ObjectStore<Long> objectStore) {
        if (eventCorrelatorCallback == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("EventCorrelatorCallback").getMessage());
        }
        if (muleContext == null) {
            throw new IllegalArgumentException(CoreMessages.objectIsNull("MuleContext").getMessage());
        }
        this.callback = eventCorrelatorCallback;
        this.muleContext = muleContext;
        this.timeoutMessageProcessor = processor;
        this.name = String.format("%s%s.event.correlator", ThreadNameHelper.getPrefix(muleContext), flowConstruct.getName());
        this.flowConstruct = flowConstruct;
        this.correlatorStore = partitionableObjectStore;
        this.storePrefix = str;
        this.processedGroups = objectStore;
    }

    /**
     * Expires the group with the given id immediately. If no such group is stored, the id is only recorded
     * as processed so that events arriving later for it are dropped.
     *
     * <p>The group is looked up once through {@link #getEventGroup(Serializable)}, which reports a missing
     * group as {@code null}; this keeps the "mark as processed" branch reachable for stores that signal
     * absence with {@link ObjectDoesNotExistException}.
     *
     * @param str id of the group to expire
     * @throws MuleException whatever {@link #handleGroupExpiry(EventGroup)} raises, or a
     *                       {@link MessagingException} wrapping a store failure
     */
    public void forceGroupExpiry(String str) throws MuleException {
        try {
            EventGroup group = getEventGroup(str);
            if (group != null) {
                handleGroupExpiry(group);
            } else {
                addProcessedGroup(str);
            }
        } catch (ObjectStoreException e) {
            throw new MessagingException((Event) null, (Throwable) e);
        }
    }

    /**
     * Adds {@code event} to the group for its correlation id, creating the group if needed, and aggregates
     * the group when the callback reports it complete.
     *
     * @param event the incoming event
     * @return the aggregated event when the group is complete; {@code null} when the group is still
     *         incomplete or when the group was already processed (in which case a
     *         {@code MISSED_AGGREGATION_GROUP_EVENT} notification is fired and the event is dropped)
     * @throws RoutingException if the underlying object store fails, or if the callback raises one
     */
    public Event process(Event event) throws RoutingException {
        final String correlationId = event.getCorrelationId();
        if (this.logger.isTraceEnabled()) {
            try {
                this.logger.trace(String.format("Received async reply message for correlationID: %s%n%s%n%s", correlationId, StringMessageUtils.truncate(StringMessageUtils.toString(event.mo7getMessage().getPayload().getValue()), 200, false), event.mo7getMessage().toString()));
            } catch (Exception ignored) {
                // Trace output is best-effort: a payload that cannot be rendered must not fail routing.
            }
        }
        // Every store failure below is reported the same way, so one handler covers the whole sequence.
        try {
            if (isGroupAlreadyProcessed(correlationId)) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("An event was received for an event group that has already been processed, this is probably because the async-reply timed out. GroupCorrelation Id is: " + correlationId + ". Dropping event");
                }
                this.muleContext.fireNotification(new RoutingNotification(event.mo7getMessage(), event.getContext().getOriginatingConnectorName(), RoutingNotification.MISSED_AGGREGATION_GROUP_EVENT));
                return null;
            }

            EventGroup group = getEventGroup(correlationId);
            if (group == null) {
                EventGroup newGroup = this.callback.createEventGroup(event, correlationId);
                newGroup.initEventsStore(this.correlatorStore);
                // Returns the already stored group instead when another thread created it first.
                group = addEventGroup(newGroup);
            }

            // Only one thread at a time may add to, evaluate and remove a group.
            synchronized (this.groupsLock) {
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Adding event to aggregator group: " + correlationId);
                }
                group.addEvent(event);
                if (!this.callback.shouldAggregateEvents(group)) {
                    return null;
                }
                Event aggregated = this.callback.aggregateEvents(group);
                // No further events are expected for this group once it has been aggregated.
                removeEventGroup(group);
                group.clear();
                return aggregated;
            }
        } catch (ObjectStoreException e) {
            throw new RoutingException(this.timeoutMessageProcessor, (Throwable) e);
        }
    }

    /**
     * Loads a group from the event-group partition, runs post-deserialization initialisation on it when it
     * reports itself uninitialised, and re-attaches the correlator store to it.
     *
     * @param serializable id of the group
     * @return the group, or {@code null} if the store has no entry for the id
     * @throws ObjectStoreException if the store fails or the post-deserialization initialisation fails
     */
    protected EventGroup getEventGroup(Serializable serializable) throws ObjectStoreException {
        try {
            EventGroup group = (EventGroup) this.correlatorStore.retrieve(serializable, getEventGroupsPartitionKey());
            if (group == null) {
                // Tolerate stores that report a missing entry with null instead of an exception.
                return null;
            }
            if (!group.isInitialised()) {
                try {
                    DeserializationPostInitialisable.Implementation.init(group, this.muleContext);
                } catch (Exception e) {
                    throw new ObjectStoreException(e);
                }
            }
            group.initEventsStore(this.correlatorStore);
            return group;
        } catch (ObjectDoesNotExistException e) {
            return null;
        }
    }

    /**
     * Stores a new group under its group id.
     *
     * @param eventGroup the group to store
     * @return {@code eventGroup} if it was stored, otherwise the result of looking up the group that is
     *         already stored under the same id
     * @throws ObjectStoreException if the store fails
     */
    protected EventGroup addEventGroup(EventGroup eventGroup) throws ObjectStoreException {
        Serializable groupId = (Serializable) eventGroup.getGroupId();
        try {
            this.correlatorStore.store(groupId, eventGroup, getEventGroupsPartitionKey());
            return eventGroup;
        } catch (ObjectAlreadyExistsException e) {
            // Lost the race against another creator: hand back whatever is stored now.
            return getEventGroup(groupId);
        }
    }

    /**
     * Removes the group from the event-group partition and records its id as processed. Does nothing when
     * the id is already recorded as processed.
     *
     * @param eventGroup the group to remove
     * @throws ObjectStoreException if the store fails
     */
    protected void removeEventGroup(EventGroup eventGroup) throws ObjectStoreException {
        final Object groupId = eventGroup.getGroupId();
        synchronized (this.groupsLock) {
            if (isGroupAlreadyProcessed(groupId)) {
                return;
            }
            this.correlatorStore.remove((Serializable) groupId, getEventGroupsPartitionKey());
            addProcessedGroup(groupId);
        }
    }

    /**
     * Records a group id as processed, together with the current time in milliseconds.
     *
     * @param obj the group id; cast to {@link Serializable}
     * @throws ObjectStoreException if the store fails
     */
    protected void addProcessedGroup(Object obj) throws ObjectStoreException {
        synchronized (this.groupsLock) {
            this.processedGroups.store((Serializable) obj, Long.valueOf(System.currentTimeMillis()));
        }
    }

    /**
     * @param obj the group id; cast to {@link Serializable}
     * @return whether the id is recorded in {@link #processedGroups}
     * @throws ObjectStoreException if the store fails
     */
    protected boolean isGroupAlreadyProcessed(Object obj) throws ObjectStoreException {
        synchronized (this.groupsLock) {
            return this.processedGroups.contains((Serializable) obj);
        }
    }

    /** @return whether an expired group raises a {@link CorrelationTimeoutException} instead of being forwarded */
    public boolean isFailOnTimeout() {
        return this.failOnTimeout;
    }

    public void setFailOnTimeout(boolean z) {
        this.failOnTimeout = z;
    }

    /** @return the value added to a group's creation time (epoch millis) to decide whether it has expired */
    public long getTimeout() {
        return this.timeout;
    }

    public void setTimeout(long j) {
        this.timeout = j;
    }

    /**
     * Expires a group: removes it from the store, then either fails or forwards what was collected.
     *
     * <ul>
     * <li>With {@code failOnTimeout} set: fires a {@code CORRELATION_TIMEOUT} notification, clears the group
     * (a failure to clear is only logged) and throws {@link CorrelationTimeoutException}.</li>
     * <li>Otherwise, if the group is not older than {@code groupTimeToLive}: aggregates the collected events,
     * clears the group and, unless the group is already marked as expired-and-dispatched, sends the aggregate
     * to the timeout message processor and stores the marker.</li>
     * </ul>
     *
     * @param eventGroup the group to expire
     * @throws MuleException a {@link CorrelationTimeoutException} on fail-on-timeout; a
     *                       {@link MessagingException} when forwarding fails or no timeout processor is
     *                       configured; a {@link DefaultMuleException} wrapping a store failure
     */
    protected void handleGroupExpiry(EventGroup eventGroup) throws MuleException {
        try {
            removeEventGroup(eventGroup);
            if (isFailOnTimeout()) {
                this.muleContext.fireNotification(new RoutingNotification(eventGroup.getMessageCollectionEvent().mo7getMessage(), null, RoutingNotification.CORRELATION_TIMEOUT));
                try {
                    eventGroup.clear();
                } catch (ObjectStoreException e) {
                    this.logger.warn("Failed to clear group with id " + eventGroup.getGroupId() + " since underlying ObjectStore threw Exception:" + e.getMessage(), e);
                }
                throw new CorrelationTimeoutException(CoreMessages.correlationTimedOut(eventGroup.getGroupId()));
            }
            if (this.logger.isDebugEnabled()) {
                this.logger.debug(MessageFormat.format("Aggregator expired, but ''failOnTimeOut'' is false. Forwarding {0} events out of {1} total for group ID: {2}", Integer.valueOf(eventGroup.size()), eventGroup.expectedSize().map(expected -> {
                    return expected.toString();
                }).orElse(GroupCorrelation.NOT_SET), eventGroup.getGroupId()));
            }
            try {
                // Groups older than groupTimeToLive are dropped silently: nothing is aggregated or forwarded.
                if (eventGroup.getCreated() + this.groupTimeToLive >= System.currentTimeMillis()) {
                    Event partialAggregate = Event.builder(this.callback.aggregateEvents(eventGroup)).build();
                    eventGroup.clear();
                    if (this.correlatorStore.contains((Serializable) eventGroup.getGroupId(), getExpiredAndDispatchedPartitionKey())) {
                        this.logger.warn(MessageFormat.format("Discarding group {0}", eventGroup.getGroupId()));
                    } else {
                        if (this.timeoutMessageProcessor == null) {
                            throw new MessagingException(CoreMessages.createStaticMessage(MessageFormat.format("Group {0} timed out, but no timeout message processor was configured.", eventGroup.getGroupId())), partialAggregate);
                        }
                        this.timeoutMessageProcessor.process(partialAggregate);
                        // Marker value is the group's creation time; the monitor thread purges it later.
                        this.correlatorStore.store((Serializable) eventGroup.getGroupId(), Long.valueOf(eventGroup.getCreated()), getExpiredAndDispatchedPartitionKey());
                    }
                }
            } catch (MessagingException e) {
                // Subclass first: messaging failures propagate as they are instead of being wrapped again.
                throw e;
            } catch (Exception e) {
                throw new MessagingException(eventGroup.getMessageCollectionEvent(), e);
            }
        } catch (ObjectStoreException e) {
            throw new DefaultMuleException((Throwable) e);
        }
    }

    /**
     * Starts the expiry monitoring thread unless the timeout is exactly 0.
     * NOTE(review): the default timeout of -1 also starts the monitor - confirm that is intended.
     */
    @Override
    public void start() throws MuleException {
        this.logger.info("Starting event correlator: " + this.name);
        if (this.timeout != 0) {
            this.expiringGroupMonitoringThread = new ExpiringGroupMonitoringThread();
            this.expiringGroupMonitoringThread.start();
        }
    }

    /** Asks the expiry monitoring thread, if one was started, to stop processing. */
    @Override
    public void stop() throws MuleException {
        this.logger.info("Stopping event correlator: " + this.name);
        if (this.expiringGroupMonitoringThread != null) {
            this.expiringGroupMonitoringThread.stopProcessing();
        }
    }

    /** @return name of the partition holding markers of groups that expired and were forwarded */
    protected String getExpiredAndDispatchedPartitionKey() {
        return this.storePrefix + ".expiredAndDispatchedGroups";
    }

    /** @return name of the partition holding the live event groups */
    protected String getEventGroupsPartitionKey() {
        return this.storePrefix + ".eventGroups";
    }

    /** Disposes the expiry monitoring thread if one was created. */
    @Override
    public void dispose() {
        disposeIfDisposable(this.expiringGroupMonitoringThread);
    }

    /** Calls {@code dispose()} on {@code obj} when it is a non-null {@link Disposable}; otherwise does nothing. */
    private void disposeIfDisposable(Object obj) {
        // instanceof is false for null, so no separate null check is needed.
        if (obj instanceof Disposable) {
            ((Disposable) obj).dispose();
        }
    }
}
