package org.mule.runtime.core.internal.routing.requestreply;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections.buffer.BoundedFifoBuffer;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.lifecycle.Disposable;
import org.mule.runtime.api.lifecycle.Initialisable;
import org.mule.runtime.api.lifecycle.InitialisationException;
import org.mule.runtime.api.lifecycle.Startable;
import org.mule.runtime.api.lifecycle.Stoppable;
import org.mule.runtime.api.meta.AbstractAnnotatedObject;
import org.mule.runtime.api.scheduler.Scheduler;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.api.store.ObjectStoreSettings;
import org.mule.runtime.core.api.DefaultMuleException;
import org.mule.runtime.core.api.InternalEvent;
import org.mule.runtime.core.api.config.MuleProperties;
import org.mule.runtime.core.api.config.i18n.CoreMessages;
import org.mule.runtime.core.api.context.notification.NotificationDispatcher;
import org.mule.runtime.core.api.context.notification.RoutingNotification;
import org.mule.runtime.core.api.exception.MessagingException;
import org.mule.runtime.core.api.processor.Processor;
import org.mule.runtime.core.api.processor.ReactiveProcessor;
import org.mule.runtime.core.api.processor.RequestReplyRequesterMessageProcessor;
import org.mule.runtime.core.api.registry.RegistrationException;
import org.mule.runtime.core.api.routing.ResponseTimeoutException;
import org.mule.runtime.core.api.source.MessageSource;
import org.mule.runtime.core.api.store.DeserializationPostInitialisable;
import org.mule.runtime.core.api.util.ObjectUtils;
import org.mule.runtime.core.api.util.concurrent.Latch;
import org.mule.runtime.core.internal.message.InternalMessage;
import org.mule.runtime.core.privileged.processor.AbstractInterceptingMessageProcessorBase;

