package org.jetlinks.supports.config;

import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import java.util.function.Supplier;
import org.jctools.maps.NonBlockingHashMap;
import org.jetlinks.core.cluster.ClusterManager;
import org.jetlinks.core.config.ConfigStorage;
import org.jetlinks.core.config.ConfigStorageManager;
import org.jetlinks.core.event.EventBus;
import org.jetlinks.core.event.Subscription;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/jetlinks/supports/config/EventBusStorageManager.class */
public class EventBusStorageManager implements ConfigStorageManager, Disposable {
    static final String NOTIFY_TOPIC = "/_sys/cluster_cache";
    final ConcurrentMap<String, LocalCacheClusterConfigStorage> cache;
    private volatile Disposable disposable;
    private final EventBus eventBus;
    private final Function<String, LocalCacheClusterConfigStorage> storageBuilder;
    private static final Logger log = LoggerFactory.getLogger(EventBusStorageManager.class);
    private static final AtomicReferenceFieldUpdater<EventBusStorageManager, Disposable> CLUSTER_SUBSCRIBER = AtomicReferenceFieldUpdater.newUpdater(EventBusStorageManager.class, Disposable.class, "disposable");

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus) {
        this(clusterManager, eventBus, -1L);
    }

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus, long j) {
        this.cache = new NonBlockingHashMap();
        this.eventBus = eventBus;
        this.storageBuilder = str -> {
            return new LocalCacheClusterConfigStorage(str, eventBus, clusterManager.createCache(str), j, () -> {
                this.cache.remove(str);
            });
        };
    }

    public EventBusStorageManager(ClusterManager clusterManager, EventBus eventBus, Supplier<ConcurrentMap<String, Object>> supplier) {
        this.eventBus = eventBus;
        this.cache = (ConcurrentMap) supplier.get();
        this.storageBuilder = str -> {
            return new LocalCacheClusterConfigStorage(str, eventBus, clusterManager.createCache(str), -1L, () -> {
                this.cache.remove(str);
            }, (Map) supplier.get());
        };
    }

    private Disposable subscribeCluster() {
        return this.eventBus.subscribe(Subscription.builder().subscriberId("event-bus-storage-listener").topics(new String[]{NOTIFY_TOPIC}).justBroker().build(), topicPayload -> {
            try {
                CacheNotify cacheNotify = (CacheNotify) topicPayload.decode();
                LocalCacheClusterConfigStorage localCacheClusterConfigStorage = this.cache.get(cacheNotify.getName());
                if (localCacheClusterConfigStorage != null) {
                    log.trace("clear local cache :{}", cacheNotify);
                    localCacheClusterConfigStorage.clearLocalCache(cacheNotify);
                } else {
                    log.trace("ignore clear local cache :{}", cacheNotify);
                }
            } catch (Throwable th) {
                log.warn("clear local cache error", th);
            }
            return Mono.empty();
        });
    }

    public void dispose() {
        if (this.disposable != null) {
            this.disposable.dispose();
        }
    }

    public Mono<ConfigStorage> getStorage(String str) {
        if (this.disposable == null) {
            synchronized (this) {
                Disposable subscribeCluster = subscribeCluster();
                if (!CLUSTER_SUBSCRIBER.compareAndSet(this, null, subscribeCluster)) {
                    subscribeCluster.dispose();
                }
            }
        }
        return Mono.fromSupplier(() -> {
            return this.cache.computeIfAbsent(str, this.storageBuilder);
        });
    }
}
