package io.quarkus.smallrye.reactivemessaging.kafka;

import io.quarkus.vertx.core.runtime.context.VertxContextSafetyToggle;
import io.smallrye.common.annotation.Identifier;
import io.smallrye.common.vertx.VertxContext;
import io.smallrye.mutiny.Uni;
import io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration;
import io.smallrye.reactive.messaging.kafka.KafkaConsumer;
import io.smallrye.reactive.messaging.kafka.commit.CheckpointStateStore;
import io.smallrye.reactive.messaging.kafka.commit.ProcessingState;
import io.vertx.core.Context;
import io.vertx.mutiny.core.Vertx;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.hibernate.reactive.mutiny.Mutiny;

/* loaded from: input_file:io/quarkus/smallrye/reactivemessaging/kafka/HibernateReactiveStateStore.class */
public class HibernateReactiveStateStore implements CheckpointStateStore {
    public static final String HIBERNATE_REACTIVE_STATE_STORE = "quarkus-hibernate-reactive";
    private final String consumerGroupId;
    private final Mutiny.SessionFactory sf;
    private final Class<? extends CheckpointEntity> stateType;

    @ApplicationScoped
    @Identifier(HibernateReactiveStateStore.HIBERNATE_REACTIVE_STATE_STORE)
    /* loaded from: input_file:io/quarkus/smallrye/reactivemessaging/kafka/HibernateReactiveStateStore$Factory.class */
    public static class Factory implements CheckpointStateStore.Factory {

        @Inject
        Mutiny.SessionFactory sf;

        public CheckpointStateStore create(KafkaConnectorIncomingConfiguration kafkaConnectorIncomingConfiguration, Vertx vertx, KafkaConsumer<?, ?> kafkaConsumer, Class<?> cls) {
            String str = (String) kafkaConsumer.configuration().get("group.id");
            if (CheckpointEntity.class.isAssignableFrom(cls)) {
                return new HibernateReactiveStateStore(str, this.sf, cls);
            }
            throw new IllegalArgumentException("State type needs to extend `CheckpointEntity`");
        }
    }

    public HibernateReactiveStateStore(String str, Mutiny.SessionFactory sessionFactory, Class<? extends CheckpointEntity> cls) {
        this.consumerGroupId = str;
        this.sf = sessionFactory;
        this.stateType = cls;
    }

    public Uni<Map<TopicPartition, ProcessingState<?>>> fetchProcessingState(Collection<TopicPartition> collection) {
        return Uni.createFrom().deferred(() -> {
            Object[] array = collection.stream().map(topicPartition -> {
                return new CheckpointEntityId(this.consumerGroupId, topicPartition);
            }).toArray(i -> {
                return new Object[i];
            });
            return this.sf.withTransaction(session -> {
                return session.find(this.stateType, array);
            }).map(list -> {
                return list == null ? Collections.emptyMap() : (Map) list.stream().filter(checkpointEntity -> {
                    return (checkpointEntity == null || CheckpointEntity.topicPartition(checkpointEntity) == null) ? false : true;
                }).collect(Collectors.toMap(CheckpointEntity::topicPartition, checkpointEntity2 -> {
                    return new ProcessingState(checkpointEntity2, checkpointEntity2.offset.longValue());
                }));
            });
        }).runSubscriptionOn(HibernateReactiveStateStore::runOnSafeContext);
    }

    public Uni<Void> persistProcessingState(Map<TopicPartition, ProcessingState<?>> map) {
        return Uni.createFrom().deferred(() -> {
            Object[] array = map.entrySet().stream().filter(entry -> {
                return !ProcessingState.isEmptyOrNull((ProcessingState) entry.getValue());
            }).map(entry2 -> {
                return CheckpointEntity.from((ProcessingState) entry2.getValue(), new CheckpointEntityId(this.consumerGroupId, (TopicPartition) entry2.getKey()));
            }).toArray();
            return this.sf.withTransaction(session -> {
                return session.mergeAll(array);
            });
        }).runSubscriptionOn(HibernateReactiveStateStore::runOnSafeContext);
    }

    private static void runOnSafeContext(Runnable runnable) {
        if (VertxContext.isOnDuplicatedContext()) {
            VertxContextSafetyToggle.setCurrentContextSafe(true);
            runnable.run();
        } else {
            Context createNewDuplicatedContext = VertxContext.createNewDuplicatedContext();
            VertxContextSafetyToggle.setContextSafe(createNewDuplicatedContext, true);
            createNewDuplicatedContext.runOnContext(r3 -> {
                runnable.run();
            });
        }
    }
}
