package com.sap.cds.services.impl.outbox.persistence;

import com.sap.cds.impl.parser.token.Jsonizer;
import com.sap.cds.ql.Insert;
import com.sap.cds.services.changeset.ChangeSetContext;
import com.sap.cds.services.changeset.ChangeSetListener;
import com.sap.cds.services.environment.CdsProperties;
import com.sap.cds.services.impl.outbox.AbstractOutboxService;
import com.sap.cds.services.impl.outbox.Messages;
import com.sap.cds.services.impl.outbox.Messages_;
import com.sap.cds.services.impl.outbox.persistence.collectors.PartitionCollector;
import com.sap.cds.services.impl.utils.CdsServiceUtils;
import com.sap.cds.services.outbox.OutboxMessageEventContext;
import com.sap.cds.services.runtime.CdsRuntime;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/sap/cds/services/impl/outbox/persistence/PersistentOutbox.class */
public class PersistentOutbox extends AbstractOutboxService {
    private static final Logger LOG = LoggerFactory.getLogger(PersistentOutbox.class);
    public static final String ATTR_EVENT = "event";
    public static final String ATTR_MESSAGE = "message";
    private final TelemetryData telemetryData;
    private final PartitionCollector collector;
    private final CdsProperties.Outbox.OutboxServiceConfig config;
    private Map<ChangeSetContext, Long> changeSetContextCache;

    public PersistentOutbox(String str, CdsProperties.Outbox.OutboxServiceConfig outboxServiceConfig, CdsRuntime cdsRuntime, Supplier<List<String>> supplier) {
        super(str, cdsRuntime);
        this.changeSetContextCache = new ConcurrentHashMap();
        this.config = outboxServiceConfig;
        this.telemetryData = outboxServiceConfig.isObservable() ? new TelemetryDataImpl(str, outboxServiceConfig.getMaxAttempts()) : TelemetryData.NOOP;
        this.collector = new PartitionCollector(cdsRuntime, this, outboxServiceConfig, supplier, this.telemetryData);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<OutboxStatistics> getStatistics() {
        return this.telemetryData.getStatistics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void init() {
        if (this.config.isStartCollector()) {
            start();
        }
    }

    public void start() {
        this.collector.start();
    }

    public void stop() {
        try {
            stop(0L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public void stop(long j) throws InterruptedException {
        this.collector.stop(j);
    }

    public boolean isCollectorRunning() {
        return this.collector.isRunning();
    }

    @Override // com.sap.cds.services.impl.outbox.AbstractOutboxService
    protected void submit(OutboxMessageEventContext outboxMessageEventContext) {
        LOG.debug("Submitting outbox message for target '{}' with event '{}'.", getName(), outboxMessageEventContext.getEvent());
        persist(outboxMessageEventContext);
        LOG.debug("Stored outbox message for target '{}' with event '{}'.", getName(), outboxMessageEventContext.getEvent());
        final ChangeSetContext changeSetContext = outboxMessageEventContext.getChangeSetContext();
        this.changeSetContextCache.computeIfPresent(changeSetContext, (changeSetContext2, l) -> {
            return Long.valueOf(l.longValue() + 1);
        });
        if (this.changeSetContextCache.containsKey(changeSetContext)) {
            return;
        }
        this.changeSetContextCache.put(changeSetContext, 1L);
        final String tenant = outboxMessageEventContext.getUserInfo().getTenant();
        changeSetContext.register(new ChangeSetListener() { // from class: com.sap.cds.services.impl.outbox.persistence.PersistentOutbox.1
            public void afterClose(boolean z) {
                long longValue = PersistentOutbox.this.changeSetContextCache.remove(changeSetContext).longValue();
                if (z) {
                    PersistentOutbox.this.telemetryData.recordIncomingMessages(tenant, longValue);
                    if (PersistentOutbox.this.config.getTriggerSchedule().isEnabled().booleanValue()) {
                        PersistentOutbox.this.collector.unpause();
                    }
                }
            }
        });
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [com.sap.cds.services.impl.outbox.Messages, java.util.Map] */
    private void persist(OutboxMessageEventContext outboxMessageEventContext) {
        ?? create = Messages.create();
        HashMap hashMap = new HashMap();
        hashMap.put("message", outboxMessageEventContext.getMessage());
        hashMap.put("event", outboxMessageEventContext.getEvent());
        create.setMsg(Jsonizer.json(hashMap));
        create.setTarget(getName());
        create.setTimestamp(outboxMessageEventContext.getTimestamp());
        CdsServiceUtils.getDefaultPersistenceService(outboxMessageEventContext).run(Insert.into(Messages_.class).entry((Map) create));
    }
}
