package org.redisson.rx;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.reactivex.Flowable;
import io.reactivex.functions.Action;
import io.reactivex.functions.LongConsumer;
import io.reactivex.processors.ReplayProcessor;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
import org.redisson.api.RFuture;
import org.redisson.command.CommandAsyncService;
import org.redisson.connection.ConnectionManager;

/* loaded from: input_file:org/redisson/rx/CommandRxService.class */
/**
 * RxJava 2 flavour of {@link CommandAsyncService}: exposes asynchronous
 * {@link RFuture} results as {@link Flowable} streams.
 */
public class CommandRxService extends CommandAsyncService implements CommandRxExecutor {

    /**
     * Creates the service on top of the given connection manager.
     *
     * @param connectionManager handed unchanged to the {@link CommandAsyncService} constructor
     */
    public CommandRxService(ConnectionManager connectionManager) {
        super(connectionManager);
    }

    /**
     * Wraps a deferred asynchronous operation into a {@link Flowable}.
     *
     * <p>The {@code callable} is not invoked when this method is called; it is invoked from the
     * {@code doOnRequest} hook, i.e. when a subscriber requests items. The outcome of the produced
     * future is published through a {@link ReplayProcessor}:
     * <ul>
     *   <li>failure of the future (or an exception thrown by {@code callable} itself) is
     *       signalled as {@code onError};</li>
     *   <li>a non-null result is signalled as a single {@code onNext} followed by
     *       {@code onComplete};</li>
     *   <li>a {@code null} result is signalled as {@code onComplete} only.</li>
     * </ul>
     *
     * <p>If the subscriber cancels, the most recently started future is cancelled with
     * {@code cancel(true)}.
     *
     * @param callable supplier of the asynchronous operation to run on demand
     * @param <R> type of the value produced by the operation
     * @return a flowable emitting at most one item
     */
    public <R> Flowable<R> flowable(final Callable<RFuture<R>> callable) {
        final ReplayProcessor<R> processor = ReplayProcessor.create();
        // Holds the future started by the latest request so the cancel hook can reach it.
        final AtomicReference<RFuture<R>> inFlight = new AtomicReference<RFuture<R>>();

        return processor.doOnRequest(new LongConsumer() {
            @Override
            public void accept(long requested) {
                final RFuture<R> future;
                try {
                    future = callable.call();
                } catch (Exception e) {
                    // Deliver the failure to the subscriber; letting it escape this callback
                    // would leave the stream without any terminal signal.
                    processor.onError(e);
                    return;
                }
                inFlight.set(future);

                future.addListener(new FutureListener<R>() {
                    @Override
                    public void operationComplete(Future<R> completed) throws Exception {
                        if (!completed.isSuccess()) {
                            processor.onError(completed.cause());
                            return;
                        }
                        R value = completed.getNow();
                        // Null items are not emitted; a null result yields an empty stream.
                        if (value != null) {
                            processor.onNext(value);
                        }
                        processor.onComplete();
                    }
                });
            }
        }).doOnCancel(new Action() {
            // Chained onto the returned Flowable: doOnCancel produces a new Flowable, so calling
            // it on the processor and dropping the result never registers the action.
            @Override
            public void run() {
                RFuture<R> future = inFlight.get();
                if (future != null) {
                    future.cancel(true);
                }
            }
        });
    }
}
