package org.mule.runtime.module.extension.internal.runtime.source.poll;

import java.io.Serializable;
import java.util.Comparator;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.function.Consumer;
import javax.inject.Inject;
import org.mule.runtime.api.component.location.ComponentLocation;
import org.mule.runtime.api.connection.ConnectionException;
import org.mule.runtime.api.exception.MuleException;
import org.mule.runtime.api.exception.MuleRuntimeException;
import org.mule.runtime.api.i18n.I18nMessageFactory;
import org.mule.runtime.api.lock.LockFactory;
import org.mule.runtime.api.scheduler.SchedulerConfig;
import org.mule.runtime.api.scheduler.SchedulerService;
import org.mule.runtime.api.store.ObjectStore;
import org.mule.runtime.api.store.ObjectStoreException;
import org.mule.runtime.api.store.ObjectStoreManager;
import org.mule.runtime.api.store.ObjectStoreSettings;
import org.mule.runtime.api.util.Preconditions;
import org.mule.runtime.core.api.event.CoreEvent;
import org.mule.runtime.core.api.source.scheduler.Scheduler;
import org.mule.runtime.core.api.util.func.CheckedRunnable;
import org.mule.runtime.extension.api.runtime.operation.Result;
import org.mule.runtime.extension.api.runtime.source.PollContext;
import org.mule.runtime.extension.api.runtime.source.PollingSource;
import org.mule.runtime.extension.api.runtime.source.SourceCallback;
import org.mule.runtime.extension.api.runtime.source.SourceCallbackContext;
import org.mule.runtime.module.extension.api.loader.java.type.WithAlias;
import org.mule.runtime.module.extension.internal.runtime.source.SourceWrapper;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/mule/runtime/module/extension/internal/runtime/source/poll/PollingSourceWrapper.class */
public class PollingSourceWrapper<T, A> extends SourceWrapper<T, A> {
    /** Class-wide SLF4J logger. */
    private static final Logger LOGGER = LoggerFactory.getLogger(PollingSourceWrapper.class);
    /** Name of the {@link SourceCallbackContext} variable under which an acquired item's {@code ItemReleaser} is stored. */
    private static final String ITEM_RELEASER_CTX_VAR = "itemReleaser";
    /** Key of the watermark entry inside the watermark store; also the suffix of that store's name and of the watermark lock's name. */
    private static final String WATERMARK_OS_KEY = "watermark";
    /** The wrapped polling source; its onStart/onStop/poll/onRejectedItem methods are invoked by this wrapper. */
    private final PollingSource<T, A> delegate;
    /** Scheduling strategy used in {@code onStart} to schedule executions of {@code poll}. */
    private final Scheduler scheduler;

    /** Creates the watermark lock and the per-item locks. */
    @Inject
    private LockFactory lockFactory;

    /** Provides the three object stores created in {@code onStart}. */
    @Inject
    private ObjectStoreManager objectStoreManager;

    /** Provides the custom scheduler assigned to {@link #executor} in {@code onStart}. */
    @Inject
    private SchedulerService schedulerService;
    /** Holds the persisted watermark under {@link #WATERMARK_OS_KEY}; created in {@code onStart}. */
    private ObjectStore<Serializable> watermarkObjectStore;
    /** Ids of items currently being processed; created in {@code onStart} as a non-persistent store. */
    private ObjectStore<Serializable> inflightIdsObjectStore;
    /** Ids of items accepted by the watermark check; cleared whenever the persisted watermark is replaced. */
    private ObjectStore<Serializable> recentlyProcessedIds;
    /** Read in {@code onStart} to resolve the flow name. Never assigned in this file; presumably set by the runtime - TODO confirm. */
    private ComponentLocation componentLocation;
    /** Root container name of {@link #componentLocation}; resolved in {@code onStart} and used in log messages and lock names. */
    private String flowName;
    /** Prefix ("_pollingSource_" + flowName + "/") that {@code formatKey} prepends to store, lock and executor names. */
    private String keyPrefix;
    /** Set to {@code true} by {@code onStop} and reset to {@code false} by {@code onStart}. */
    private final AtomicBoolean stopRequested;
    /** Custom scheduler (max one concurrent task) created in {@code onStart} and stopped by {@code shutdownScheduler}. */
    private org.mule.runtime.api.scheduler.Scheduler executor;

