package io.vertx.redis.client.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.tracing.TracingPolicy;
import io.vertx.redis.client.Command;
import io.vertx.redis.client.PoolOptions;
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisClusterConnectOptions;
import io.vertx.redis.client.RedisConnection;
import io.vertx.redis.client.RedisReplicas;
import io.vertx.redis.client.Request;
import io.vertx.redis.client.Response;
import io.vertx.redis.client.impl.types.MultiType;
import io.vertx.redis.client.impl.types.NumberType;
import io.vertx.redis.client.impl.types.SimpleStringType;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

/* loaded from: input_file:io/vertx/redis/client/impl/RedisClusterClient.class */
/**
 * {@link Redis} client for a Redis cluster.
 *
 * <p>Before connecting, the client obtains the hash slot assignment by issuing {@code CLUSTER SLOTS}
 * against the configured endpoints (tried in order until one answers). The resulting future is cached
 * in {@link #slots} and cleared again by a timer after the configured hash slot cache TTL. A connection
 * to the cluster is then built from one pooled connection per endpoint reported by the slot assignment.
 *
 * <p>The static initializer registers the reducers and master-only commands this client provides
 * out of the box.
 */
public class RedisClusterClient extends BaseRedisClient implements Redis {
    private static final Logger LOG = LoggerFactory.getLogger(RedisClusterClient.class);

    private final RedisClusterConnectOptions connectOptions;
    private final PoolOptions poolOptions;
    /** Cached (possibly still pending, possibly failed) slot lookup; {@code null} means "look up again". */
    private final AtomicReference<Future<Slots>> slots = new AtomicReference<>();

    /**
     * Registers a reducer for the given command by delegating to
     * {@link RedisClusterConnection#addReducer(Command, Function)}.
     *
     * @param command  the command the reducer applies to
     * @param function combines a list of responses into a single response
     */
    public static void addReducer(Command command, Function<List<Response>, Response> function) {
        RedisClusterConnection.addReducer(command, function);
    }

    /**
     * Marks a command as master-only by delegating to
     * {@link RedisClusterConnection#addMasterOnlyCommand(Command)}.
     *
     * @param command the command to register
     */
    public static void addMasterOnlyCommand(Command command) {
        RedisClusterConnection.addMasterOnlyCommand(command);
    }

    /**
     * Creates the cluster client.
     *
     * @throws IllegalStateException if {@code poolOptions.getMaxWaiting() < poolOptions.getMaxSize()}
     */
    public RedisClusterClient(Vertx vertx, NetClientOptions netClientOptions, PoolOptions poolOptions, RedisClusterConnectOptions redisClusterConnectOptions, TracingPolicy tracingPolicy) {
        super(vertx, netClientOptions, poolOptions, redisClusterConnectOptions, tracingPolicy);
        this.connectOptions = redisClusterConnectOptions;
        this.poolOptions = poolOptions;
        // The pool size cannot be validated against the cluster size here: the slots are not known yet.
        if (poolOptions.getMaxWaiting() < poolOptions.getMaxSize()) {
            throw new IllegalStateException("Invalid options: maxWaiting < maxSize");
        }
    }

    /**
     * Resolves the hash slot assignment (cached when available) and then connects to every endpoint
     * it reports.
     *
     * @return a future completed with the cluster connection, or failed if the slots cannot be
     *         obtained or any endpoint connection fails
     */
    @Override // io.vertx.redis.client.Redis
    public Future<RedisConnection> connect() {
        PromiseInternal<RedisConnection> promise = this.vertx.promise();
        getSlots(this.vertx.getOrCreateContext())
            .onSuccess(assignment -> connect(assignment, promise))
            .onFailure(promise::fail);
        return promise.future();
    }