/* loaded from: input_file:org/mule/runtime/core/internal/routing/requestreply/AbstractAsyncRequestReplyRequester.class */
public abstract class AbstractAsyncRequestReplyRequester extends AbstractInterceptingMessageProcessorBase implements RequestReplyRequesterMessageProcessor, Initialisable, Startable, Stoppable, Disposable {
    private static final int MAX_PROCESSED_GROUPS = 50000;
    private static final long UNCLAIMED_TIME_TO_LIVE = 60000;
    private static final long UNCLAIMED_INTERVAL = 60000;
    private static final String NAME_TEMPLATE = "%s.%s.%s.asyncReplies";
    protected String name;
    protected MessageSource replyMessageSource;
    private Scheduler scheduler;
    private NotificationDispatcher notificationFirer;
    private AsyncReplyMonitoringRunnable replyRunnable;
    protected ObjectStore store;
    protected volatile long timeout = -1;
    protected volatile boolean failOnTimeout = true;
    private final Processor internalAsyncReplyMessageProcessor = new InternalAsyncReplyMessageProcessor();
    protected final Map<String, RequestReplyLatch> locks = new ConcurrentHashMap();
    private String storePrefix = "";
    protected final ConcurrentMap<String, InternalEvent> responseEvents = new ConcurrentHashMap();
    private final Object processedLock = new Object();
    private final BoundedFifoBuffer processed = new BoundedFifoBuffer(50000);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/requestreply/AbstractAsyncRequestReplyRequester$AsyncReplyMonitoringRunnable.class */
    public class AsyncReplyMonitoringRunnable implements Runnable {
        private AsyncReplyMonitoringRunnable() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                List<String> allKeys = AbstractAsyncRequestReplyRequester.this.store.allKeys();
                AbstractAsyncRequestReplyRequester.this.logger.debug("Found " + allKeys.size() + " objects in store");
                for (String str : allKeys) {
                    try {
                        boolean z = false;
                        MultipleRequestReplierEvent multipleRequestReplierEvent = (MultipleRequestReplierEvent) AbstractAsyncRequestReplyRequester.this.store.retrieve(str);
                        if (AbstractAsyncRequestReplyRequester.this.isAlreadyProcessed(new ProcessedEvents(str, EndReason.FINISHED_BY_TIMEOUT))) {
                            z = true;
                            InternalEvent event = multipleRequestReplierEvent.getEvent();
                            if (AbstractAsyncRequestReplyRequester.this.logger.isDebugEnabled()) {
                                AbstractAsyncRequestReplyRequester.this.logger.debug("An event was received for an event group that has already been processed, this is because the async-reply timed out. GroupCorrelation Id is: " + str + ". Dropping event");
                            }
                            AbstractAsyncRequestReplyRequester.this.notificationFirer.dispatch(new RoutingNotification(event.getMessage(), event.getContext().getOriginatingLocation().getComponentIdentifier().getIdentifier().getNamespace(), RoutingNotification.MISSED_ASYNC_REPLY));
                        } else {
                            RequestReplyLatch requestReplyLatch = AbstractAsyncRequestReplyRequester.this.locks.get(str);
                            if (requestReplyLatch != null) {
                                if (AbstractAsyncRequestReplyRequester.this.responseEvents.putIfAbsent(str, AbstractAsyncRequestReplyRequester.this.retrieveEvent(str)) != null) {
                                    throw new IllegalStateException("Detected duplicate result message with id: " + str);
                                    break;
                                }
                                if (!requestReplyLatch.isSequenceEvent()) {
                                    AbstractAsyncRequestReplyRequester.this.addProcessed(new ProcessedEvents(str));
                                    z = true;
                                } else if (requestReplyLatch.isLastEvent()) {
                                    AbstractAsyncRequestReplyRequester.this.addProcessed(new ProcessedEvents(str));
                                    z = true;
                                }
                                requestReplyLatch.countDown();
                                multipleRequestReplierEvent.removeEvent();
                            }
                        }
                        if (z) {
                            AbstractAsyncRequestReplyRequester.this.store.remove(str);
                        }
                    } catch (Exception e) {
                        AbstractAsyncRequestReplyRequester.this.logger.debug("Error processing async replies", (Throwable) e);
                    }
                }
            } catch (Exception e2) {
                AbstractAsyncRequestReplyRequester.this.logger.debug("Error processing async replies", (Throwable) e2);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/requestreply/AbstractAsyncRequestReplyRequester$EndReason.class */
    public enum EndReason {
        PROCESSED,
        FINISHED_BY_TIMEOUT
    }

    /* loaded from: input_file:org/mule/runtime/core/internal/routing/requestreply/AbstractAsyncRequestReplyRequester$InternalAsyncReplyMessageProcessor.class */
    class InternalAsyncReplyMessageProcessor extends AbstractAnnotatedObject implements Processor {
        InternalAsyncReplyMessageProcessor() {
        }

        @Override // org.mule.runtime.core.api.processor.Processor
        public InternalEvent process(InternalEvent internalEvent) throws MuleException {
            String asyncReplyCorrelationId = AbstractAsyncRequestReplyRequester.this.getAsyncReplyCorrelationId(internalEvent);
            RequestReplyLatch requestReplyLatch = AbstractAsyncRequestReplyRequester.this.locks.get(asyncReplyCorrelationId);
            if (requestReplyLatch != null && requestReplyLatch.isSequenceEvent() && AbstractAsyncRequestReplyRequester.this.store.contains(asyncReplyCorrelationId)) {
                ((MultipleRequestReplierEvent) AbstractAsyncRequestReplyRequester.this.store.retrieve(asyncReplyCorrelationId)).addEvent(internalEvent);
            } else {
                MultipleRequestReplierEvent multipleRequestReplierEvent = new MultipleRequestReplierEvent();
                multipleRequestReplierEvent.addEvent(internalEvent);
                AbstractAsyncRequestReplyRequester.this.store.store(asyncReplyCorrelationId, multipleRequestReplierEvent);
            }
            AbstractAsyncRequestReplyRequester.this.replyRunnable.run();
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/requestreply/AbstractAsyncRequestReplyRequester$ProcessedEvents.class */
    public class ProcessedEvents {
        private String id;
        private EndReason endReason;

        private ProcessedEvents(String str, EndReason endReason) {
            this.id = str;
            this.endReason = endReason;
        }

        private ProcessedEvents(String str) {
            this.id = str;
            this.endReason = EndReason.PROCESSED;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            ProcessedEvents processedEvents = (ProcessedEvents) obj;
            return this.id.equals(processedEvents.id) && this.endReason == processedEvents.endReason;
        }

        public int hashCode() {
            return (31 * this.id.hashCode()) + this.endReason.hashCode();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/core/internal/routing/requestreply/AbstractAsyncRequestReplyRequester$RequestReplyLatch.class */
    public class RequestReplyLatch {
        private final int groupSize;
        private final int correlationSequence;
        private final Latch latch;

        RequestReplyLatch(int i, int i2) {
            this.latch = AbstractAsyncRequestReplyRequester.this.createEventLock();
            this.groupSize = i;
            this.correlationSequence = i2;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isSequenceEvent() {
            return this.groupSize != -1;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void countDown() {
            this.latch.countDown();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isLastEvent() {
            return this.groupSize == this.correlationSequence;
        }
    }

    @Override // org.mule.runtime.core.api.processor.Processor
    public InternalEvent process(InternalEvent internalEvent) throws MuleException {
        if (this.replyMessageSource == null) {
            return processNext(internalEvent);
        }
        addLock(internalEvent);
        sendAsyncRequest(internalEvent);
        InternalEvent receiveAsyncReply = receiveAsyncReply(internalEvent);
        if (receiveAsyncReply != null) {
            if (((InternalMessage) receiveAsyncReply.getMessage()).getInboundProperty(MuleProperties.MULE_SESSION_PROPERTY) != null) {
                internalEvent.getSession().merge(receiveAsyncReply.getSession());
            }
            receiveAsyncReply = InternalEvent.builder(internalEvent).message(receiveAsyncReply.getMessage()).build();
            InternalEvent.setCurrentEvent(receiveAsyncReply);
        }
        return receiveAsyncReply;
    }

    private void addLock(InternalEvent internalEvent) {
        this.locks.put(getAsyncReplyCorrelationId(internalEvent), new RequestReplyLatch(((Integer) internalEvent.getGroupCorrelation().map(groupCorrelation -> {
            return Integer.valueOf(groupCorrelation.getGroupSize().orElse(-1));
        }).orElse(-1)).intValue(), ((Integer) internalEvent.getGroupCorrelation().map(groupCorrelation2 -> {
            return Integer.valueOf(groupCorrelation2.getSequence());
        }).orElse(-1)).intValue()));
    }

    private Latch getLatch(String str) {
        return this.locks.get(str).latch;
    }

    protected Latch createEventLock() {
        return new Latch();
    }

    public void setTimeout(long j) {
        this.timeout = j;
    }

    public void setFailOnTimeout(boolean z) {
        this.failOnTimeout = z;
    }

    @Override // org.mule.runtime.core.api.processor.RequestReplyRequesterMessageProcessor
    public void setReplySource(MessageSource messageSource) {
        verifyReplyMessageSource(messageSource);
        this.replyMessageSource = messageSource;
        messageSource.setListener(this.internalAsyncReplyMessageProcessor);
    }

    @Override // org.mule.runtime.api.lifecycle.Initialisable
    public void initialise() throws InitialisationException {
        this.name = String.format(NAME_TEMPLATE, this.storePrefix, this.muleContext.getConfiguration().getId(), getLocation().getRootContainerName());
        this.store = ((ObjectStoreManager) this.muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).createObjectStore(this.name, ObjectStoreSettings.builder().persistent(false).maxEntries(50000).entryTtl(60000L).expirationInterval(60000L).build());
        try {
            this.notificationFirer = (NotificationDispatcher) this.muleContext.getRegistry().lookupObject(NotificationDispatcher.class);
        } catch (RegistrationException e) {
            throw new InitialisationException(e, this);
        }
    }

    @Override // org.mule.runtime.api.lifecycle.Startable
    public void start() throws MuleException {
        this.scheduler = this.muleContext.getSchedulerService().customScheduler(this.muleContext.getSchedulerBaseConfig().withName(this.name).withMaxConcurrentTasks(1).withShutdownTimeout(0L, TimeUnit.MILLISECONDS));
        this.replyRunnable = new AsyncReplyMonitoringRunnable();
        this.scheduler.scheduleWithFixedDelay(this.replyRunnable, 0L, 100L, TimeUnit.MILLISECONDS);
    }

    @Override // org.mule.runtime.api.lifecycle.Stoppable
    public void stop() throws MuleException {
        if (this.scheduler != null) {
            this.scheduler.stop();
        }
    }

    @Override // org.mule.runtime.api.lifecycle.Disposable
    public void dispose() {
        if (this.store != null) {
            try {
                ((ObjectStoreManager) this.muleContext.getRegistry().get(MuleProperties.OBJECT_STORE_MANAGER)).disposeStore(this.name);
            } catch (ObjectStoreException e) {
                this.logger.debug("Exception disposing of store", (Throwable) e);
            }
        }
    }

    public void setStorePrefix(String str) {
        this.storePrefix = str;
    }

    protected void verifyReplyMessageSource(MessageSource messageSource) {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String getAsyncReplyCorrelationId(InternalEvent internalEvent) {
        return internalEvent.getContext().getCorrelationId();
    }

    protected void sendAsyncRequest(InternalEvent internalEvent) throws MuleException {
        processNext(internalEvent);
    }

    private InternalEvent receiveAsyncReply(InternalEvent internalEvent) throws MuleException {
        InternalEvent remove;
        String asyncReplyCorrelationId = getAsyncReplyCorrelationId(internalEvent);
        System.out.println("receiveAsyncReply: " + asyncReplyCorrelationId);
        Latch latch = getLatch(asyncReplyCorrelationId);
        boolean z = false;
        try {
            if (this.logger.isDebugEnabled()) {
                this.logger.debug("Waiting for async reply message with id: " + asyncReplyCorrelationId);
            }
            if (this.timeout <= 0) {
                latch.await();
                z = true;
            } else {
                z = latch.await(this.timeout, TimeUnit.MILLISECONDS);
            }
            if (!z) {
                postLatchAwait(asyncReplyCorrelationId);
                latch.await(1000L, TimeUnit.MILLISECONDS);
                z = latch.getCount() == 0;
            }
            this.locks.remove(asyncReplyCorrelationId);
            remove = this.responseEvents.remove(asyncReplyCorrelationId);
            if (0 != 0) {
                Thread.currentThread().interrupt();
                return null;
            }
        } catch (InterruptedException e) {
            this.locks.remove(asyncReplyCorrelationId);
            remove = this.responseEvents.remove(asyncReplyCorrelationId);
            if (1 != 0) {
                Thread.currentThread().interrupt();
                return null;
            }
        } catch (Throwable th) {
            this.locks.remove(asyncReplyCorrelationId);
            this.responseEvents.remove(asyncReplyCorrelationId);
            if (0 == 0) {
                throw th;
            }
            Thread.currentThread().interrupt();
            return null;
        }
        if (z) {
            if (remove == null) {
                throw new IllegalStateException("Response MuleEvent is null");
            }
            InternalEvent.setCurrentEvent(remove);
            return remove;
        }
        addProcessed(new ProcessedEvents(asyncReplyCorrelationId, EndReason.FINISHED_BY_TIMEOUT));
        if (!this.failOnTimeout) {
            return null;
        }
        this.notificationFirer.dispatch(new RoutingNotification(internalEvent.getMessage(), null, RoutingNotification.ASYNC_REPLY_TIMEOUT));
        throw new ResponseTimeoutException(CoreMessages.responseTimedOutWaitingForId((int) this.timeout, asyncReplyCorrelationId), null);
    }

    private void postLatchAwait(String str) throws MessagingException {
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void addProcessed(Object obj) {
        synchronized (this.processedLock) {
            if (this.processed.isFull()) {
                this.processed.remove();
            }
            this.processed.add(obj);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isAlreadyProcessed(Object obj) {
        boolean contains;
        synchronized (this.processedLock) {
            contains = this.processed.contains(obj);
        }
        return contains;
    }

    @Override // org.mule.runtime.core.privileged.processor.AbstractInterceptingMessageProcessorBase
    public String toString() {
        return ObjectUtils.toString(this);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public InternalEvent retrieveEvent(String str) throws ObjectStoreException, DefaultMuleException {
        InternalEvent event = ((MultipleRequestReplierEvent) this.store.retrieve(str)).getEvent();
        if (event.getMuleContext() == null) {
            try {
                DeserializationPostInitialisable.Implementation.init(event, this.muleContext);
            } catch (Exception e) {
                throw new DefaultMuleException(e);
            }
        }
        return event;
    }

    @Override // org.mule.runtime.core.api.processor.ReactiveProcessor
    public ReactiveProcessor.ProcessingType getProcessingType() {
        return ReactiveProcessor.ProcessingType.BLOCKING;
    }
}
