package com.alibaba.ververica.connectors.hologres.source.lookup;

import com.alibaba.ververica.connectors.common.dim.cache.CacheStrategy;
import com.alibaba.ververica.connectors.hologres.api.AbstractHologresReader;
import java.util.Collections;
import java.util.List;
import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/alibaba/ververica/connectors/hologres/source/lookup/HologresAsyncLookupFunction.class */
public class HologresAsyncLookupFunction extends AbstractHologresLookupFunction<List<RowData>> implements AsyncFunction<RowData, RowData> {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) HologresAsyncLookupFunction.class);

    public HologresAsyncLookupFunction(String str, TableSchema tableSchema, String[] strArr, CacheStrategy cacheStrategy, AbstractHologresReader<RowData> abstractHologresReader, boolean z) {
        super(str, tableSchema, strArr, cacheStrategy, abstractHologresReader, z);
    }

    public void asyncInvoke(RowData rowData, ResultFuture<RowData> resultFuture) throws Exception {
        Object sourceKey = getSourceKey(rowData);
        if (sourceKey == null) {
            LOG.debug("Join Hologres on an empty key of row: {}", rowData);
            resultFuture.complete(Collections.emptyList());
            return;
        }
        List list = (List) this.cache.get(sourceKey);
        if (list != null) {
            resultFuture.complete(list);
        } else {
            (this.hasPrimaryKey ? this.hologresReader.asyncGet(rowData).thenApply(rowData2 -> {
                return rowData2 == null ? Collections.emptyList() : Lists.newArrayList(new RowData[]{rowData2});
            }) : this.hologresReader.asyncGetMany(rowData)).handle((list2, th) -> {
                try {
                    if (th != null) {
                        resultFuture.completeExceptionally(th);
                    } else if (list2 == null || list2.isEmpty()) {
                        if (this.cacheStrategy.isCacheEmpty()) {
                            this.cache.put(sourceKey, Collections.emptyList());
                        }
                        resultFuture.complete(Collections.emptyList());
                    } else {
                        this.cache.put(sourceKey, list2);
                        resultFuture.complete(list2);
                    }
                    return null;
                } catch (Throwable th) {
                    resultFuture.completeExceptionally(th);
                    return null;
                }
            });
        }
    }

    public /* bridge */ /* synthetic */ void asyncInvoke(Object obj, ResultFuture resultFuture) throws Exception {
        asyncInvoke((RowData) obj, (ResultFuture<RowData>) resultFuture);
    }
}