    /**
     * Opens one pooled connection per endpoint of {@code slots} and reports the outcome to
     * {@code handler} once every attempt has completed.
     */
    private void connect(Slots slots, Handler<AsyncResult<RedisConnection>> handler) {
        int clusterSize = slots.endpoints().length;
        if (this.poolOptions.getMaxSize() < clusterSize) {
            // The pool could never hold a connection to every node at the same time.
            handler.handle(Future.failedFuture(new RedisConnectException("RedisOptions maxPoolSize < Cluster size(" + clusterSize + "): The pool is not able to hold all required connections!")));
            return;
        }
        Set<Throwable> failures = ConcurrentHashMap.newKeySet();
        AtomicInteger completed = new AtomicInteger();
        Map<String, PooledRedisConnection> connections = new HashMap<>();
        for (String endpoint : slots.endpoints()) {
            this.connectionManager.getConnection(endpoint, readOnlyRequestOrNull())
                .onFailure(err -> {
                    failures.add(err);
                    connectionComplete(completed, slots, connections, failures, handler);
                })
                .onSuccess(conn -> {
                    // Plain HashMap: callbacks may run concurrently, so writes are guarded by its monitor.
                    synchronized (connections) {
                        connections.put(endpoint, conn);
                    }
                    connectionComplete(completed, slots, connections, failures, handler);
                });
        }
    }

    /**
     * Counts one finished connection attempt. When the last attempt finishes, either hands a
     * {@link RedisClusterConnection} to {@code handler} (no failures) or closes every connection that
     * was opened and fails {@code handler} with a message listing all recorded failures.
     */
    private void connectionComplete(AtomicInteger atomicInteger, Slots slots, Map<String, PooledRedisConnection> map, Set<Throwable> set, Handler<AsyncResult<RedisConnection>> handler) {
        if (atomicInteger.incrementAndGet() != slots.endpoints().length) {
            // Other attempts are still in flight.
            return;
        }
        if (set.isEmpty()) {
            // The callback handed to the connection clears the cached slots so the next lookup is fresh.
            handler.handle(Future.succeededFuture(new RedisClusterConnection(this.vertx, this.connectOptions, slots, () -> {
                this.slots.set(null);
            }, map)));
            return;
        }
        // At least one node failed: release what was opened before reporting the failure.
        synchronized (map) {
            for (PooledRedisConnection conn : map.values()) {
                if (conn != null) {
                    conn.close().onFailure(LOG::warn);
                }
            }
        }
        handler.handle(Future.failedFuture(new RedisConnectException(describeFailures("Failed to connect to all nodes of the cluster", set))));
    }

    /**
     * Returns the cached slot lookup, or starts a new one if none is cached. Only the caller that wins
     * the compare-and-set on {@link #slots} starts the lookup; everyone else gets the cached future.
     */
    private Future<Slots> getSlots(ContextInternal contextInternal) {
        while (true) {
            Future<Slots> cached = this.slots.get();
            if (cached != null) {
                return cached;
            }
            PromiseInternal<Slots> promise = contextInternal.promise();
            Future<Slots> future = promise.future();
            if (this.slots.compareAndSet(null, future)) {
                LOG.debug("Obtaining hash slot assignment");
                getSlots(this.connectOptions.getEndpoints(), 0, ConcurrentHashMap.newKeySet(), promise);
                return future;
            }
            // Lost the race against a concurrent caller: loop and pick up whatever is cached now.
        }
    }

    /**
     * Tries the endpoint at position {@code i} of {@code list}; on any failure records the cause in
     * {@code set} and moves on to the next position. Once the list is exhausted, {@code handler} is
     * failed with a message listing every recorded cause. In both terminal cases the expiration of the
     * cached slots is scheduled afterwards.
     */
    private void getSlots(List<String> list, int i, Set<Throwable> set, Handler<AsyncResult<Slots>> handler) {
        if (i >= list.size()) {
            handler.handle(Future.failedFuture(new RedisConnectException(describeFailures("Cannot connect to any of the provided endpoints", set))));
            scheduleCachedSlotsExpiration();
            return;
        }
        String endpoint = list.get(i);
        this.connectionManager.getConnection(endpoint, readOnlyRequestOrNull())
            .onFailure(err -> {
                set.add(err);
                getSlots(list, i + 1, set, handler);
            })
            .onSuccess(conn -> getSlots(endpoint, conn, result -> {
                // The connection was only needed for the lookup; close it whatever the outcome.
                conn.close().onFailure(LOG::warn);
                if (result.failed()) {
                    set.add(result.cause());
                    getSlots(list, i + 1, set, handler);
                } else {
                    handler.handle(Future.succeededFuture(result.result()));
                    scheduleCachedSlotsExpiration();
                }
            }));
    }

