/*
 * Decompiled with CFR 0.152.
 */
package org.hibernate.search.mapper.orm.coordination.databasepolling.impl;

import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.hibernate.search.engine.cfg.ConfigurationPropertySource;
import org.hibernate.search.engine.cfg.spi.ConfigurationProperty;
import org.hibernate.search.engine.cfg.spi.OptionalConfigurationProperty;
import org.hibernate.search.engine.environment.bean.BeanHolder;
import org.hibernate.search.engine.environment.bean.BeanReference;
import org.hibernate.search.engine.environment.bean.BeanResolver;
import org.hibernate.search.mapper.orm.automaticindexing.spi.AutomaticIndexingConfigurationContext;
import org.hibernate.search.mapper.orm.coordination.common.spi.CooordinationStrategy;
import org.hibernate.search.mapper.orm.coordination.common.spi.CoordinationStrategyPreStopContext;
import org.hibernate.search.mapper.orm.coordination.common.spi.CoordinationStrategyStartContext;
import org.hibernate.search.mapper.orm.coordination.databasepolling.impl.DefaultOutboxEventFinder;
import org.hibernate.search.mapper.orm.coordination.databasepolling.impl.EntityIdHashRangeOutboxEventPredicate;
import org.hibernate.search.mapper.orm.coordination.databasepolling.impl.OutboxEventBackgroundProcessor;
import org.hibernate.search.mapper.orm.coordination.databasepolling.impl.OutboxEventFinder;
import org.hibernate.search.mapper.orm.coordination.databasepolling.impl.OutboxEventFinderProvider;
import org.hibernate.search.mapper.orm.coordination.databasepolling.impl.OutboxEventPredicate;
import org.hibernate.search.mapper.orm.coordination.databasepolling.impl.OutboxEventSendingPlan;
import org.hibernate.search.mapper.orm.logging.impl.Log;
import org.hibernate.search.util.common.data.Range;
import org.hibernate.search.util.common.data.impl.RangeCompatibleHashFunction;
import org.hibernate.search.util.common.data.impl.RangeHashTable;
import org.hibernate.search.util.common.impl.Closer;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;

