/*
 * Decompiled with CFR 0.152.
 */
package io.quarkus.redis.runtime.datasource;

import io.quarkus.redis.datasource.ReactiveRedisCommands;
import io.quarkus.redis.datasource.ReactiveRedisDataSource;
import io.quarkus.redis.datasource.stream.ClaimedMessages;
import io.quarkus.redis.datasource.stream.ReactiveStreamCommands;
import io.quarkus.redis.datasource.stream.StreamMessage;
import io.quarkus.redis.datasource.stream.StreamRange;
import io.quarkus.redis.datasource.stream.XAddArgs;
import io.quarkus.redis.datasource.stream.XClaimArgs;
import io.quarkus.redis.datasource.stream.XGroupCreateArgs;
import io.quarkus.redis.datasource.stream.XGroupSetIdArgs;
import io.quarkus.redis.datasource.stream.XReadArgs;
import io.quarkus.redis.datasource.stream.XReadGroupArgs;
import io.quarkus.redis.datasource.stream.XTrimArgs;
import io.quarkus.redis.runtime.datasource.AbstractStreamCommands;
import io.quarkus.redis.runtime.datasource.ReactiveRedisDataSourceImpl;
import io.smallrye.mutiny.Uni;
import io.vertx.mutiny.redis.client.Response;
import io.vertx.redis.client.ResponseType;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReactiveStreamCommandsImpl<K, F, V>
extends AbstractStreamCommands<K, F, V>
implements ReactiveStreamCommands<K, F, V>,
ReactiveRedisCommands {
    private final ReactiveRedisDataSource reactive;
    private final Class<V> typeOfValue;
    private final Class<F> typeOfField;
    private final Class<K> typeOfKey;

    public ReactiveStreamCommandsImpl(ReactiveRedisDataSourceImpl redis, Class<K> k, Class<F> f, Class<V> v) {
        super(redis, k, f, v);
        this.typeOfKey = k;
        this.typeOfField = f;
        this.typeOfValue = v;
        this.reactive = redis;
    }

    @Override
    public ReactiveRedisDataSource getDataSource() {
        return this.reactive;
    }

    @Override
    public Uni<Integer> xack(K key, String group, String ... ids) {
        return super._xack(key, group, ids).map(Response::toInteger);
    }

    @Override
    public Uni<String> xadd(K key, Map<F, V> payload) {
        return super._xadd(key, payload).map(ReactiveStreamCommandsImpl::getIdOrNull);
    }

    protected static String getIdOrNull(Response r) {
        if (r == null) {
            return null;
        }
        return r.toString();
    }

    @Override
    public Uni<String> xadd(K key, XAddArgs args, Map<F, V> payload) {
        return super._xadd(key, args, payload).map(ReactiveStreamCommandsImpl::getIdOrNull);
    }

    @Override
    public Uni<ClaimedMessages<K, F, V>> xautoclaim(K key, String group, String consumer, Duration minIdleTime, String start, int count) {
        return super._xautoclaim(key, group, consumer, minIdleTime, start, count).map(r -> this.decodeAsClaimedMessages(key, (Response)r));
    }

    protected ClaimedMessages<K, F, V> decodeAsClaimedMessages(K key, Response r) {
        if (r == null) {
            return new ClaimedMessages(null, List.of());
        }
        String id = r.get(0).toString();
        Response l = r.get(1);
        List<StreamMessage<K, F, V>> list = this.decodeListOfMessages(key, l);
        return new ClaimedMessages<K, F, V>(id, list);
    }

    protected List<StreamMessage<K, F, V>> decodeMessageListPrefixedByKey(Response r) {
        if (r == null) {
            return List.of();
        }
        K actualKey = this.marshaller.decode(this.typeOfKey, r.get(0));
        Response listOfMessages = r.get(1);
        ArrayList<StreamMessage<K, F, V>> list = new ArrayList<StreamMessage<K, F, V>>();
        for (int i = 0; i < listOfMessages.size(); ++i) {
            list.add(this.decodeMessageWithStreamId(actualKey, listOfMessages.get(i)));
        }
        return list;
    }

    private StreamMessage<K, F, V> decodeMessageWithStreamId(K key, Response response) {
        if (response == null) {
            return null;
        }
        if (response.type() == ResponseType.BULK) {
            return new StreamMessage(key, response.toString(), Map.of());
        }
        String streamId = response.get(0).toString();
        Response payload = response.get(1);
        Map<F, V> content = this.decodeMessagePayload(payload);
        return new StreamMessage<K, F, V>(key, streamId, content);
    }

    Map<F, V> decodeMessagePayload(Response response) {
        HashMap<Object, V> map = new HashMap<Object, V>();
        Object current = null;
        for (Response nested : response) {
            if (current == null) {
                current = this.marshaller.decode(this.typeOfField, nested);
                continue;
            }
            map.put(current, this.marshaller.decode(this.typeOfValue, nested));
            current = null;
        }
        return map;
    }

    @Override
    public Uni<ClaimedMessages<K, F, V>> xautoclaim(K key, String group, String consumer, Duration minIdleTime, String start) {
        return super._xautoclaim(key, group, consumer, minIdleTime, start).map(r -> this.decodeAsClaimedMessages(key, (Response)r));
    }

    @Override
    public Uni<ClaimedMessages<K, F, V>> xautoclaim(K key, String group, String consumer, Duration minIdleTime, String start, int count, boolean justId) {
        return super._xautoclaim(key, group, consumer, minIdleTime, start, count, justId).map(r -> this.decodeAsClaimedMessages(key, (Response)r));
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xclaim(K key, String group, String consumer, Duration minIdleTime, String ... id) {
        return super._xclaim(key, group, consumer, minIdleTime, id).map(r -> this.decodeListOfMessages(key, (Response)r));
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xclaim(K key, String group, String consumer, Duration minIdleTime, XClaimArgs args, String ... id) {
        return super._xclaim(key, group, consumer, minIdleTime, args, id).map(r -> this.decodeListOfMessages(key, (Response)r));
    }

    @Override
    public Uni<Integer> xdel(K key, String ... id) {
        return super._xdel(key, id).map(Response::toInteger);
    }

    @Override
    public Uni<Void> xgroupCreate(K key, String groupname, String from) {
        return super._xgroupCreate(key, groupname, from).replaceWithVoid();
    }

    @Override
    public Uni<Void> xgroupCreate(K key, String groupname, String from, XGroupCreateArgs args) {
        return super._xgroupCreate(key, groupname, from, args).replaceWithVoid();
    }

    @Override
    public Uni<Boolean> xgroupCreateConsumer(K key, String groupname, String consumername) {
        return super._xgroupCreateConsumer(key, groupname, consumername).map(Response::toBoolean);
    }

    @Override
    public Uni<Long> xgroupDelConsumer(K key, String groupname, String consumername) {
        return super._xgroupDelConsumer(key, groupname, consumername).map(Response::toLong);
    }

    @Override
    public Uni<Boolean> xgroupDestroy(K key, String groupname) {
        return super._xgroupDestroy(key, groupname).map(Response::toBoolean);
    }

    @Override
    public Uni<Void> xgroupSetId(K key, String groupname, String from) {
        return super._xgroupSetId(key, groupname, from).replaceWithVoid();
    }

    @Override
    public Uni<Void> xgroupSetId(K key, String groupname, String from, XGroupSetIdArgs args) {
        return super._xgroupSetId(key, groupname, from, args).replaceWithVoid();
    }

    @Override
    public Uni<Long> xlen(K key) {
        return super._xlen(key).map(Response::toLong);
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xrange(K key, StreamRange range, int count) {
        return super._xrange(key, range, count).map(r -> this.decodeListOfMessages(key, (Response)r));
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xrange(K key, StreamRange range) {
        return super._xrange(key, range).map(r -> this.decodeListOfMessages(key, (Response)r));
    }

    protected List<StreamMessage<K, F, V>> decodeListOfMessages(K key, Response r) {
        if (r == null) {
            return List.of();
        }
        ArrayList<StreamMessage<K, F, V>> list = new ArrayList<StreamMessage<K, F, V>>();
        for (Response response : r) {
            list.add(this.decodeMessageWithStreamId(key, response));
        }
        return list;
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xread(K key, String id) {
        return this.xread(Map.of(key, id));
    }

    protected List<StreamMessage<K, F, V>> decodeAsListOfMessagesFromXRead(Response r) {
        if (r == null) {
            return List.of();
        }
        ArrayList<StreamMessage<K, F, V>> list = new ArrayList<StreamMessage<K, F, V>>();
        for (Response response : r) {
            list.addAll(this.decodeMessageListPrefixedByKey(response));
        }
        return list;
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xread(Map<K, String> lastIdsPerStream) {
        return super._xread(lastIdsPerStream).map(this::decodeAsListOfMessagesFromXRead);
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xread(K key, String id, XReadArgs args) {
        return this.xread(Map.of(key, id), args);
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xread(Map<K, String> lastIdsPerStream, XReadArgs args) {
        return super._xread(lastIdsPerStream, args).map(this::decodeAsListOfMessagesFromXRead);
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xreadgroup(String group, String consumer, K key, String id) {
        return this.xreadgroup(group, consumer, Map.of(key, id));
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xreadgroup(String group, String consumer, Map<K, String> lastIdsPerStream) {
        return super._xreadgroup(group, consumer, lastIdsPerStream).map(this::decodeAsListOfMessagesFromXRead);
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xreadgroup(String group, String consumer, K key, String id, XReadGroupArgs args) {
        return this.xreadgroup(group, consumer, Map.of(key, id), args);
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xreadgroup(String group, String consumer, Map<K, String> lastIdsPerStream, XReadGroupArgs args) {
        return super._xreadgroup(group, consumer, lastIdsPerStream, args).map(this::decodeAsListOfMessagesFromXRead);
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xrevrange(K key, StreamRange range, int count) {
        return super._xrevrange(key, range, count).map(r -> this.decodeListOfMessages(key, (Response)r));
    }

    @Override
    public Uni<List<StreamMessage<K, F, V>>> xrevrange(K key, StreamRange range) {
        return super._xrevrange(key, range).map(r -> this.decodeListOfMessages(key, (Response)r));
    }

    @Override
    public Uni<Long> xtrim(K key, String threshold) {
        return super._xtrim(key, new XTrimArgs().minid(threshold)).map(Response::toLong);
    }

    @Override
    public Uni<Long> xtrim(K key, XTrimArgs args) {
        return super._xtrim(key, args).map(Response::toLong);
    }
}

