/*
 * Decompiled with CFR 0.152.
 */
package org.hswebframework.ezorm.rdb.executor.reactive.r2dbc;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.stream.Collectors;
import org.hswebframework.ezorm.core.CastUtil;
import org.hswebframework.ezorm.rdb.executor.BatchSqlRequest;
import org.hswebframework.ezorm.rdb.executor.DefaultColumnWrapperContext;
import org.hswebframework.ezorm.rdb.executor.NullValue;
import org.hswebframework.ezorm.rdb.executor.SqlRequest;
import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSqlExecutor;
import org.hswebframework.ezorm.rdb.executor.reactive.r2dbc.R2dbcSqlRequest;
import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
import org.hswebframework.ezorm.rdb.utils.SqlUtils;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SignalType;

/**
 * Base {@link ReactiveSqlExecutor} that runs {@link SqlRequest}s through an R2DBC
 * {@link Connection}.
 *
 * <p>Subclasses supply the connection ({@link #getConnection()}) and decide what happens to it
 * once a pipeline terminates ({@link #releaseConnection(SignalType, Connection)}). They may also
 * override {@link #getBindSymbol()} / {@link #getBindFirstIndex()} to change how parameter
 * markers are named when values are bound to a {@link Statement}.
 */
public abstract class R2dbcReactiveSqlExecutor implements ReactiveSqlExecutor {
    private static final Logger log = LoggerFactory.getLogger(R2dbcReactiveSqlExecutor.class);

    /** Logger used for SQL / error output; replaceable through {@link #setLogger(Logger)}. */
    private Logger logger = log;

    /**
     * Supplies the connection on which the requests of one pipeline are executed.
     *
     * @return a {@link Mono} emitting the connection to use
     */
    protected abstract Mono<Connection> getConnection();

    /**
     * Invoked once the execution pipeline bound to {@code connection} has terminated.
     *
     * @param type       the signal that ended the pipeline
     * @param connection the connection previously obtained from {@link #getConnection()}
     */
    protected abstract void releaseConnection(SignalType type, Connection connection);

    /**
     * Creates a statement for {@code request} on {@code connection}, binds its parameters and
     * returns the results of executing it.
     *
     * <p>The statement is created and bound when this method is called; the SQL is printed via
     * {@link SqlUtils#printSql} on subscription, and failures are logged together with the
     * request's native SQL.
     *
     * @param connection the connection used to create the statement
     * @param request    the request to execute
     * @return the results emitted by the statement
     */
    protected Flux<Result> doExecute(Connection connection, SqlRequest request) {
        Statement statement = this.prepareStatement(connection.createStatement(request.getSql()), request);
        return Flux.from(statement.execute())
                .map(Result.class::cast)
                .doOnSubscribe(subscription -> SqlUtils.printSql(this.logger, request))
                .doOnError(err -> this.logger.error("==>      Error: {}", request.toNativeSql(), err));
    }

    /**
     * Executes every request of {@code sqlRequestFlux} on a single connection and releases that
     * connection when the resulting flux terminates.
     *
     * @param sqlRequestFlux the (already converted) requests to execute
     * @return the results of all requests, in request order
     */
    private Flux<Result> doExecute(Flux<SqlRequest> sqlRequestFlux) {
        return this.getConnection()
                .flatMapMany(connection -> sqlRequestFlux
                        // All requests share one connection: run them strictly one after another,
                        // in the order they were submitted, instead of subscribing eagerly.
                        .concatMap(sqlRequest -> this.doExecute(connection, sqlRequest))
                        .doFinally(type -> this.releaseConnection(type, connection)));
    }

    /**
     * Executes the requests and sums the updated-row counts reported by their results.
     *
     * @param request the requests to execute
     * @return the total number of updated rows, {@code 0} when nothing was reported
     */
    @Override
    public Mono<Integer> update(Publisher<SqlRequest> request) {
        return this.doExecute(this.toFlux(request))
                .flatMap(result -> Mono.from(result.getRowsUpdated()).defaultIfEmpty(0))
                .doOnNext(count -> this.logger.debug("==>    Updated: {}", count))
                .collect(Collectors.summingInt(Integer::intValue))
                .defaultIfEmpty(0);
    }

    /**
     * Executes the requests, discarding their results.
     *
     * @param request the requests to execute
     * @return a {@link Mono} completing when all requests have been executed
     */
    @Override
    public Mono<Void> execute(Publisher<SqlRequest> request) {
        return this.doExecute(this.toFlux(request))
                .flatMap(Result::getRowsUpdated)
                .then();
    }

    /**
     * Executes the requests and maps every returned row through {@code wrapper}.
     *
     * <p>When {@link ResultWrapper#completedWrapRow} returns {@code false} for a row, that row is
     * not emitted and the flux stops. {@link ResultWrapper#completedWrap()} is invoked when the
     * flux completes or is cancelled.
     *
     * @param request the requests to execute
     * @param wrapper the wrapper turning rows into {@code E} instances
     * @param <E>     the row type produced by the wrapper
     * @return the wrapped rows
     */
    @Override
    public <E> Flux<E> select(Publisher<SqlRequest> request, ResultWrapper<E, ?> wrapper) {
        return this.doExecute(this.toFlux(request))
                // Rows are typed as Object because the Interrupted sentinel travels on the
                // same channel as the wrapped instances.
                .flatMap(result -> result.<Object>map((row, meta) -> {
                    List<String> columns = new ArrayList<>(meta.getColumnNames());
                    wrapper.beforeWrap(() -> columns);
                    E instance = wrapper.newRowInstance();
                    for (int i = 0, len = columns.size(); i < len; i++) {
                        String column = columns.get(i);
                        DefaultColumnWrapperContext<E> context =
                                new DefaultColumnWrapperContext<>(i, column, row.get(column), instance);
                        wrapper.wrapColumn(context);
                        // The wrapper may have swapped the row instance while handling the column.
                        instance = context.getRowInstance();
                    }
                    if (!wrapper.completedWrapRow(instance)) {
                        return Interrupted.instance;
                    }
                    return instance;
                }))
                .takeWhile(Interrupted::nonInterrupted)
                .<E>map(CastUtil::cast)
                .doOnCancel(wrapper::completedWrap)
                .doOnComplete(wrapper::completedWrap);
    }

