/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.jdbc.table;

import java.io.IOException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
import org.apache.flink.connector.jdbc.internal.connection.JdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import org.apache.flink.connector.jdbc.internal.options.JdbcConnectorOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link TableFunction} that looks up rows in a JDBC table by key.
 *
 * <p>For every call to {@link #eval(Object...)} the function binds the given keys to a prepared
 * select statement, executes it and emits every matching row. Results may optionally be kept in a
 * size- and time-bounded cache keyed by the lookup key row.
 *
 * <p>The prepared statement and the cache are created in {@link #open(FunctionContext)} and
 * released in {@link #close()}; both are transient and therefore not part of the serialized form.
 */
@Internal
public class JdbcRowDataLookupFunction
extends TableFunction<RowData> {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcRowDataLookupFunction.class);
    private static final long serialVersionUID = 2L;
    /** Select statement produced by the dialect for the configured table, fields and keys. */
    private final String query;
    private final JdbcConnectionProvider connectionProvider;
    /** Data types of the lookup keys, in the same order as {@link #keyNames}. */
    private final DataType[] keyTypes;
    private final String[] keyNames;
    /** Maximum number of cached keys; {@code -1} disables the cache. */
    private final long cacheMaxSize;
    /** Cache entry time-to-live in milliseconds; {@code -1} disables the cache. */
    private final long cacheExpireMs;
    /** Number of additional attempts made after the first failed query. */
    private final int maxRetryTimes;
    /** Whether keys that matched no row are cached as an empty result. */
    private final boolean cacheMissingKey;
    private final JdbcDialect jdbcDialect;
    /** Converts result-set rows into internal {@link RowData}. */
    private final JdbcRowConverter jdbcRowConverter;
    /** Binds a lookup key row onto the prepared statement. */
    private final JdbcRowConverter lookupKeyRowConverter;
    private transient FieldNamedPreparedStatement statement;
    private transient Cache<RowData, List<RowData>> cache;

    /**
     * Creates the lookup function.
     *
     * @param options connection options (URL, table name, dialect); must not be null
     * @param lookupOptions cache and retry settings; must not be null
     * @param fieldNames names of all fields selected from the table; must not be null
     * @param fieldTypes types of the fields, positionally aligned with {@code fieldNames}; must
     *     not be null
     * @param keyNames names of the lookup key fields; each must occur in {@code fieldNames}
     * @param rowType row type used to build the converter for result rows
     * @throws NullPointerException if a required argument is null
     * @throws IllegalArgumentException if a key name is not contained in {@code fieldNames}
     */
    public JdbcRowDataLookupFunction(JdbcConnectorOptions options, JdbcLookupOptions lookupOptions, String[] fieldNames, DataType[] fieldTypes, String[] keyNames, RowType rowType) {
        Preconditions.checkNotNull(options, "No JdbcOptions supplied.");
        // lookupOptions is dereferenced below; fail early with a descriptive message instead.
        Preconditions.checkNotNull(lookupOptions, "No JdbcLookupOptions supplied.");
        Preconditions.checkNotNull(fieldNames, "No fieldNames supplied.");
        Preconditions.checkNotNull(fieldTypes, "No fieldTypes supplied.");
        Preconditions.checkNotNull(keyNames, "No keyNames supplied.");
        this.connectionProvider = new SimpleJdbcConnectionProvider(options);
        this.keyNames = keyNames;
        List<String> nameList = Arrays.asList(fieldNames);
        this.keyTypes = Arrays.stream(keyNames).map(keyName -> {
            // A single indexOf both validates the key name and locates its type.
            int fieldIndex = nameList.indexOf(keyName);
            Preconditions.checkArgument(fieldIndex >= 0, "keyName %s can't find in fieldNames %s.", keyName, nameList);
            return fieldTypes[fieldIndex];
        }).toArray(DataType[]::new);
        this.cacheMaxSize = lookupOptions.getCacheMaxSize();
        this.cacheExpireMs = lookupOptions.getCacheExpireMs();
        this.maxRetryTimes = lookupOptions.getMaxRetryTimes();
        this.cacheMissingKey = lookupOptions.getCacheMissingKey();
        this.query = options.getDialect().getSelectFromStatement(options.getTableName(), fieldNames, keyNames);
        String dbURL = options.getDbURL();
        this.jdbcDialect = JdbcDialectLoader.load(dbURL);
        this.jdbcRowConverter = this.jdbcDialect.getRowConverter(rowType);
        LogicalType[] keyLogicalTypes = Arrays.stream(this.keyTypes).map(DataType::getLogicalType).toArray(LogicalType[]::new);
        this.lookupKeyRowConverter = this.jdbcDialect.getRowConverter(RowType.of(keyLogicalTypes));
    }

    /**
     * Establishes the JDBC connection, prepares the lookup statement and builds the cache.
     *
     * <p>The cache is left {@code null} (disabled) when either the maximum size or the expiry
     * time is {@code -1}.
     *
     * @throws IllegalArgumentException if the connection cannot be established or the JDBC driver
     *     class cannot be found
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        try {
            this.establishConnectionAndStatement();
            boolean cacheDisabled = this.cacheMaxSize == -1L || this.cacheExpireMs == -1L;
            this.cache = cacheDisabled ? null : CacheBuilder.newBuilder().expireAfterWrite(this.cacheExpireMs, TimeUnit.MILLISECONDS).maximumSize(this.cacheMaxSize).build();
        }
        catch (SQLException sqe) {
            throw new IllegalArgumentException("open() failed.", sqe);
        }
        catch (ClassNotFoundException cnfe) {
            throw new IllegalArgumentException("JDBC driver class not found.", cnfe);
        }
    }

    /**
     * Looks up the rows matching the given keys and emits each of them via {@code collect}.
     *
     * <p>If caching is enabled and the key is cached, the cached rows are emitted without
     * touching the database. Otherwise the query is executed (with retries); the result is
     * emitted and, when caching is enabled, stored in the cache. Empty results are only cached
     * when {@code cacheMissingKey} is set.
     *
     * @param keys lookup key values, in the order of the configured key names
     * @throws RuntimeException if the query still fails after the last retry, if the connection
     *     cannot be re-established, or if the thread is interrupted while backing off
     */
    public void eval(Object ... keys) {
        RowData keyRow = GenericRowData.of(keys);
        if (this.cache != null) {
            List<RowData> cachedRows = this.cache.getIfPresent(keyRow);
            if (cachedRows != null) {
                for (RowData cachedRow : cachedRows) {
                    this.collect(cachedRow);
                }
                return;
            }
        }
        ArrayList<RowData> rows = this.lookupWithRetry(keyRow);
        // Rows are emitted only after a fully successful attempt, so a failure in the middle of
        // reading a result set cannot cause rows to be emitted again by the retry.
        for (RowData row : rows) {
            this.collect(row);
        }
        if (this.cache != null && (!rows.isEmpty() || this.cacheMissingKey)) {
            rows.trimToSize();
            this.cache.put(keyRow, rows);
        }
    }

    /**
     * Runs the lookup query, retrying up to {@code maxRetryTimes} additional times on
     * {@link SQLException}. Returns an empty list without querying if {@code maxRetryTimes} is
     * negative, because the attempt loop then never runs.
     */
    private ArrayList<RowData> lookupWithRetry(RowData keyRow) {
        for (int retry = 0; retry <= this.maxRetryTimes; ++retry) {
            try {
                return this.queryRows(keyRow);
            }
            catch (SQLException e) {
                LOG.error("JDBC executeBatch error, retry times = {}", retry, e);
                if (retry >= this.maxRetryTimes) {
                    throw new RuntimeException("Execution of JDBC statement failed.", e);
                }
                this.reestablishConnectionIfInvalid();
                this.backOff(retry);
            }
        }
        return new ArrayList<>();
    }

    /** Binds the key row to the prepared statement, executes it and reads all result rows. */
    private ArrayList<RowData> queryRows(RowData keyRow) throws SQLException {
        this.statement.clearParameters();
        this.statement = this.lookupKeyRowConverter.toExternal(keyRow, this.statement);
        ArrayList<RowData> rows = new ArrayList<>();
        try (ResultSet resultSet = this.statement.executeQuery()) {
            while (resultSet.next()) {
                rows.add(this.jdbcRowConverter.toInternal(resultSet));
            }
        }
        return rows;
    }

    /** Closes and re-creates the statement and connection if the connection is no longer valid. */
    private void reestablishConnectionIfInvalid() {
        try {
            if (!this.connectionProvider.isConnectionValid()) {
                this.statement.close();
                this.connectionProvider.closeConnection();
                this.establishConnectionAndStatement();
            }
        }
        catch (ClassNotFoundException | SQLException exception) {
            LOG.error("JDBC connection is not valid, and reestablish connection failed", exception);
            throw new RuntimeException("Reestablish JDBC connection failed", exception);
        }
    }

    /** Sleeps {@code 1000 * retry} milliseconds before the next attempt. */
    private void backOff(int retry) {
        try {
            Thread.sleep(1000L * retry);
        }
        catch (InterruptedException e1) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e1);
        }
    }

    /** Obtains (or creates) the connection and prepares the lookup statement on it. */
    private void establishConnectionAndStatement() throws SQLException, ClassNotFoundException {
        Connection dbConn = this.connectionProvider.getOrEstablishConnection();
        this.statement = FieldNamedPreparedStatement.prepareStatement(dbConn, this.query, this.keyNames);
    }

    /**
     * Releases the cache, the prepared statement and the connection.
     *
     * <p>A failure to close the statement is logged and otherwise ignored so that the connection
     * is still closed.
     */
    @Override
    public void close() throws IOException {
        if (this.cache != null) {
            this.cache.cleanUp();
            this.cache = null;
        }
        if (this.statement != null) {
            try {
                this.statement.close();
            }
            catch (SQLException e) {
                LOG.info("JDBC statement could not be closed: " + e.getMessage());
            }
            finally {
                this.statement = null;
            }
        }
        this.connectionProvider.closeConnection();
    }

    /** Returns the connection currently held by the connection provider. */
    @VisibleForTesting
    public Connection getDbConnection() {
        return this.connectionProvider.getConnection();
    }

    /** Returns the lookup cache, or {@code null} if caching is disabled or the function is not open. */
    @VisibleForTesting
    public Cache<RowData, List<RowData>> getCache() {
        return this.cache;
    }
}

