package com.sap.cds.services.impl.outbox.persistence.collectors;

import com.sap.cds.CdsLockTimeoutException;
import com.sap.cds.Result;
import com.sap.cds.ql.Delete;
import com.sap.cds.ql.Select;
import com.sap.cds.ql.Update;
import com.sap.cds.ql.cqn.CqnPredicate;
import com.sap.cds.reflect.CdsModel;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.impl.outbox.Messages;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.PersistentOutbox;
import com.sap.cds.services.mt.TenantInfo;
import com.sap.cds.services.outbox.OutboxMessageEventContext;
import com.sap.cds.services.outbox.OutboxService;
import com.sap.cds.services.persistence.PersistenceService;
import com.sap.cds.services.runtime.CdsRuntime;
import com.sap.cds.services.utils.outbox.OutboxUtils;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.time.Duration;
import java.time.Instant;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/services/impl/outbox/persistence/collectors/PartitionCollector.class */
/**
 * Runnable that repeatedly drains one partition of the persistent outbox.
 *
 * <p>On every cycle the collector iterates over the tenants returned by the tenant supplier, reads
 * a chunk of outbox messages of its partition (oldest first, locked), emits each message through
 * the {@link OutboxService} and deletes the messages that were emitted successfully. When a cycle
 * found no full chunk, or a lock timeout occurred, the collector pauses with an exponentially
 * growing, capped delay; {@link #unpause()} wakes it up early.
 *
 * <p>The loop ends when the executing thread is interrupted.
 */