    /* loaded from: input_file:org/mule/runtime/module/extension/internal/runtime/source/poll/PollingSourceWrapper$DefaultPollContext.class */
    /**
     * {@link PollContext} handed to the delegate on each poll. For every item the delegate pushes it
     * tries to acquire the item, checks it against the watermark, and either dispatches it through
     * the {@link SourceCallback} or rejects it.
     */
    private class DefaultPollContext implements PollContext<T, A> {
        /** Callback used to create per-item contexts and to dispatch accepted items. */
        private final SourceCallback<T, A> sourceCallback;
        /** Watermark supplied at construction time; may be {@code null}. */
        private Serializable currentWatermark;
        /**
         * Candidate for the next persisted watermark: the greatest item watermark seen so far that
         * exceeded the reference watermark, or the first item's watermark when no watermark existed.
         */
        private Serializable updatedWatermark;
        /** Optional comparator set by the delegate; {@code null} until {@link #setWatermarkComparator} is called. */
        private Comparator<Serializable> watermarkComparator;

        /**
         * @param sourceCallback callback used to create contexts and dispatch items
         * @param serializable   the current watermark, or {@code null} if none is known
         */
        private DefaultPollContext(SourceCallback<T, A> sourceCallback, Serializable serializable) {
            this.updatedWatermark = null;
            this.watermarkComparator = null;
            this.sourceCallback = sourceCallback;
            this.currentWatermark = serializable;
        }

        /**
         * Lets {@code consumer} configure a new item and then decides its fate.
         *
         * @param consumer configures the item (result, optional id and watermark)
         * @return the status of the item; only {@code ACCEPTED} items are dispatched to the callback
         * @throws IllegalStateException if the consumer did not set a result on the item
         */
        public PollContext.PollItemStatus accept(Consumer<PollContext.PollItem<T, A>> consumer) {
            PollContext.PollItemStatus pollItemStatus;
            SourceCallbackContext createContext = this.sourceCallback.createContext();
            PollingSourceWrapper<T, A>.DefaultPollItem defaultPollItem = new DefaultPollItem(createContext);
            consumer.accept(defaultPollItem);
            defaultPollItem.validate();
            // Order matters: acquire the item first, then filter by watermark, then honour a stop request.
            if (!PollingSourceWrapper.this.acquireItem(defaultPollItem, createContext)) {
                pollItemStatus = PollContext.PollItemStatus.ALREADY_IN_PROCESS;
            } else if (!passesWatermark(defaultPollItem)) {
                pollItemStatus = PollContext.PollItemStatus.FILTERED_BY_WATERMARK;
            } else if (PollingSourceWrapper.this.isRequestedToStop()) {
                pollItemStatus = PollContext.PollItemStatus.SOURCE_STOPPING;
            } else {
                this.sourceCallback.handle(defaultPollItem.getResult(), createContext);
                pollItemStatus = PollContext.PollItemStatus.ACCEPTED;
            }
            if (pollItemStatus != PollContext.PollItemStatus.ACCEPTED) {
                // Notifies the delegate of the rejection and runs the item releaser, if one was registered.
                PollingSourceWrapper.this.release(defaultPollItem.getResult(), createContext);
            }
            return pollItemStatus;
        }

        /** @return whether the enclosing source has been asked to stop or the current thread is interrupted */
        public boolean isSourceStopping() {
            return PollingSourceWrapper.this.isRequestedToStop();
        }

        /** @return the watermark this context was created with, if any */
        public Optional<Serializable> getWatermark() {
            return Optional.ofNullable(this.currentWatermark);
        }

        /**
         * Sets the comparator used to order watermark values.
         *
         * @param comparator a non-null comparator
         * @throws IllegalArgumentException if {@code comparator} is {@code null}
         */
        @SuppressWarnings("unchecked")
        public void setWatermarkComparator(Comparator<? extends Serializable> comparator) {
            Preconditions.checkArgument(comparator != null, "Cannot set a null watermark comparator");
            // Unchecked: the comparator is only ever applied to the watermark values pushed through this context.
            this.watermarkComparator = (Comparator<Serializable>) comparator;
        }