public class DatabasePollingCooordinationStrategy
implements CooordinationStrategy {
    private static final Log log = (Log)LoggerFactory.make(Log.class, (MethodHandles.Lookup)MethodHandles.lookup());
    private static final ConfigurationProperty<Boolean> SHARDS_STATIC = ConfigurationProperty.forKey((String)"shards.static").asBoolean().withDefault((Object)false).build();
    private static final OptionalConfigurationProperty<Integer> SHARDS_TOTAL_COUNT = ConfigurationProperty.forKey((String)"shards.total_count").asInteger().build();
    private static final OptionalConfigurationProperty<List<Integer>> SHARDS_ASSIGNED = ConfigurationProperty.forKey((String)"shards.assigned").asInteger().multivalued().build();
    private static final ConfigurationProperty<Boolean> PROCESSORS_INDEXING_ENABLED = ConfigurationProperty.forKey((String)"processors.indexing.enabled").asBoolean().withDefault((Object)true).build();
    private static final ConfigurationProperty<Integer> PROCESSORS_INDEXING_POLLING_INTERVAL = ConfigurationProperty.forKey((String)"processors.indexing.polling_interval").asInteger().withDefault((Object)100).build();
    private static final ConfigurationProperty<Integer> PROCESSORS_INDEXING_BATCH_SIZE = ConfigurationProperty.forKey((String)"processors.indexing.batch_size").asInteger().withDefault((Object)50).build();
    private static final OptionalConfigurationProperty<BeanReference<? extends OutboxEventFinderProvider>> PROCESSORS_INDEXING_OUTBOX_EVENT_FINDER_PROVIDER = ConfigurationProperty.forKey((String)"processors.indexing.outbox_event_finder.provider").asBeanReference(OutboxEventFinderProvider.class).build();
    public static final String PROCESSOR_NAME_PREFIX = "Outbox event processor";
    private BeanHolder<? extends OutboxEventFinderProvider> finderProviderHolder;
    private ScheduledExecutorService scheduledExecutor;
    private List<Integer> assignedShardIndices;
    private RangeHashTable<OutboxEventBackgroundProcessor> indexingProcessors;

    @Override
    public void configureAutomaticIndexing(AutomaticIndexingConfigurationContext context) {
        context.sendIndexingEventsTo(ctx -> new OutboxEventSendingPlan(ctx.session()), true);
    }

    @Override
    public CompletableFuture<?> start(CoordinationStrategyStartContext context) {
        Optional finderProviderHolderOptional = PROCESSORS_INDEXING_OUTBOX_EVENT_FINDER_PROVIDER.getAndMap(context.configurationPropertySource(), arg_0 -> ((BeanResolver)context.beanResolver()).resolve(arg_0));
        if (finderProviderHolderOptional.isPresent()) {
            this.finderProviderHolder = (BeanHolder)finderProviderHolderOptional.get();
            log.debugf("Outbox processing will use custom outbox event finder provider '%s'.", this.finderProviderHolder.get());
        } else {
            this.finderProviderHolder = BeanHolder.of((Object)new DefaultOutboxEventFinder.Provider());
        }
        if (((Boolean)PROCESSORS_INDEXING_ENABLED.get(context.configurationPropertySource())).booleanValue()) {
            this.initializeProcessors(context);
        } else {
            log.indexingProcessorDisabled();
        }
        return CompletableFuture.completedFuture(null);
    }

    private void initializeProcessors(CoordinationStrategyStartContext context) {
        int totalShardCount;
        ConfigurationPropertySource configurationSource = context.configurationPropertySource();
        boolean shardsStatic = (Boolean)SHARDS_STATIC.get(configurationSource);
        if (shardsStatic) {
            totalShardCount = (Integer)SHARDS_TOTAL_COUNT.getAndMapOrThrow(configurationSource, this::checkTotalShardCount, log::missingPropertyForStaticSharding);
            this.assignedShardIndices = (List)SHARDS_ASSIGNED.getAndMapOrThrow(configurationSource, shardIndices -> this.checkAssignedShardIndices(configurationSource, totalShardCount, (List<Integer>)shardIndices), log::missingPropertyForStaticSharding);
        } else {
            log.warnf("Dynamic sharding is not implemented yet; defaulting to static sharding assuming a single node", new Object[0]);
            totalShardCount = 1;
            this.assignedShardIndices = Collections.singletonList(0);
        }
        int pollingInterval = (Integer)PROCESSORS_INDEXING_POLLING_INTERVAL.get(configurationSource);
        int batchSize = (Integer)PROCESSORS_INDEXING_BATCH_SIZE.get(configurationSource);
        this.scheduledExecutor = context.threadPoolProvider().newScheduledExecutor(this.assignedShardIndices.size(), PROCESSOR_NAME_PREFIX);
        RangeCompatibleHashFunction hashFunction = OutboxEventSendingPlan.HASH_FUNCTION;
        this.indexingProcessors = new RangeHashTable(hashFunction, totalShardCount);
        for (int shardIndex : this.assignedShardIndices) {
            Optional<OutboxEventPredicate> predicate = totalShardCount == 1 ? Optional.empty() : Optional.of(new EntityIdHashRangeOutboxEventPredicate((Range<Integer>)this.indexingProcessors.rangeForBucket(shardIndex)));
            OutboxEventFinder finder = ((OutboxEventFinderProvider)this.finderProviderHolder.get()).create(predicate);
            OutboxEventBackgroundProcessor processor = new OutboxEventBackgroundProcessor("Outbox event processor - " + shardIndex, context.mapping(), this.scheduledExecutor, finder, pollingInterval, batchSize);
            this.indexingProcessors.set(shardIndex, (Object)processor);
        }
        for (int processedShardIndex : this.assignedShardIndices) {
            ((OutboxEventBackgroundProcessor)this.indexingProcessors.get(processedShardIndex)).start();
        }
    }

    private Integer checkTotalShardCount(Integer totalShardCount) {
        if (totalShardCount <= 0) {
            throw log.invalidTotalShardCount();
        }
        return totalShardCount;
    }

    private List<Integer> checkAssignedShardIndices(ConfigurationPropertySource configurationPropertySource, int totalShardCount, List<Integer> shardIndices) {
        for (Integer shardIndex : shardIndices) {
            if (0 <= shardIndex && shardIndex < totalShardCount) continue;
            throw log.invalidShardIndex(totalShardCount, SHARDS_TOTAL_COUNT.resolveOrRaw(configurationPropertySource));
        }
        return new ArrayList<Integer>(new HashSet<Integer>(shardIndices));
    }

    @Override
    public CompletableFuture<?> preStop(CoordinationStrategyPreStopContext context) {
        if (this.indexingProcessors == null) {
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture[] futures = new CompletableFuture[this.assignedShardIndices.size()];
        int i = 0;
        for (int shardIndex : this.assignedShardIndices) {
            futures[i] = ((OutboxEventBackgroundProcessor)this.indexingProcessors.get(shardIndex)).preStop();
            ++i;
        }
        return CompletableFuture.allOf(futures);
    }

    @Override
    public void stop() {
        try (Closer closer = new Closer();){
            closer.pushAll(OutboxEventBackgroundProcessor::stop, this.indexingProcessors);
            closer.push(ExecutorService::shutdownNow, (Object)this.scheduledExecutor);
            closer.push(BeanHolder::close, this.finderProviderHolder);
        }
    }
}