    /**
     * Sends {@code CLUSTER SLOTS} over {@code redisConnection} and converts the reply into a
     * {@link Slots} instance. A failed command, a {@code null} or empty reply, or an exception raised
     * while handling the reply is reported to {@code handler} as a failure.
     */
    private void getSlots(String str, RedisConnection redisConnection, Handler<AsyncResult<Slots>> handler) {
        redisConnection.send(Request.cmd(Command.CLUSTER).arg("SLOTS"), reply -> {
            if (reply.failed()) {
                handler.handle(Future.failedFuture(reply.cause()));
                return;
            }
            Response response = reply.result();
            if (response == null || response.size() == 0) {
                handler.handle(Future.failedFuture("CLUSTER SLOTS No slots available in the cluster."));
                return;
            }
            try {
                handler.handle(Future.succeededFuture(new Slots(str, response)));
            } catch (Exception e) {
                handler.handle(Future.failedFuture("CLUSTER SLOTS response invalid: " + e));
            }
        });
    }

    /** Starts a timer that clears the cached slots after the configured hash slot cache TTL. */
    private void scheduleCachedSlotsExpiration() {
        this.vertx.setTimer(this.connectOptions.getHashSlotCacheTTL(), timerId -> this.slots.set(null));
    }

    /**
     * Builds the request passed to the connection manager together with an endpoint: a fresh
     * {@code READONLY} request unless replicas are never used, in which case {@code null}.
     */
    private Request readOnlyRequestOrNull() {
        return RedisReplicas.NEVER != this.connectOptions.getUseReplicas() ? Request.cmd(Command.READONLY) : null;
    }

    /** Renders {@code header} followed by one {@code "\n- <failure>"} line per recorded failure. */
    private static String describeFailures(String header, Set<Throwable> failures) {
        StringBuilder message = new StringBuilder(header);
        for (Throwable failure : failures) {
            message.append("\n- ").append(failure);
        }
        return message.toString();
    }

    /** Reducer that ignores the individual responses and answers {@code OK}. */
    private static Response alwaysOk(List<Response> responses) {
        return SimpleStringType.OK;
    }

    /** Reducer that sums the numeric value of every response, counting {@code null} values as zero. */
    private static Response sumAsNumber(List<Response> responses) {
        long total = responses.stream().mapToLong(response -> {
            Long value = response.toLong();
            return value == null ? 0L : value.longValue();
        }).sum();
        return NumberType.create(Long.valueOf(total));
    }

    /** Reducer that concatenates the elements of every response into a single multi response. */
    private static Response concatenate(List<Response> responses) {
        int total = 0;
        for (Response part : responses) {
            total += part.size();
        }
        MultiType merged = MultiType.create(total, false);
        for (Response part : responses) {
            for (Response element : part) {
                merged.add(element);
            }
        }
        return merged;
    }

    static {
        // Reducers provided out of the box.
        addReducer(Command.MSET, RedisClusterClient::alwaysOk);
        addReducer(Command.DEL, RedisClusterClient::sumAsNumber);
        addReducer(Command.MGET, RedisClusterClient::concatenate);
        addReducer(Command.KEYS, RedisClusterClient::concatenate);
        addReducer(Command.FLUSHDB, RedisClusterClient::alwaysOk);
        addReducer(Command.DBSIZE, RedisClusterClient::sumAsNumber);
        // Commands registered as master-only.
        addMasterOnlyCommand(Command.WAIT);
        addMasterOnlyCommand(Command.SUBSCRIBE);
        addMasterOnlyCommand(Command.PSUBSCRIBE);
        addMasterOnlyCommand(Command.SSUBSCRIBE);
        addReducer(Command.UNSUBSCRIBE, RedisClusterClient::alwaysOk);
        addReducer(Command.PUNSUBSCRIBE, RedisClusterClient::alwaysOk);
        addReducer(Command.SUNSUBSCRIBE, RedisClusterClient::alwaysOk);
    }
}