        /** Forwards the connection failure to the underlying {@link SourceCallback}. */
        public void onConnectionException(ConnectionException connectionException) {
            this.sourceCallback.onConnectionException(connectionException);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** @return the watermark candidate gathered while polling, if any item produced one */
        public Optional<Serializable> getUpdatedWatermark() {
            return Optional.ofNullable(this.updatedWatermark);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** @return the comparator set by the delegate, or {@code null} if none was set */
        public Comparator<Serializable> getWatermarkComparator() {
            return this.watermarkComparator;
        }

        /**
         * Decides whether the item is accepted by the watermark and tracks the watermark candidate.
         * Items without a watermark always pass. Accepted items that carry an id are recorded in the
         * recently-processed store.
         *
         * @param defaultPollItem the item under evaluation
         * @return {@code true} if the item passes the watermark check
         * @throws MuleRuntimeException if the recently-processed store cannot be read or written
         */
        private boolean passesWatermark(PollingSourceWrapper<T, A>.DefaultPollItem defaultPollItem) {
            Serializable itemWatermark = defaultPollItem.getWatermark().orElse(null);
            if (itemWatermark == null) {
                return true;
            }
            String itemId = defaultPollItem.getItemId().orElse(null);
            boolean accepted = true;
            if (this.currentWatermark == null && this.updatedWatermark == null) {
                this.updatedWatermark = itemWatermark;
            } else {
                Serializable reference = this.currentWatermark != null ? this.currentWatermark : this.updatedWatermark;
                int comparison = PollingSourceWrapper.this.compareWatermarks(reference, itemWatermark, this.watermarkComparator);
                if (comparison < 0) {
                    raiseUpdatedWatermark(itemWatermark);
                } else if (comparison == 0 && itemId != null) {
                    // Same watermark as the reference: accept only ids that were not already processed.
                    try {
                        accepted = !PollingSourceWrapper.this.recentlyProcessedIds.contains(itemId);
                    } catch (ObjectStoreException e) {
                        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("An error occurred while checking the watermark status for Item with ID [%s]", new Object[]{itemId}), e);
                    }
                } else {
                    accepted = false;
                }
            }
            if (accepted) {
                if (itemId != null) {
                    try {
                        if (!PollingSourceWrapper.this.recentlyProcessedIds.contains(itemId)) {
                            PollingSourceWrapper.this.recentlyProcessedIds.store(itemId, itemId);
                        }
                    } catch (ObjectStoreException e2) {
                        throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage("An error occurred while updating the watermark for Item with ID [%s]", new Object[]{itemId}), e2);
                    }
                }
            } else if (PollingSourceWrapper.LOGGER.isDebugEnabled()) {
                // Guarded because building the fallback description calls toString() on the attributes.
                PollingSourceWrapper.LOGGER.debug("Source in flow '{}' is skipping item '{}' because it was rejected by the watermark", PollingSourceWrapper.this.flowName, defaultPollItem.getItemId().orElseGet(() -> defaultPollItem.getResult().getAttributes().map(Object::toString).orElse(WithAlias.EMPTY)));
            }
            return accepted;
        }

