/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.mongodb.table;

import com.mongodb.MongoException;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.mongodb.common.config.MongoConnectionOptions;
import org.apache.flink.connector.mongodb.common.utils.MongoUtils;
import org.apache.flink.connector.mongodb.table.converter.BsonToRowDataConverters;
import org.apache.flink.connector.mongodb.table.converter.RowDataToBsonConverters;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Preconditions;
import org.bson.BsonDocument;
import org.bson.conversions.Bson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class MongoRowDataLookupFunction
extends LookupFunction {
    private static final Logger LOG = LoggerFactory.getLogger(MongoRowDataLookupFunction.class);
    private static final long serialVersionUID = 1L;
    private final MongoConnectionOptions connectionOptions;
    private final int maxRetries;
    private final long retryIntervalMs;
    private final List<String> fieldNames;
    private final List<String> keyNames;
    private final BsonToRowDataConverters.BsonToRowDataConverter mongoRowConverter;
    private final RowDataToBsonConverters.RowDataToBsonConverter lookupKeyRowConverter;
    private transient MongoClient mongoClient;

    public MongoRowDataLookupFunction(MongoConnectionOptions connectionOptions, int maxRetries, long retryIntervalMs, List<String> fieldNames, List<DataType> fieldTypes, List<String> keyNames, RowType rowType) {
        Preconditions.checkNotNull(fieldNames, (String)"No fieldNames supplied.");
        Preconditions.checkNotNull(fieldTypes, (String)"No fieldTypes supplied.");
        Preconditions.checkNotNull(keyNames, (String)"No keyNames supplied.");
        this.connectionOptions = (MongoConnectionOptions)Preconditions.checkNotNull((Object)connectionOptions);
        this.maxRetries = maxRetries;
        this.retryIntervalMs = retryIntervalMs;
        this.fieldNames = fieldNames;
        this.mongoRowConverter = BsonToRowDataConverters.createConverter(rowType);
        this.keyNames = keyNames;
        LogicalType[] keyTypes = (LogicalType[])this.keyNames.stream().map(s -> ((DataType)fieldTypes.get(fieldNames.indexOf(s))).getLogicalType()).toArray(LogicalType[]::new);
        this.lookupKeyRowConverter = RowDataToBsonConverters.createConverter(RowType.of((LogicalType[])keyTypes, (String[])keyNames.toArray(new String[0])));
    }

    public void open(FunctionContext context) {
        this.mongoClient = MongoClients.create((String)this.connectionOptions.getUri());
    }

    /*
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    public Collection<RowData> lookup(RowData keyRow) {
        int retry = 0;
        while (retry <= this.maxRetries) {
            try {
                BsonDocument lookupValues = this.lookupKeyRowConverter.convert(keyRow);
                List filters = this.keyNames.stream().map(name -> Filters.eq((String)name, (Object)lookupValues.get(name))).collect(Collectors.toList());
                Bson query = Filters.and(filters);
                Bson projection = MongoUtils.project(this.fieldNames);
                try (MongoCursor cursor = this.getMongoCollection().find(query).projection(projection).cursor();){
                    ArrayList<RowData> rows = new ArrayList<RowData>();
                    while (cursor.hasNext()) {
                        RowData row = this.mongoRowConverter.convert((BsonDocument)cursor.next());
                        rows.add(row);
                    }
                    ArrayList<RowData> arrayList = rows;
                    return arrayList;
                }
            }
            catch (MongoException e) {
                LOG.debug("MongoDB lookup error, retry times = {}", (Object)retry, (Object)e);
                if (retry == this.maxRetries) {
                    LOG.error("MongoDB lookup error", (Throwable)e);
                    throw new RuntimeException("Execution of MongoDB lookup failed.", e);
                }
                try {
                    Thread.sleep(this.retryIntervalMs * (long)(retry + 1));
                }
                catch (InterruptedException e1) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e1);
                }
                ++retry;
            }
        }
        return Collections.emptyList();
    }

    private MongoCollection<BsonDocument> getMongoCollection() {
        return this.mongoClient.getDatabase(this.connectionOptions.getDatabase()).getCollection(this.connectionOptions.getCollection()).withDocumentClass(BsonDocument.class);
    }

    public void close() throws IOException {
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }
}

