/*
 * Decompiled with CFR 0.152.
 */
package org.hswebframework.ezorm.rdb.executor.jdbc;

import java.sql.Connection;
import java.util.stream.Collectors;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.jdbc.JdbcSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrappers;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

public abstract class JdbcReactiveSqlExecutor
extends JdbcSqlExecutor
implements ReactiveSqlExecutor {
    private static final Logger log = LoggerFactory.getLogger(JdbcReactiveSqlExecutor.class);

    public JdbcReactiveSqlExecutor() {
        super(log);
    }

    public abstract Mono<Connection> getConnection(SqlRequest var1);

    public abstract void releaseConnection(Connection var1, SqlRequest var2);

    @Override
    public Mono<Integer> update(Publisher<SqlRequest> request) {
        return Mono.defer(() -> this.toFlux(request).flatMap(sqlRequest -> this.getConnection((SqlRequest)sqlRequest).flatMap(connection -> Mono.fromSupplier(() -> this.doUpdate((Connection)connection, (SqlRequest)sqlRequest)).doFinally(type -> this.releaseConnection((Connection)connection, (SqlRequest)sqlRequest)))).collect(Collectors.summingInt(Integer::intValue)));
    }

    @Override
    public Mono<Void> execute(Publisher<SqlRequest> request) {
        return Mono.defer(() -> this.toFlux(request).flatMap(sqlRequest -> this.getConnection((SqlRequest)sqlRequest).flatMap(connection -> Mono.fromSupplier(() -> {
            this.doExecute((Connection)connection, (SqlRequest)sqlRequest);
            return null;
        }).doFinally(type -> this.releaseConnection((Connection)connection, (SqlRequest)sqlRequest)))).then());
    }

    @Override
    public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {
        return Flux.create(sink -> {
            Disposable disposable = this.toFlux(request).doFinally(type -> sink.complete()).subscribe(sqlRequest -> this.getConnection((SqlRequest)sqlRequest).subscribe(connection -> {
                this.doSelect((Connection)connection, (SqlRequest)sqlRequest, ResultWrappers.consumer(wrapper, arg_0 -> ((FluxSink)sink).next(arg_0)));
                this.releaseConnection((Connection)connection, (SqlRequest)sqlRequest);
            }));
            sink.onCancel(disposable).onDispose(disposable);
        });
    }

    protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> request) {
        return Flux.from(request);
    }
}