        /**
         * Replaces the watermark candidate only when {@code candidate} is greater than the one already
         * held, so that items arriving in non-ascending order cannot move the candidate backwards.
         */
        private void raiseUpdatedWatermark(Serializable candidate) {
            if (this.updatedWatermark == null
                    || PollingSourceWrapper.this.compareWatermarks(this.updatedWatermark, candidate, this.watermarkComparator) < 0) {
                this.updatedWatermark = candidate;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/module/extension/internal/runtime/source/poll/PollingSourceWrapper$DefaultPollItem.class */
    /**
     * Mutable {@link PollContext.PollItem} that collects the result, watermark and id configured by
     * the polling source for a single item.
     */
    public class DefaultPollItem implements PollContext.PollItem<T, A> {
        private final SourceCallbackContext sourceCallbackContext;
        private Result<T, A> result;
        private Serializable watermark;
        private String itemId;

        /**
         * @param sourceCallbackContext the callback context associated with this item
         */
        private DefaultPollItem(SourceCallbackContext sourceCallbackContext) {
            this.sourceCallbackContext = sourceCallbackContext;
        }

        /** @return the callback context this item was created with */
        public SourceCallbackContext getSourceCallbackContext() {
            return sourceCallbackContext;
        }

        /**
         * @param result the item's result; must not be {@code null}
         * @return this item
         * @throws IllegalArgumentException if {@code result} is {@code null}
         */
        public PollContext.PollItem<T, A> setResult(Result<T, A> result) {
            boolean hasResult = result != null;
            Preconditions.checkArgument(hasResult, "Cannot set a null Result");
            this.result = result;
            return this;
        }

        /**
         * @param serializable the item's watermark; must not be {@code null}
         * @return this item
         * @throws IllegalArgumentException if {@code serializable} is {@code null}
         */
        public PollContext.PollItem<T, A> setWatermark(Serializable serializable) {
            boolean hasWatermark = serializable != null;
            Preconditions.checkArgument(hasWatermark, "Cannot set a null watermark");
            watermark = serializable;
            return this;
        }

        /**
         * @param str the item's id; must not be {@code null}
         * @return this item
         * @throws IllegalArgumentException if {@code str} is {@code null}
         */
        public PollContext.PollItem<T, A> setId(String str) {
            boolean hasId = str != null;
            Preconditions.checkArgument(hasId, "Cannot set a null id");
            itemId = str;
            return this;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** @return the configured watermark, or empty if none was set */
        public Optional<Serializable> getWatermark() {
            return Optional.ofNullable(watermark);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** @return the configured id, or empty if none was set */
        public Optional<String> getItemId() {
            return Optional.ofNullable(itemId);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** @return the configured result, or {@code null} if none was set */
        public Result<T, A> getResult() {
            return result;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Verifies that a result was configured.
         *
         * @throws IllegalStateException if no result has been set
         */
        public void validate() {
            if (result != null) {
                return;
            }
            throw new IllegalStateException(String.format("Missing item Result. Source in flow '%s' pushed an item with ID '%s' without configuring its Result", PollingSourceWrapper.this.flowName, itemId));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/mule/runtime/module/extension/internal/runtime/source/poll/PollingSourceWrapper$ItemReleaser.class */
    public class ItemReleaser {
        private final String id;
        private final Lock lock;

        private ItemReleaser(String str, Lock lock) {
            this.id = str;
            this.lock = lock;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void release() {
            try {
                if (PollingSourceWrapper.this.inflightIdsObjectStore.contains(this.id)) {
                    PollingSourceWrapper.this.inflightIdsObjectStore.remove(this.id);
                }
            } catch (ObjectStoreException e) {
                PollingSourceWrapper.LOGGER.error(String.format("Could not untrack item '%s' in source at flow '%s'. %s", this.id, PollingSourceWrapper.this.flowName, e.getMessage()), e);
            } finally {
                this.lock.unlock();
            }
        }
    }

    /**
     * Creates a wrapper around the given polling source.
     *
     * @param pollingSource the source to delegate to; also passed to the superclass constructor
     * @param scheduler     the scheduling strategy used to trigger polls once the source is started
     */
    public PollingSourceWrapper(PollingSource<T, A> pollingSource, Scheduler scheduler) {
        super(pollingSource);
        this.delegate = pollingSource;
        this.scheduler = scheduler;
        // The flag starts cleared: no stop has been requested yet.
        this.stopRequested = new AtomicBoolean();
    }

    /**
     * Starts the delegate, resolves the flow name and key prefix, obtains the in-flight,
     * recently-processed and watermark object stores, creates the single-task executor and schedules
     * the polling task on it.
     *
     * @param sourceCallback callback handed to the delegate and used by every poll
     * @throws MuleException as declared; propagated from the calls made while starting
     */
    public void onStart(SourceCallback<T, A> sourceCallback) throws MuleException {
        delegate.onStart(sourceCallback);

        flowName = componentLocation.getRootContainerName();
        keyPrefix = "_pollingSource_" + flowName + "/";

        ObjectStoreSettings inflightSettings = ObjectStoreSettings.builder()
                .persistent(false)
                .maxEntries(1000)
                .entryTtl(60000L)
                .expirationInterval(20000L)
                .build();
        inflightIdsObjectStore = objectStoreManager.getOrCreateObjectStore(formatKey("inflight-ids"), inflightSettings);

        ObjectStoreSettings recentlyProcessedSettings = ObjectStoreSettings.builder()
                .persistent(true)
                .maxEntries(1000)
                .entryTtl(60000L)
                .expirationInterval(20000L)
                .build();
        recentlyProcessedIds = objectStoreManager.getOrCreateObjectStore(formatKey("recently-processed-ids"), recentlyProcessedSettings);

        watermarkObjectStore = objectStoreManager.getOrCreateObjectStore(formatKey(WATERMARK_OS_KEY), ObjectStoreSettings.unmanagedPersistent());

        SchedulerConfig executorConfig = SchedulerConfig.config()
                .withMaxConcurrentTasks(1)
                .withWaitAllowed(true)
                .withName(formatKey("executor"));
        executor = schedulerService.customScheduler(executorConfig);

        // Clear any stop request left over from a previous stop before polling begins.
        stopRequested.set(false);
        scheduler.schedule(executor, () -> poll(sourceCallback));
    }

    /**
     * Builds a source-specific name by prepending the key prefix computed in {@code onStart}.
     *
     * @param str the suffix to qualify
     * @return the key prefix followed by {@code str}
     */
    private String formatKey(String str) {
        final String qualified = keyPrefix + str;
        return qualified;
    }

    /**
     * Flags the source as stopping, stops the executor and then stops the delegate. Any error
     * thrown by the delegate is logged and not rethrown.
     */
    public void onStop() {
        stopRequested.set(true);
        shutdownScheduler();
        try {
            delegate.onStop();
        } catch (Throwable stopFailure) {
            final String message = String.format("Found error while stopping source at location '%s'. %s", flowName, stopFailure.getMessage());
            LOGGER.error(message, stopFailure);
        }
    }

    /**
     * Releases the item bound to the callback context.
     * The event and parameters arguments are not used.
     *
     * @return a publisher that performs the release when subscribed
     */
    @Override // org.mule.runtime.module.extension.internal.runtime.source.SourceWrapper
    public Publisher<Void> onTerminate(CoreEvent coreEvent, Map<String, Object> map, SourceCallbackContext sourceCallbackContext) {
        final Publisher<Void> releasePublisher = releaseOnCallback(sourceCallbackContext);
        return releasePublisher;
    }

    /**
     * Releases the item bound to the callback context.
     * The event and parameters arguments are not used.
     *
     * @return a publisher that performs the release when subscribed
     */
    @Override // org.mule.runtime.module.extension.internal.runtime.source.SourceWrapper
    public Publisher<Void> onBackPressure(CoreEvent coreEvent, Map<String, Object> map, SourceCallbackContext sourceCallbackContext) {
        final Publisher<Void> releasePublisher = releaseOnCallback(sourceCallbackContext);
        return releasePublisher;
    }

    /**
     * Wraps the release of the given context's item in a {@link Mono} built from a runnable.
     *
     * @param sourceCallbackContext context that may carry an item releaser
     * @return a publisher that runs the release
     */
    private Publisher<Void> releaseOnCallback(SourceCallbackContext sourceCallbackContext) {
        final Runnable releaseTask = () -> release(sourceCallbackContext);
        return Mono.fromRunnable(releaseTask);
    }

    /**
     * Runs one poll of the delegate while holding the watermark lock, unless a stop was requested.
     * After the delegate returns, any watermark candidate gathered by the poll context is passed to
     * {@code updateWatermark}. Errors raised by the delegate or by the watermark update are logged
     * and not rethrown.
     *
     * @param sourceCallback callback used by the poll context to dispatch items
     */
    private void poll(SourceCallback<T, A> sourceCallback) {
        if (!isRequestedToStop()) {
            withWatermarkLock(() -> {
                // The stored watermark is read under the lock and outside the try below.
                final Serializable storedWatermark = getCurrentWatermark();
                final DefaultPollContext pollContext = new DefaultPollContext(sourceCallback, storedWatermark);
                try {
                    delegate.poll(pollContext);
                    final Optional<Serializable> candidate = pollContext.getUpdatedWatermark();
                    if (candidate.isPresent()) {
                        updateWatermark(candidate.get(), pollContext.getWatermarkComparator());
                    }
                } catch (Throwable pollFailure) {
                    final String message = String.format("Found exception trying to process item on source at flow '%s'. %s", flowName, pollFailure.getMessage());
                    LOGGER.error(message, pollFailure);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Compares two watermark values.
     * <p>
     * When no comparator is supplied, both values must implement {@link Comparable} and are compared
     * by natural ordering. A {@code null} value is never {@code Comparable}, so it is rejected too.
     *
     * @param serializable  the left-hand watermark
     * @param serializable2 the right-hand watermark
     * @param comparator    comparator to use, or {@code null} to fall back to natural ordering
     * @return a negative, zero or positive value as defined by {@link Comparator#compare}
     * @throws IllegalStateException if {@code comparator} is {@code null} and either value is not {@code Comparable}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public int compareWatermarks(Serializable serializable, Serializable serializable2, Comparator comparator) throws IllegalArgumentException {
        if (comparator == null) {
            // Natural ordering needs Comparable values; fail with a descriptive error rather than a ClassCastException.
            if (!(serializable instanceof Comparable) || !(serializable2 instanceof Comparable)) {
                throw new IllegalStateException(String.format("Non comparable watermark values [%s, %s] were provided on source at flow '%s'. Use comparable values or set a custom comparator. Watermark not updated.", serializable, serializable2, this.flowName));
            }
            comparator = Comparator.naturalOrder();
        }
        return comparator.compare(serializable, serializable2);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Notifies the delegate that an item was rejected and then releases whatever the callback
     * context holds for it. The release runs even if the delegate's notification throws.
     *
     * @param result                the rejected item's result
     * @param sourceCallbackContext the rejected item's callback context
     */
    public void release(Result<T, A> result, SourceCallbackContext sourceCallbackContext) {
        try {
            delegate.onRejectedItem(result, sourceCallbackContext);
        } finally {
            // Overload taking only the context: runs the item releaser if one was registered.
            release(sourceCallbackContext);
        }
    }

    /**
     * Runs the {@code ItemReleaser} stored in the context under {@code ITEM_RELEASER_CTX_VAR}, if
     * there is one; otherwise does nothing.
     *
     * @param sourceCallbackContext context that may carry an item releaser
     */
    private void release(SourceCallbackContext sourceCallbackContext) {
        final Optional<Object> storedReleaser = sourceCallbackContext.getVariable(ITEM_RELEASER_CTX_VAR);
        if (storedReleaser.isPresent()) {
            ((ItemReleaser) storedReleaser.get()).release();
        }
    }

    /**
     * Runs the given task while holding the watermark lock, unlocking afterwards whether or not the
     * task completes normally.
     *
     * @param checkedRunnable the task to run under the lock
     */
    private void withWatermarkLock(CheckedRunnable checkedRunnable) {
        final Lock guard = getWatermarkLock();
        guard.lock();
        try {
            checkedRunnable.run();
        } finally {
            guard.unlock();
        }
    }

    /**
     * @return the lock obtained from the lock factory for this source's watermark key
     */
    private Lock getWatermarkLock() {
        final String lockName = formatKey(WATERMARK_OS_KEY);
        return lockFactory.createLock(lockName);
    }

    /**
     * Stores the given watermark. If a watermark is already stored, it is replaced only when the new
     * value compares greater; in that case the recently-processed ids are cleared as well.
     *
     * @param serializable the candidate watermark
     * @param comparator   comparator passed to {@code compareWatermarks}; may be {@code null}
     * @throws MuleRuntimeException if an object store operation fails
     */
    private void updateWatermark(Serializable serializable, Comparator comparator) {
        try {
            final boolean hasStoredWatermark = watermarkObjectStore.contains(WATERMARK_OS_KEY);
            if (hasStoredWatermark) {
                final Serializable stored = watermarkObjectStore.retrieve(WATERMARK_OS_KEY);
                final boolean advances = compareWatermarks(stored, serializable, comparator) < 0;
                if (!advances) {
                    return;
                }
                watermarkObjectStore.remove(WATERMARK_OS_KEY);
                recentlyProcessedIds.clear();
            }
            watermarkObjectStore.store(WATERMARK_OS_KEY, serializable);
        } catch (ObjectStoreException storeFailure) {
            final String message = String.format("Failed to update watermark value for message source at location '%s'. %s", flowName, storeFailure.getMessage());
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(message), storeFailure);
        }
    }

    /**
     * Reads the stored watermark.
     *
     * @return the stored watermark, or {@code null} when the store has no watermark entry
     * @throws MuleRuntimeException if the object store cannot be read
     */
    private Serializable getCurrentWatermark() {
        try {
            return watermarkObjectStore.contains(WATERMARK_OS_KEY)
                    ? watermarkObjectStore.retrieve(WATERMARK_OS_KEY)
                    : null;
        } catch (ObjectStoreException storeFailure) {
            final String message = String.format("Failed to fetch watermark for Message source at location '%s'. %s", flowName, storeFailure.getMessage());
            throw new MuleRuntimeException(I18nMessageFactory.createStaticMessage(message), storeFailure);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tries to claim an item for processing. Items without an id are always claimable. Otherwise a
     * per-item lock is tried and the id is recorded in the in-flight store; on success an
     * {@code ItemReleaser} holding the lock is stored in the callback context. On every failure path
     * after the lock was taken, the lock is unlocked before returning {@code false}.
     *
     * @param defaultPollItem       the item to claim
     * @param sourceCallbackContext the context that receives the item releaser on success
     * @return {@code true} if the item may be processed, {@code false} otherwise
     */
    public boolean acquireItem(PollingSourceWrapper<T, A>.DefaultPollItem defaultPollItem, SourceCallbackContext sourceCallbackContext) {
        final Optional<String> optionalId = defaultPollItem.getItemId();
        if (!optionalId.isPresent()) {
            return true;
        }
        final String itemId = optionalId.get();
        final Lock itemLock = lockFactory.createLock(flowName + "/" + itemId);
        if (!itemLock.tryLock()) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Source at flow '{}' is skipping processing of item '{}' because another thread or node already has a mule lock on it", flowName, itemId);
            }
            return false;
        }
        try {
            if (inflightIdsObjectStore.contains(itemId)) {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Source at flow '{}' polled item '{}', but skipping it since it is already being processed in another thread or node", flowName, itemId);
                }
                itemLock.unlock();
                return false;
            }
            try {
                inflightIdsObjectStore.store(itemId, itemId);
                // The lock stays held; the releaser registered here unlocks it later.
                sourceCallbackContext.addVariable(ITEM_RELEASER_CTX_VAR, new ItemReleaser(itemId, itemLock));
                return true;
            } catch (ObjectStoreException trackingFailure) {
                itemLock.unlock();
                LOGGER.error(String.format("Flow at source '%s' could not track item '%s' as being processed. %s", flowName, itemId, trackingFailure.getMessage()), trackingFailure);
                return false;
            }
        } catch (Exception unexpected) {
            itemLock.unlock();
            LOGGER.error(String.format("Could not guarantee idempotency for item '%s' for source at flow '%s'. '%s", itemId, flowName, unexpected.getMessage()), unexpected);
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @return {@code true} if a stop was requested or the current thread's interrupt flag is set
     */
    public boolean isRequestedToStop() {
        if (stopRequested.get()) {
            return true;
        }
        return Thread.currentThread().isInterrupted();
    }

    /**
     * Stops the executor if one has been created; does nothing otherwise.
     */
    private void shutdownScheduler() {
        if (executor == null) {
            return;
        }
        executor.stop();
    }
}