public class PartitionCollector implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionCollector.class);

    /** Back-off exponent a freshly created collector starts with. */
    private static final int INITIAL_PAUSE_COUNT = 5;
    /** The back-off exponent is no longer increased once it reached this value. */
    private static final int MAX_PAUSE_COUNT = 20;
    /** Amount added to the back-off exponent after every pause. */
    private static final int PAUSE_COUNT_STEP = 2;

    private final CdsRuntime runtime;
    private final PersistenceService db;
    private final OutboxService outboxService;
    private final int chunkSize;
    private final int partition;
    /** Monitor used to park the collector thread in {@link #pause()} and to wake it in {@link #unpause()}. */
    private final Object pauseMonitor = new Object();
    /** Exponent fed into {@link #getPauseMillis(int, long)} to compute the next pause. */
    private final AtomicInteger pauseCount = new AtomicInteger(INITIAL_PAUSE_COUNT);
    /** {@code true} while the collector thread is parked inside {@link #pause()}. */
    private volatile boolean pause = false;
    private final long maxPauseMillis;
    private final long emitTimeoutSeconds;
    private final int maxPublishAttempts;
    private final boolean storeLastError;
    private final Supplier<List<TenantInfo>> tenantSupplier;

    /**
     * Creates a collector for a single outbox partition.
     *
     * @param cdsRuntime runtime providing the default persistence service and the persistent outbox
     *     configuration (chunk size, max pause, emit timeout, max attempts, store-last-error flag)
     * @param persistentOutbox outbox service the stored messages are emitted on
     * @param supplier provides the tenants to process in each cycle
     * @param i number of the partition handled by this collector
     */
    public PartitionCollector(CdsRuntime cdsRuntime, PersistentOutbox persistentOutbox, Supplier<List<TenantInfo>> supplier, int i) {
        this.runtime = cdsRuntime;
        this.db = cdsRuntime.getServiceCatalog().getService(PersistenceService.class, "PersistenceService$Default");
        this.outboxService = persistentOutbox;
        this.partition = i;
        this.tenantSupplier = supplier;

        CdsProperties.Outbox.Persistent config = cdsRuntime.getEnvironment().getCdsProperties().getOutbox().getPersistent();
        this.chunkSize = config.getChunkSize();
        this.maxPauseMillis = config.getMaxPause().getSeconds() * 1000;
        this.emitTimeoutSeconds = config.getEmitTimeout().getSeconds();
        this.maxPublishAttempts = config.getMaxAttempts();
        this.storeLastError = config.getStoreLastError().isEnabled();

        if (!this.storeLastError) {
            LOG.debug("Storing errors to the outbox is disabled.");
        }
    }

    /** Runs the collector loop until the executing thread is interrupted. */
    @Override // java.lang.Runnable
    public void run() {
        processPartition();
    }

    /**
     * Parks the calling thread for the current back-off time or until {@link #unpause()} notifies
     * the monitor. An interrupt ends the pause and is re-asserted on the thread.
     */
    private void pause() {
        synchronized (this.pauseMonitor) {
            this.pause = true;
            try {
                // Object.wait(0) would wait without any timeout and a negative value is rejected,
                // so never wait for less than one millisecond, whatever maximum is configured.
                long pauseMillis = Math.max(1L, getPauseMillis(this.pauseCount.get(), this.maxPauseMillis));
                LOG.debug("Pausing partition collector {} for {} ms", this.partition, pauseMillis);
                this.pauseMonitor.wait(pauseMillis);
            } catch (InterruptedException e) {
                LOG.debug("Partition collector thread '{}' interrupted", Thread.currentThread().getName());
                Thread.currentThread().interrupt();
            } finally {
                // reset the flag on every exit path so that unpause() never sees a stale 'true'
                this.pause = false;
            }
        }
    }

    /**
     * Resets the back-off and wakes the collector up if it is currently paused.
     *
     * <p>The volatile flag is checked before and after entering the monitor, so the monitor is only
     * taken when the collector actually appears to be paused.
     */
    public void unpause() {
        this.pauseCount.set(0);
        if (!this.pause) {
            return;
        }
        synchronized (this.pauseMonitor) {
            if (this.pause) {
                this.pause = false;
                this.pauseMonitor.notifyAll();
                LOG.debug("Notified paused partition collector {}", this.partition);
            }
        }
    }

    /**
     * Main loop: processes all tenants, then either pauses (nothing more to do, or lock timeout) or
     * immediately starts the next cycle (at least one tenant returned a full chunk).
     */
    private void processPartition() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                LOG.debug("Executing partition collector {}", this.partition);
                // stays 'true' unless some tenant returned a full chunk, i.e. more work is likely pending
                AtomicBoolean pauseAfterCycle = new AtomicBoolean(true);
                for (TenantInfo tenantInfo : this.tenantSupplier.get()) {
                    if (Thread.currentThread().isInterrupted()) {
                        // shutdown requested: do not start work for further tenants
                        break;
                    }
                    String tenant = tenantInfo.getTenant();
                    try {
                        processTenant(tenant, pauseAfterCycle);
                    } catch (Exception e) {
                        if (isLockTimeoutException(e)) {
                            LOG.debug("Partition collector {} timed out waiting for table lock for tenant '{}'", this.partition, tenant);
                            // the messages are locked elsewhere: skip the remaining tenants and back off
                            pauseAfterCycle.set(true);
                            break;
                        }
                        LOG.warn("Exception occurred for tenant '{}' in partition collector {}", tenant, this.partition, e);
                    }
                }

                if (pauseAfterCycle.get()) {
                    pause();
                    if (this.pauseCount.get() < MAX_PAUSE_COUNT) {
                        this.pauseCount.addAndGet(PAUSE_COUNT_STEP);
                    }
                } else {
                    this.pauseCount.set(0);
                }
            } catch (Throwable th) {
                // keep the collector alive, whatever went wrong in this cycle
                LOG.warn("Unexpected exception occured in partition collector {}", this.partition, th);
            }
        }
    }

    /**
     * Processes one chunk of messages for the given tenant inside a request context of that tenant
     * and a dedicated change set context. Tenants without the outbox model are skipped.
     *
     * @param tenant the tenant to process
     * @param pauseAfterCycle set to {@code false} when a full chunk was read
     */
    private void processTenant(String tenant, AtomicBoolean pauseAfterCycle) {
        LOG.debug("Processing tenant '{}' in partition collector {}", tenant, this.partition);
        this.runtime.requestContext().clearUser().modifyUser(user -> {
            user.setTenant(tenant);
        }).run(requestContext -> {
            if (OutboxUtils.hasOutboxModel(requestContext.getModel())) {
                this.runtime.changeSetContext().run(changeSetContext -> {
                    processChunk(pauseAfterCycle);
                });
            } else {
                LOG.debug("The outbox model is not available for the tenant '{}'", tenant);
            }
        });
    }

    /**
     * Reads up to {@code chunkSize} messages of this partition that have not yet reached the
     * maximum number of attempts (oldest first, selected with {@code lock(0)}), publishes them and
     * deletes those published successfully. Processing of the chunk stops early when the emit
     * timeout is exceeded or the thread is interrupted.
     *
     * @param pauseAfterCycle set to {@code false} when a full chunk was read
     */
    private void processChunk(AtomicBoolean pauseAfterCycle) {
        Select<?> select = Select.from(Messages_.class)
                .where(m -> m.partition().eq(this.partition).and(m.attempts().lt(this.maxPublishAttempts)))
                .orderBy(m -> m.timestamp().asc())
                .limit(this.chunkSize)
                .lock(0);
        Result result = this.db.run(select);

        if (result.rowCount() >= select.top()) {
            // a full chunk was returned, so do not pause before the next cycle
            pauseAfterCycle.set(false);
        }
        if (result.rowCount() <= 0) {
            return;
        }

        Instant start = Instant.now();
        for (Messages message : result.listOf(Messages.class)) {
            if (publish(message)) {
                this.db.run(Delete.from(Messages_.class).where(m -> m.ID().eq(message.getId())));
            }
            boolean emitTimeoutExceeded = Duration.between(start, Instant.now()).getSeconds() > this.emitTimeoutSeconds;
            if (emitTimeoutExceeded || Thread.currentThread().isInterrupted()) {
                return;
            }
        }
    }

    /**
     * Emits the given outbox message, retrying with an exponential back-off until it succeeds, the
     * maximum number of attempts is reached, or the thread is interrupted while backing off.
     *
     * <p>Each failed attempt optionally stores the error (see
     * {@link #storeLastError(Messages, CdsModel, Throwable)}) and persists the incremented attempt
     * counter.
     *
     * @param messages the outbox message to emit
     * @return {@code true} if the message was emitted; {@code false} if it has to stay in the outbox
     */
    private boolean publish(Messages messages) {
        LOG.debug("Publishing outbox message with target event '{}'", messages.getTarget());
        OutboxMessageEventContext context = OutboxMessageEventContext.create(messages.getTarget());
        context.setIsInbound(true);
        context.setTimestamp(messages.getTimestamp());
        context.setMessage(messages.getMsg());

        while (true) {
            try {
                this.outboxService.emit(context);
                return true;
            } catch (Throwable th) {
                // deliberately broad: any failure of a handler must result in a retry, not in a dead collector
                LOG.warn("Failed to emit Outbox message with id '{}' and target '{}'", messages.getId(), messages.getTarget(), th);
                storeLastError(messages, context.getModel(), th);

                int attempts = messages.getAttempts();
                if (attempts >= this.maxPublishAttempts) {
                    LOG.warn("Reached maximum number of attempts to emit Outbox message with id '{}' and target '{}'", messages.getId(), messages.getTarget());
                    return false;
                }

                int newAttempts = attempts + 1;
                messages.setAttempts(newAttempts);
                this.db.run(Update.entity(Messages_.class)
                        .data(Messages.ATTEMPTS, newAttempts)
                        .where(m -> m.ID().eq(messages.getId())));

                try {
                    TimeUnit.MILLISECONDS.sleep(getPauseMillis(newAttempts, Integer.MAX_VALUE));
                } catch (InterruptedException e) {
                    // Shutdown requested. Continuing the loop would retry without any back-off
                    // (every further sleep fails immediately while the flag is set) and burn all
                    // remaining attempts of the message, so give up and keep it in the outbox.
                    Thread.currentThread().interrupt();
                    return false;
                }
            }
        }
    }

    /**
     * Writes the stack trace of the given error to the message's last-error element, provided
     * storing errors is enabled and the outbox model of the given CDS model has that element.
     *
     * @param messages the message whose emit failed
     * @param cdsModel model used to check whether the last-error element exists
     * @param th the error to store
     */
    private void storeLastError(Messages messages, CdsModel cdsModel, Throwable th) {
        if (!this.storeLastError) {
            return;
        }
        boolean hasLastErrorElement = cdsModel.getEntity(Messages_.CDS_NAME).findElement(Messages.LAST_ERROR).isPresent();
        if (!hasLastErrorElement) {
            LOG.warn("The error can't be stored to outbox. Please migrate the outbox model to the most recent version.");
            return;
        }

        // render the stack trace into a string; nothing is printed to the console here
        StringWriter stackTrace = new StringWriter();
        th.printStackTrace(new PrintWriter(stackTrace));
        this.db.run(Update.entity(Messages_.class)
                .data(Messages.LAST_ERROR, stackTrace.toString())
                .where(m -> m.ID().eq(messages.getId())));
    }

    /**
     * Computes an exponential back-off: {@code 2^i} seconds plus up to one second of random
     * jitter, capped at {@code j} milliseconds.
     *
     * @param i the back-off exponent
     * @param j the upper bound of the result in milliseconds
     * @return the pause time in milliseconds
     */
    private static long getPauseMillis(int i, long j) {
        long jitterMillis = RandomUtils.nextLong(0L, 1001L);
        long backOffMillis = Math.round((Math.pow(2.0d, i) * 1000.0d) + jitterMillis);
        return Math.min(backOffMillis, j);
    }

    /**
     * Checks whether the given throwable or any throwable in its cause chain is a
     * {@link CdsLockTimeoutException}.
     *
     * @param th the throwable to inspect, may be {@code null}
     * @return {@code true} if a lock timeout is part of the cause chain
     */
    private static boolean isLockTimeoutException(Throwable th) {
        for (Throwable current = th; current != null; current = current.getCause()) {
            if (current instanceof CdsLockTimeoutException) {
                return true;
            }
        }
        return false;
    }
}
