package org.apache.pulsar.zookeeper;

import java.io.IOException;
import java.util.AbstractMap;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.AsyncLoadingCache;
import org.apache.pulsar.shade.com.github.benmanes.caffeine.cache.Caffeine;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.WatchedEvent;
import org.apache.pulsar.shade.org.apache.zookeeper.Watcher;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.apache.pulsar.stats.CacheMetricsCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/zookeeper/ZooKeeperCache.class */
public abstract class ZooKeeperCache implements Watcher {
    // String constant identifying a ZooKeeper cache instance; not referenced elsewhere in this class.
    public static final String ZK_CACHE_INSTANCE = "zk_cache_instance";
    // Cached node data keyed by path. The value pairs the (deserialized object, Stat) entry with the
    // System.nanoTime() reading taken when the entry was stored. Built without expireAfterWrite;
    // staleness is checked by checkAndRefreshExpiredEntry().
    protected final AsyncLoadingCache<String, Pair<Map.Entry<Object, Stat>, Long>> dataCache;
    // Cached child names keyed by parent path; entries expire cacheExpirySeconds after being written.
    protected final AsyncLoadingCache<String, Set<String>> childrenCache;
    // Cached existence checks keyed by path; entries expire cacheExpirySeconds after being written.
    protected final AsyncLoadingCache<String, Boolean> existsCache;
    // Executor that runs CacheUpdater.reloadCache() tasks, ordered by path.
    private final OrderedExecutor executor;
    // Executor owned by this cache; runs asyncInvalidate() tasks.
    private final OrderedExecutor backgroundExecutor;
    // True only when this instance created `executor` itself, in which case stop() shuts it down.
    private boolean shouldShutdownExecutor;
    // Upper bound, in seconds, for the blocking getData(path, watcher, deserializer) call.
    private final int zkOperationTimeoutSeconds;
    private static final int DEFAULT_CACHE_EXPIRY_SECONDS = 300;
    // Expiry applied to childrenCache/existsCache and age threshold for refreshing dataCache entries.
    private final int cacheExpirySeconds;
    // Current ZooKeeper session; may hold null, which the async lookups treat as "session not ready".
    protected AtomicReference<ZooKeeper> zkSession;
    // NOTE(review): LOG and log are two loggers for the same class; both are used below.
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperCache.class);
    private static Logger log = LoggerFactory.getLogger(ZooKeeperCache.class);

    /**
     * Callback contract handed to {@link #process(WatchedEvent, CacheUpdater)}: after a watch event
     * invalidates a path, {@link #reloadCache(String)} is invoked for that path on the cache executor.
     *
     * @param <T> value type of the listeners managed by this updater
     */
    public interface CacheUpdater<T> {
        /** Adds a listener to this updater. */
        void registerListener(ZooKeeperCacheListener<T> zooKeeperCacheListener);

        /** Removes a previously added listener from this updater. */
        void unregisterListener(ZooKeeperCacheListener<T> zooKeeperCacheListener);

        /**
         * Reloads the cached state for the given path.
         *
         * @param str path whose watch fired
         */
        void reloadCache(String str);
    }

    /**
     * Converts the raw bytes read from a znode into a typed value.
     *
     * @param <T> type produced by the deserializer
     */
    public interface Deserializer<T> {
        /**
         * @param str  path of the znode the bytes were read from
         * @param bArr raw znode content as delivered by the ZooKeeper data callback
         * @return the decoded value
         * @throws Exception if the content cannot be decoded; callers in this class turn this into a
         *                   failed future or a logged warning
         */
        T deserialize(String str, byte[] bArr) throws Exception;
    }

    /**
     * Creates a cache that uses the default expiry of {@code DEFAULT_CACHE_EXPIRY_SECONDS} seconds.
     *
     * @param str             cache name, used as the prefix of the registered metric names
     * @param zooKeeper       initial ZooKeeper session
     * @param i               timeout, in seconds, for blocking ZooKeeper reads
     * @param orderedExecutor executor used to run cache reload tasks; must not be null
     */
    public ZooKeeperCache(String str, ZooKeeper zooKeeper, int i, OrderedExecutor orderedExecutor) {
        // Use the named constant rather than repeating its literal value.
        this(str, zooKeeper, i, orderedExecutor, DEFAULT_CACHE_EXPIRY_SECONDS);
    }

    /**
     * Creates a cache with an explicit expiry.
     *
     * @param str             cache name, used as the prefix of the registered metric names
     * @param zooKeeper       initial ZooKeeper session
     * @param i               timeout, in seconds, for blocking ZooKeeper reads
     * @param orderedExecutor executor used to run cache reload tasks; must not be null
     * @param i2              expiry, in seconds, for the children/exists caches and the age after
     *                        which a data entry is refreshed
     * @throws NullPointerException if {@code orderedExecutor} is null
     */
    public ZooKeeperCache(String str, ZooKeeper zooKeeper, int i, OrderedExecutor orderedExecutor, int i2) {
        // Validate before allocating anything: building the background executor first would leave
        // its threads running if this check failed.
        this.executor = Preconditions.checkNotNull(orderedExecutor);
        this.backgroundExecutor = OrderedExecutor.newBuilder().name("zk-cache-background").numThreads(2).build();
        this.zkSession = new AtomicReference<>(zooKeeper);
        this.zkOperationTimeoutSeconds = i;
        this.shouldShutdownExecutor = false;
        this.cacheExpirySeconds = i2;

        // The default loaders return null; every lookup in this class passes its own mapping function.
        // dataCache has no time-based expiry here: staleness is handled by checkAndRefreshExpiredEntry().
        this.dataCache = Caffeine.newBuilder()
                .recordStats()
                .buildAsync((key, loaderExecutor) -> null);
        this.childrenCache = Caffeine.newBuilder()
                .recordStats()
                .expireAfterWrite(i2, TimeUnit.SECONDS)
                .buildAsync((key, loaderExecutor) -> null);
        this.existsCache = Caffeine.newBuilder()
                .recordStats()
                .expireAfterWrite(i2, TimeUnit.SECONDS)
                .buildAsync((key, loaderExecutor) -> null);

        CacheMetricsCollector.CAFFEINE.addCache(str + "-data", this.dataCache);
        CacheMetricsCollector.CAFFEINE.addCache(str + "-children", this.childrenCache);
        CacheMetricsCollector.CAFFEINE.addCache(str + "-exists", this.existsCache);
    }

    /**
     * Creates a cache that builds and owns its callback executor; {@link #stop()} shuts that
     * executor down.
     *
     * @param str       cache name, used as the prefix of the registered metric names
     * @param zooKeeper initial ZooKeeper session
     * @param i         timeout, in seconds, for blocking ZooKeeper reads
     */
    public ZooKeeperCache(String str, ZooKeeper zooKeeper, int i) {
        this(
                str,
                zooKeeper,
                i,
                OrderedExecutor.newBuilder().name("zk-cache-callback-executor").build());
        // The executor above was created here, so this instance is responsible for stopping it.
        shouldShutdownExecutor = true;
    }

    /**
     * @return the ZooKeeper session currently held by this cache, or {@code null} if none is set
     */
    public ZooKeeper getZooKeeper() {
        final ZooKeeper current = zkSession.get();
        return current;
    }

    /**
     * Handles a watch event by dropping every cached view of the event's path and, when both an
     * executor and an updater are available, scheduling a reload of that path.
     *
     * <p>Events without a path are ignored. For node creation/deletion the parent's children entry
     * is invalidated as well.
     *
     * @param watchedEvent event delivered by ZooKeeper
     * @param cacheUpdater updater asked to reload the path; may be {@code null}, in which case only
     *                     the invalidation happens
     * @param <T>          value type of the updater
     */
    public <T> void process(WatchedEvent watchedEvent, final CacheUpdater<T> cacheUpdater) {
        final String path = watchedEvent.getPath();
        if (path == null) {
            return;
        }

        dataCache.synchronous().invalidate(path);
        childrenCache.synchronous().invalidate(path);

        final Watcher.Event.EventType eventType = watchedEvent.getType();
        final boolean membershipChanged = eventType.equals(Watcher.Event.EventType.NodeCreated)
                || eventType.equals(Watcher.Event.EventType.NodeDeleted);
        if (membershipChanged) {
            // The parent's list of children is stale once a child appears or disappears.
            childrenCache.synchronous().invalidate(ZkUtils.getParentForPath(path));
        }

        existsCache.synchronous().invalidate(path);

        final boolean canReload = executor != null && cacheUpdater != null;
        if (!canReload) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Cannot reload cache for path: {}, updater: {}", path, cacheUpdater);
            }
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Submitting reload cache task to the executor for path: {}, updater: {}", path, cacheUpdater);
        }
        final SafeRunnable reloadTask = new SafeRunnable() {
            @Override
            public void safeRun() {
                cacheUpdater.reloadCache(path);
            }
        };
        try {
            // Keyed by path, so reloads of the same path are submitted under the same ordering key.
            executor.executeOrdered(path, reloadTask);
        } catch (RejectedExecutionException rejected) {
            LOG.error("Failed to updated zk-cache {} on zk-watch {}", path, rejected.getMessage());
        }
    }

    /** Empties the data, children and exists caches, in that order. */
    public void invalidateAll() {
        this.invalidateAllData();
        this.invalidateAllChildren();
        this.invalidateAllExists();
    }

    /** Removes every entry from the exists cache. */
    private void invalidateAllExists() {
        existsCache.synchronous().invalidateAll();
    }

    /** Removes every entry from the data cache. */
    public void invalidateAllData() {
        dataCache.synchronous().invalidateAll();
    }

    /** Removes every entry from the children cache. */
    public void invalidateAllChildren() {
        childrenCache.synchronous().invalidateAll();
    }

    /**
     * Removes the cached data entry for one path.
     *
     * @param str path to drop from the data cache
     */
    public void invalidateData(String str) {
        dataCache.synchronous().invalidate(str);
    }

    /**
     * Removes the cached children entry for one path.
     *
     * @param str path to drop from the children cache
     */
    public void invalidateChildren(String str) {
        childrenCache.synchronous().invalidate(str);
    }

    /**
     * Removes the cached existence result for one path.
     *
     * @param str path to drop from the exists cache
     */
    private void invalidateExists(String str) {
        existsCache.synchronous().invalidate(str);
    }

    /**
     * Schedules {@link #invalidate(String)} for the path on the background executor instead of
     * running it on the calling thread.
     *
     * @param str path to invalidate
     */
    public void asyncInvalidate(String str) {
        backgroundExecutor.execute(() -> invalidate(str));
    }

    /**
     * @return the timeout, in seconds, configured for blocking ZooKeeper reads
     */
    public int getZkOperationTimeoutSeconds() {
        return zkOperationTimeoutSeconds;
    }

    /**
     * Drops the data, children and exists entries cached for one path, in that order.
     *
     * @param str path to invalidate
     */
    public void invalidate(String str) {
        this.invalidateData(str);
        this.invalidateChildren(str);
        this.invalidateExists(str);
    }

    /**
     * Checks whether a path exists, registering this cache as the watcher.
     *
     * @param str path to check
     * @return {@code true} if the node exists
     */
    public boolean exists(String str) throws KeeperException, InterruptedException {
        final Watcher self = this;
        return exists(str, self);
    }

    /**
     * Blocking variant of {@link #existsAsync(String, Watcher)}.
     *
     * <p>A failure caused by a {@link KeeperException} is unwrapped and rethrown as declared, in the
     * same way {@link #getChildren(String)} does. Any other failure propagates as the
     * {@link CompletionException} raised by {@code join()}.
     *
     * @param str     path to check
     * @param watcher watcher to register on the path
     * @return {@code true} if the node exists
     * @throws KeeperException if the ZooKeeper call reported an error code other than NONODE
     */
    private boolean exists(String str, Watcher watcher) throws KeeperException, InterruptedException {
        try {
            return existsAsync(str, watcher).join();
        } catch (CompletionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof KeeperException) {
                throw (KeeperException) cause;
            }
            throw e;
        }
    }

    /**
     * Asynchronously checks whether a path exists, serving the answer from the exists cache when
     * one is present and otherwise asking ZooKeeper.
     *
     * @param str     path to check
     * @param watcher watcher registered with the ZooKeeper {@code exists} call
     * @return a future that yields {@code true} for OK, {@code false} for NONODE, and fails with a
     *         {@link KeeperException} for any other result code, or with an {@link IOException}
     *         when no session is set
     */
    public CompletableFuture<Boolean> existsAsync(String str, Watcher watcher) {
        return this.existsCache.get(str, (key, loaderExecutor) -> {
            ZooKeeper zooKeeper = this.zkSession.get();
            if (zooKeeper == null) {
                return FutureUtil.failedFuture(new IOException("ZK session not ready"));
            }
            CompletableFuture<Boolean> result = new CompletableFuture<>();
            zooKeeper.exists(str, watcher, (rc, callbackPath, ctx, stat) -> {
                if (rc == KeeperException.Code.OK.intValue()) {
                    result.complete(true);
                } else if (rc == KeeperException.Code.NONODE.intValue()) {
                    result.complete(false);
                } else {
                    result.completeExceptionally(KeeperException.create(rc));
                }
            }, null);
            return result;
        });
    }

    /**
     * Blocking read of a node's deserialized value, with this cache registered as the watcher.
     *
     * @param str          path to read
     * @param deserializer converts the node content into {@code T}
     * @return the value, or empty when the lookup produced no entry
     * @throws Exception whatever {@link #getData(String, Watcher, Deserializer)} throws
     */
    public <T> Optional<T> getData(String str, Deserializer<T> deserializer) throws Exception {
        final Optional<Map.Entry<T, Stat>> entry = getData(str, this, deserializer);
        return entry.map(Map.Entry::getKey);
    }

    /**
     * Blocking read of a node's value together with its {@link Stat}, with this cache registered
     * as the watcher.
     *
     * @param str          path to read
     * @param deserializer converts the node content into {@code T}
     * @return the (value, stat) entry, or empty when the lookup produced no entry
     * @throws Exception whatever {@link #getData(String, Watcher, Deserializer)} throws
     */
    public <T> Optional<Map.Entry<T, Stat>> getEntry(String str, Deserializer<T> deserializer) throws Exception {
        final Watcher self = this;
        return getData(str, self, deserializer);
    }

    /**
     * Asynchronous read of a node's value together with its {@link Stat}, with this cache
     * registered as the watcher.
     *
     * <p>On failure the path is invalidated in the background. A
     * {@link KeeperException.NoNodeException} is reported as an empty result; any other failure
     * completes the returned future exceptionally with the underlying cause.
     *
     * @param str          path to read
     * @param deserializer converts the node content into {@code T}
     * @return future holding the (value, stat) entry, or empty when the node is absent
     */
    public <T> CompletableFuture<Optional<Map.Entry<T, Stat>>> getEntryAsync(String str, Deserializer<T> deserializer) {
        CompletableFuture<Optional<Map.Entry<T, Stat>>> result = new CompletableFuture<>();
        getDataAsync(str, this, deserializer).thenAccept(result::complete).exceptionally(th -> {
            asyncInvalidate(str);
            // Unwrap the completion wrapper, but never pass null on: completeExceptionally(null)
            // throws and would leave the returned future incomplete forever.
            Throwable cause = th.getCause() != null ? th.getCause() : th;
            if (cause instanceof KeeperException.NoNodeException) {
                result.complete(Optional.empty());
            } else {
                result.completeExceptionally(cause);
            }
            return null;
        });
        return result;
    }

    /**
     * Asynchronous read of a node's deserialized value, with this cache registered as the watcher.
     *
     * <p>On failure the path is invalidated in the background. A
     * {@link KeeperException.NoNodeException} is reported as an empty result; any other failure
     * completes the returned future exceptionally with the underlying cause.
     *
     * @param str          path to read
     * @param deserializer converts the node content into {@code T}
     * @return future holding the value, or empty when the node is absent
     */
    public <T> CompletableFuture<Optional<T>> getDataAsync(String str, Deserializer<T> deserializer) {
        CompletableFuture<Optional<T>> result = new CompletableFuture<>();
        getDataAsync(str, this, deserializer).thenAccept(entry -> {
            result.complete(entry.map(Map.Entry::getKey));
        }).exceptionally(th -> {
            asyncInvalidate(str);
            // Unwrap the completion wrapper, but never pass null on: completeExceptionally(null)
            // throws and would leave the returned future incomplete forever.
            Throwable cause = th.getCause() != null ? th.getCause() : th;
            if (cause instanceof KeeperException.NoNodeException) {
                result.complete(Optional.empty());
            } else {
                result.completeExceptionally(cause);
            }
            return null;
        });
        return result;
    }

    /**
     * Blocking read of a node's value and {@link Stat}, waiting at most the configured operation
     * timeout.
     *
     * <p>On timeout or failure the path is invalidated in the background before the exception is
     * rethrown. A failed lookup rethrows its cause directly when that cause is a
     * {@link KeeperException}, {@link InterruptedException} or {@link RuntimeException}; any other
     * cause is wrapped in a {@link RuntimeException}.
     *
     * @param str          path to read
     * @param watcher      watcher registered with the ZooKeeper read
     * @param deserializer converts the node content into {@code T}
     * @return the (value, stat) entry, or empty when the lookup produced no entry
     * @throws TimeoutException if the lookup did not finish within the operation timeout
     */
    public <T> Optional<Map.Entry<T, Stat>> getData(String str, Watcher watcher, Deserializer<T> deserializer) throws Exception {
        final CompletableFuture<Optional<Map.Entry<T, Stat>>> pending = getDataAsync(str, watcher, deserializer);
        try {
            return pending.get(zkOperationTimeoutSeconds, TimeUnit.SECONDS);
        } catch (TimeoutException timedOut) {
            LOG.warn("Time-out while fetching {} zk-data in {} sec", str, zkOperationTimeoutSeconds);
            asyncInvalidate(str);
            throw timedOut;
        } catch (ExecutionException failed) {
            asyncInvalidate(str);
            final Throwable cause = failed.getCause();
            if (cause instanceof KeeperException) {
                throw (KeeperException) cause;
            } else if (cause instanceof InterruptedException) {
                LOG.warn("Time-out while fetching {} zk-data in {} sec", str, zkOperationTimeoutSeconds);
                throw (InterruptedException) cause;
            } else if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            } else {
                throw new RuntimeException(cause);
            }
        }
    }

    /**
     * Asynchronous read of a node's value and {@link Stat} through the data cache.
     *
     * <p>Before the lookup, an already cached entry older than the configured expiry triggers a
     * background refresh. On a cache miss the node is read from ZooKeeper, deserialized inside the
     * ZooKeeper callback, and the load future is completed on the executor supplied by the cache.
     * A NONODE result completes the load with {@code null}, which surfaces as an empty result.
     *
     * @param str          path to read; must not be null
     * @param watcher      watcher registered with the ZooKeeper read
     * @param deserializer converts the node content into {@code T}; must not be null
     * @return future holding the (value, stat) entry, or empty when the node is absent; it fails
     *         with the throwable reported by the cache lookup otherwise
     */
    public <T> CompletableFuture<Optional<Map.Entry<T, Stat>>> getDataAsync(String str, Watcher watcher, Deserializer<T> deserializer) {
        Preconditions.checkNotNull(str);
        Preconditions.checkNotNull(deserializer);
        checkAndRefreshExpiredEntry(str, deserializer);
        CompletableFuture<Optional<Map.Entry<T, Stat>>> result = new CompletableFuture<>();
        this.dataCache.get(str, (key, loaderExecutor) -> {
            CompletableFuture<Pair<Map.Entry<Object, Stat>, Long>> loaded = new CompletableFuture<>();
            try {
                this.zkSession.get().getData(str, watcher, (rc, callbackPath, ctx, content, stat) -> {
                    if (rc != KeeperException.Code.OK.intValue()) {
                        if (rc == KeeperException.Code.NONODE.intValue()) {
                            // An absent node is a valid answer, reported as a null load.
                            loaderExecutor.execute(() -> loaded.complete(null));
                        } else {
                            loaderExecutor.execute(() -> loaded.completeExceptionally(KeeperException.create(rc)));
                        }
                        return;
                    }
                    try {
                        Object value = deserializer.deserialize(str, content);
                        Map.Entry<Object, Stat> entry = new AbstractMap.SimpleImmutableEntry<>(value, stat);
                        // The timestamp is taken when the completion task runs and is later used
                        // to decide when the entry is due for a refresh.
                        loaderExecutor.execute(() -> {
                            Pair<Map.Entry<Object, Stat>, Long> stamped =
                                    ImmutablePair.of(entry, Long.valueOf(System.nanoTime()));
                            loaded.complete(stamped);
                        });
                    } catch (Exception loadFailure) {
                        loaderExecutor.execute(() -> loaded.completeExceptionally(loadFailure));
                    }
                }, null);
            } catch (Exception sessionFailure) {
                // Covers, among others, a null session reference.
                LOG.warn("Failed to access zkSession for {} {}", str, sessionFailure.getMessage(), sessionFailure);
                loaded.completeExceptionally(sessionFailure);
            }
            return loaded;
        }).thenAccept(pair -> {
            if (pair != null) {
                // The cache stores values as Object; the caller's deserializer fixes the real type.
                @SuppressWarnings("unchecked")
                Map.Entry<T, Stat> typed = (Map.Entry<T, Stat>) pair.getLeft();
                result.complete(Optional.of(typed));
            } else {
                result.complete(Optional.empty());
            }
        }).exceptionally(th -> {
            result.completeExceptionally(th);
            return null;
        });
        return result;
    }

    /**
     * Refreshes a cached data entry in the background once it is older than the configured expiry.
     *
     * <p>Does nothing when the path is not cached, its load is still pending or failed, or the
     * entry is still fresh. Otherwise the node is re-read from ZooKeeper and, on success, the cache
     * entry is replaced with a newly timestamped value. Refresh failures are logged and the existing
     * entry is left untouched.
     *
     * @param str          path whose cached entry is checked
     * @param deserializer converts the refreshed node content
     */
    private <T> void checkAndRefreshExpiredEntry(String str, Deserializer<T> deserializer) {
        CompletableFuture<Pair<Map.Entry<Object, Stat>, Long>> cached = this.dataCache.getIfPresent(str);
        // getNow() rethrows on a failed or cancelled future, so exclude those explicitly.
        if (cached == null || !cached.isDone() || cached.isCompletedExceptionally()) {
            return;
        }
        Pair<Map.Entry<Object, Stat>, Long> loaded = cached.getNow(null);
        if (loaded == null || loaded.getRight() == null) {
            return;
        }
        long ageNanos = System.nanoTime() - loaded.getRight().longValue();
        if (ageNanos <= TimeUnit.SECONDS.toNanos(this.cacheExpirySeconds)) {
            return;
        }
        ZooKeeper zooKeeper = this.zkSession.get();
        if (zooKeeper == null) {
            // Keep serving the stale entry rather than failing the caller's lookup with an NPE.
            log.warn("Skipping zookeeper-cache refresh for {}: ZK session not ready", str);
            return;
        }
        zooKeeper.getData(str, this, (rc, callbackPath, ctx, content, stat) -> {
            if (rc != KeeperException.Code.OK.intValue()) {
                log.warn("Failed to refresh zookeeper-cache for {} due to {}", str, Integer.valueOf(rc));
                return;
            }
            try {
                Object value = deserializer.deserialize(str, content);
                Map.Entry<Object, Stat> entry = new AbstractMap.SimpleImmutableEntry<>(value, stat);
                Pair<Map.Entry<Object, Stat>, Long> refreshed =
                        ImmutablePair.of(entry, Long.valueOf(System.nanoTime()));
                this.dataCache.put(str, CompletableFuture.completedFuture(refreshed));
            } catch (Exception e) {
                log.warn("Failed to refresh zookeeper-cache for {}", str, e);
            }
        }, null);
    }

    /**
     * Blocking read of a node's children, with this cache registered as the watcher.
     *
     * @param str parent path
     * @return the child names
     * @throws KeeperException if the lookup failed because of a ZooKeeper error; other failures
     *                         propagate as the {@link CompletionException} raised by {@code join()}
     */
    public Set<String> getChildren(String str) throws KeeperException, InterruptedException {
        final CompletableFuture<Set<String>> pending = getChildrenAsync(str, this);
        try {
            return pending.join();
        } catch (CompletionException wrapped) {
            final Throwable cause = wrapped.getCause();
            if (!(cause instanceof KeeperException)) {
                throw wrapped;
            }
            throw (KeeperException) cause;
        }
    }

    /**
     * Asynchronous read of a node's children through the children cache.
     *
     * <p>On a cache miss the ZooKeeper call is issued from the executor supplied by the cache. An
     * OK result yields the children as a sorted set. On NONODE an existence check is made with the
     * same watcher: if the node turns out to exist the children are requested again, otherwise an
     * empty set is returned. Any other result code fails the future with a {@link KeeperException}.
     *
     * @param str     parent path
     * @param watcher watcher registered with the ZooKeeper calls
     * @return future holding the child names; fails with an {@link IOException} when no session is set
     */
    public CompletableFuture<Set<String>> getChildrenAsync(String str, Watcher watcher) {
        return this.childrenCache.get(str, (key, loaderExecutor) -> {
            CompletableFuture<Set<String>> result = new CompletableFuture<>();
            loaderExecutor.execute(SafeRunnable.safeRun(() -> {
                ZooKeeper zooKeeper = this.zkSession.get();
                if (zooKeeper == null) {
                    result.completeExceptionally(new IOException("ZK session not ready"));
                    return;
                }
                zooKeeper.getChildren(str, watcher, (rc, callbackPath, ctx, children) -> {
                    if (rc == KeeperException.Code.OK.intValue()) {
                        result.complete(Sets.newTreeSet(children));
                    } else if (rc == KeeperException.Code.NONODE.intValue()) {
                        existsAsync(str, watcher).thenAccept(exists -> {
                            if (exists.booleanValue()) {
                                // NOTE(review): this re-enters the children cache for the same key
                                // from inside its own loader; confirm the lookup cannot resolve to
                                // the still-pending future created above.
                                getChildrenAsync(str, watcher).thenAccept(result::complete).exceptionally(th -> {
                                    result.completeExceptionally(th);
                                    return null;
                                });
                            } else {
                                result.complete(Collections.emptySet());
                            }
                        }).exceptionally(th -> {
                            result.completeExceptionally(th);
                            return null;
                        });
                    } else {
                        result.completeExceptionally(KeeperException.create(rc));
                    }
                }, null);
            }));
            return result;
        });
    }

    /**
     * Returns the cached value for a path without triggering a load.
     *
     * @param str path to look up
     * @return the cached value, or {@code null} when the path is not cached, its load is pending or
     *         failed, or the load completed with no value
     */
    public <T> T getDataIfPresent(String str) {
        CompletableFuture<Pair<Map.Entry<Object, Stat>, Long>> cached = this.dataCache.getIfPresent(str);
        if (cached == null || !cached.isDone() || cached.isCompletedExceptionally()) {
            return null;
        }
        // A load for a missing node completes with null; treat that as "nothing cached".
        Pair<Map.Entry<Object, Stat>, Long> loaded = cached.getNow(null);
        if (loaded == null || loaded.getLeft() == null) {
            return null;
        }
        // The cache stores values as Object; the caller chooses the type to read them back as.
        @SuppressWarnings("unchecked")
        T value = (T) loaded.getLeft().getKey();
        return value;
    }

    /**
     * Returns the cached children for a path without triggering a load.
     *
     * @param str parent path to look up
     * @return the cached child names, or {@code null} when the path is not cached or its load is
     *         pending or failed
     */
    public Set<String> getChildrenIfPresent(String str) {
        final CompletableFuture<Set<String>> cached = childrenCache.getIfPresent(str);
        final boolean usable = cached != null && cached.isDone() && !cached.isCompletedExceptionally();
        if (usable) {
            return cached.getNow(null);
        }
        return null;
    }

    /**
     * Watcher entry point: logs the event and invalidates the affected path without scheduling a
     * reload, since no updater is supplied.
     *
     * @param watchedEvent event delivered by ZooKeeper
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        final ZooKeeper session = zkSession.get();
        LOG.info("[{}] Received ZooKeeper watch event: {}", session, watchedEvent);
        process(watchedEvent, null);
    }

    /**
     * Drops every children-cache entry whose key starts with the given prefix.
     *
     * @param str path prefix; matching is a plain {@link String#startsWith(String)} test
     */
    public void invalidateRoot(String str) {
        childrenCache.synchronous().asMap().keySet().stream()
                .filter(cachedPath -> cachedPath.startsWith(str))
                .forEach(cachedPath -> childrenCache.synchronous().invalidate(cachedPath));
    }

    /**
     * Shuts down the executors owned by this cache: always the background executor, and the
     * callback executor only when this instance created it.
     */
    public void stop() {
        final boolean ownsCallbackExecutor = shouldShutdownExecutor;
        if (ownsCallbackExecutor) {
            executor.shutdown();
        }
        backgroundExecutor.shutdown();
    }

    /**
     * Checks a registration znode and, if it belongs to another session, waits up to one session
     * timeout for it to be deleted.
     *
     * @param str path of the registration znode
     * @return {@code true} if the node exists and its ephemeral owner is the current session;
     *         {@code false} if the node does not exist or was deleted while waiting
     * @throws IOException if the wait is interrupted, a ZooKeeper error occurs, or the node is
     *                     still present after the session timeout
     */
    public boolean checkRegNodeAndWaitExpired(String str) throws IOException {
        final CountDownLatch deletionLatch = new CountDownLatch(1);
        // Released only by a NodeDeleted event; other event types are ignored.
        final Watcher deletionWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                    deletionLatch.countDown();
                }
            }
        };
        try {
            final Stat stat = getZooKeeper().exists(str, deletionWatcher);
            if (stat == null) {
                return false;
            }
            final boolean ownedByThisSession = stat.getEphemeralOwner() == getZooKeeper().getSessionId();
            if (ownedByThisSession) {
                return true;
            }
            log.info("Previous bookie registration znode: {} exists, so waiting zk sessiontimeout: {} ms for znode deletion", str, getZooKeeper().getSessionTimeout());
            final boolean deleted = deletionLatch.await(getZooKeeper().getSessionTimeout(), TimeUnit.MILLISECONDS);
            if (!deleted) {
                // Caught below and reported to the caller as an IOException.
                throw new KeeperException.NodeExistsException(str);
            }
            return false;
        } catch (KeeperException zkFailure) {
            log.error("ZK exception checking and wait ephemeral znode {} expired : ", str, zkFailure);
            throw new IOException("ZK exception checking and wait ephemeral znode " + str + " expired", zkFailure);
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            log.error("Interrupted checking and wait ephemeral znode {} expired : ", str, interrupted);
            throw new IOException("Interrupted checking and wait ephemeral znode " + str + " expired", interrupted);
        }
    }
}
