package org.jetlinks.core.cache;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.util.context.ContextView;

/* loaded from: input_file:org/jetlinks/core/cache/DefaultReactiveCacheContainer.class */
class DefaultReactiveCacheContainer<K, V> implements ReactiveCacheContainer<K, V> {
    private final Map<K, Container<K, V>> cache = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/jetlinks/core/cache/DefaultReactiveCacheContainer$Container.class */
    public static class Container<K, T> implements Disposable {
        private static final AtomicReferenceFieldUpdater<Container, Mono> LOADER = AtomicReferenceFieldUpdater.newUpdater(Container.class, Mono.class, "loader");
        private final DefaultReactiveCacheContainer<K, T> main;
        private final K key;
        private Sinks.One<T> await;
        public volatile T loaded;
        protected volatile Mono<T> loader;
        private volatile Disposable disposable;

        public Container(K k, DefaultReactiveCacheContainer<K, T> defaultReactiveCacheContainer, Mono<T> mono) {
            this.key = k;
            this.main = defaultReactiveCacheContainer;
            this.loader = mono;
            update(mono);
        }

        public Container(K k, DefaultReactiveCacheContainer<K, T> defaultReactiveCacheContainer, T t) {
            this.key = k;
            this.main = defaultReactiveCacheContainer;
            this.loaded = t;
            this.loader = Mono.just(t);
            update(this.loader);
        }

        public void update(Mono<T> mono) {
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            Sinks.One<T> one = this.await;
            this.await = Sinks.one();
            if (one != null && one.currentSubscriberCount() > 0) {
                one.tryEmitEmpty();
            }
            this.loader = mono.switchIfEmpty(Mono.fromRunnable(this::loadEmpty)).doOnError(this::loadError).doOnNext(this::afterLoaded);
        }

        private void afterLoaded(T t) {
            if (t != this.loaded && (this.loaded instanceof Disposable)) {
                ((Disposable) this.loaded).dispose();
            }
            this.loaded = t;
            this.await.tryEmitValue(t);
        }

        public void dispose() {
            if (this.disposable != null && !this.disposable.isDisposed()) {
                this.disposable.dispose();
            }
            T t = this.loaded;
            if (t instanceof Disposable) {
                ((Disposable) t).dispose();
            }
        }

        private void loadError(Throwable th) {
            this.await.tryEmitError(th);
            this.main.remove(this.key);
        }

        private void loadEmpty() {
            this.await.tryEmitEmpty();
        }

        private void tryLoad(ContextView contextView) {
            Mono andSet = LOADER.getAndSet(this, null);
            if (andSet != null) {
                this.disposable = andSet.contextWrite(contextView).subscribe();
            }
        }

        public Mono<T> ref() {
            return Mono.deferContextual(contextView -> {
                tryLoad(contextView);
                return this.await.asMono();
            });
        }
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public Mono<V> compute(K k, BiFunction<K, V, Mono<V>> biFunction) {
        return this.cache.compute(k, (obj, container) -> {
            if (container == null) {
                return new Container(obj, (DefaultReactiveCacheContainer<Object, T>) this, (Mono) biFunction.apply(obj, null));
            }
            container.update((Mono) biFunction.apply(obj, container.loaded));
            return container;
        }).ref();
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public Mono<V> computeIfAbsent(K k, Function<K, Mono<V>> function) {
        return this.cache.computeIfAbsent(k, obj -> {
            return new Container(obj, (DefaultReactiveCacheContainer<Object, T>) this, (Mono) function.apply(obj));
        }).ref();
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public Mono<V> get(K k, Mono<V> mono) {
        Container<K, V> container = this.cache.get(k);
        return container != null ? container.ref() : mono;
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public V put(K k, V v) {
        Container<K, V> put = this.cache.put(k, new Container<>(k, this, v));
        if (put == null) {
            return null;
        }
        put.dispose();
        return put.loaded;
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public boolean containsKey(K k) {
        return this.cache.containsKey(k);
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public V getNow(K k) {
        Container<K, V> container = this.cache.get(k);
        if (container != null) {
            return container.loaded;
        }
        return null;
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public V remove(K k) {
        Container<K, V> remove = this.cache.remove(k);
        if (null != remove) {
            remove.dispose();
        }
        if (remove == null) {
            return null;
        }
        return remove.loaded;
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public Flux<V> values() {
        return Flux.fromIterable(this.cache.values()).flatMap((v0) -> {
            return v0.ref();
        });
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public List<V> valuesNow() {
        return (List) this.cache.values().stream().filter(container -> {
            return container.loaded != 0;
        }).map(container2 -> {
            return container2.loaded;
        }).collect(Collectors.toList());
    }

    @Override // org.jetlinks.core.cache.ReactiveCacheContainer
    public void clear() {
        HashMap hashMap = new HashMap(this.cache);
        this.cache.clear();
        Iterator<V> it = hashMap.values().iterator();
        while (it.hasNext()) {
            ((Container) it.next()).dispose();
        }
    }

    public void dispose() {
        this.cache.values().forEach((v0) -> {
            v0.dispose();
        });
        this.cache.clear();
    }
}