    /**
     * Flattens the incoming requests: a {@link BatchSqlRequest} is expanded into itself followed
     * by the requests of its batch. Empty requests are dropped and the remaining ones are passed
     * through {@link #convertRequest(SqlRequest)}.
     *
     * @param request the requests to normalize
     * @return the flattened, non-empty, converted requests in their original order
     */
    protected Flux<SqlRequest> toFlux(Publisher<SqlRequest> request) {
        return Flux.from(request)
                // concatMap keeps the batch expansion explicitly ordered.
                .concatMap(sql -> {
                    if (sql instanceof BatchSqlRequest) {
                        return Flux.concat(Flux.just(sql), Flux.fromIterable(((BatchSqlRequest) sql).getBatch()));
                    }
                    return Flux.just(sql);
                })
                .filter(SqlRequest::isNotEmpty)
                .map(this::convertRequest);
    }

    /**
     * Converts a request into an {@link R2dbcSqlRequest} using this executor's bind symbol and
     * first bind index.
     *
     * @param sqlRequest the request to convert
     * @return the converted request
     */
    protected SqlRequest convertRequest(SqlRequest sqlRequest) {
        return R2dbcSqlRequest.of(this.getBindFirstIndex(), this.getBindSymbol(), sqlRequest);
    }

    /**
     * @return the prefix of a parameter marker name; {@code "$"} by default
     */
    protected String getBindSymbol() {
        return "$";
    }

    /**
     * @return the number used for the first parameter marker; {@code 1} by default
     */
    protected int getBindFirstIndex() {
        return 1;
    }

    /**
     * Binds a typed {@code null} to the parameter at the zero-based {@code index}.
     * {@link Date} is declared to the driver as {@link LocalDateTime}, matching the conversion
     * done by {@link #bind(Statement, int, Object)}.
     *
     * @param statement the statement to bind on
     * @param index     zero-based parameter position
     * @param type      the Java type of the null value
     */
    protected void bindNull(Statement statement, int index, Class<?> type) {
        Class<?> boundType = type == Date.class ? LocalDateTime.class : type;
        statement.bindNull(this.getBindSymbol() + (index + this.getBindFirstIndex()), boundType);
    }

    /**
     * Binds {@code value} to the parameter at the zero-based {@code index}. Any
     * {@link Date} (including its {@code java.sql} subclasses) is bound as a
     * {@link LocalDateTime} in the system default time zone.
     *
     * @param statement the statement to bind on
     * @param index     zero-based parameter position
     * @param value     the non-null value to bind
     */
    protected void bind(Statement statement, int index, Object value) {
        Object bound = value instanceof Date ? toLocalDateTime((Date) value) : value;
        statement.bind(this.getBindSymbol() + (index + this.getBindFirstIndex()), bound);
    }

    /**
     * Converts a legacy date into a {@link LocalDateTime} in the system default time zone.
     *
     * <p>{@code java.sql.Date} and {@code java.sql.Time} always throw
     * {@link UnsupportedOperationException} from {@code toInstant()}, so they are converted
     * through their epoch milliseconds; every other {@link Date} keeps using {@code toInstant()}
     * so that sub-millisecond precision (e.g. of {@code java.sql.Timestamp}) is not lost.
     */
    private static LocalDateTime toLocalDateTime(Date date) {
        Instant instant = (date instanceof java.sql.Date || date instanceof java.sql.Time)
                ? Instant.ofEpochMilli(date.getTime())
                : date.toInstant();
        return instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
    }

    /**
     * Binds the parameters of {@code request} to {@code statement} in declaration order.
     *
     * <p>A plain {@code null} is bound as a {@code String}-typed null; a {@link NullValue} is
     * bound as a null of the Java type of its data type; anything else goes through
     * {@link #bind(Statement, int, Object)}.
     *
     * @param statement the statement to bind on
     * @param request   the request providing the parameters
     * @return the same {@code statement} instance
     */
    protected Statement prepareStatement(Statement statement, SqlRequest request) {
        if (request.isEmpty() || request.getParameters() == null) {
            return statement;
        }
        int index = 0;
        for (Object parameter : request.getParameters()) {
            if (parameter == null) {
                this.bindNull(statement, index, String.class);
            } else if (parameter instanceof NullValue) {
                this.bindNull(statement, index, ((NullValue) parameter).getDataType().getJavaType());
            } else {
                this.bind(statement, index, parameter);
            }
            index++;
        }
        return statement;
    }

    /**
     * @return the logger currently used by this executor
     */
    public Logger getLogger() {
        return this.logger;
    }

    /**
     * @param logger the logger to use for SQL and error output
     */
    public void setLogger(Logger logger) {
        this.logger = logger;
    }

    /**
     * Sentinel emitted in place of a row when the wrapper asks to stop consuming results.
     */
    enum Interrupted {
        instance;

        /**
         * @param o the element to test
         * @return {@code true} unless {@code o} is the {@link #instance} sentinel
         */
        static boolean nonInterrupted(Object o) {
            return o != instance;
        }
    }
}

