/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.connectors.elasticsearch.table;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.LookupFunction;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.TermQueryBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Lookup function that resolves lookup-join keys against an Elasticsearch index.
 *
 * <p>For every key row a bool query is built with one {@code must} term clause per lookup
 * key; the hits returned by the {@link ElasticsearchApiCallBridge} are deserialized into
 * {@link RowData} with the supplied {@link DeserializationSchema}.
 *
 * <p>The search request and source builder are created once in {@link #open(FunctionContext)}
 * and mutated on every {@link #lookup(RowData)} call, so an instance must not be shared
 * between concurrently running lookups.
 *
 * @param <C> type of the Elasticsearch client created by the call bridge
 */
@Internal
public class ElasticsearchRowDataLookupFunction<C extends AutoCloseable> extends LookupFunction {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchRowDataLookupFunction.class);
    private static final long serialVersionUID = 1L;

    private final DeserializationSchema<RowData> deserializationSchema;
    private final String index;
    private final String type;
    private final String[] producedNames;
    private final String[] lookupKeys;
    private final int maxRetryTimes;
    /** One converter per lookup key, in the same order as {@link #lookupKeys}. */
    private final DataFormatConverters.DataFormatConverter[] converters;
    private final ElasticsearchApiCallBridge<C> callBridge;

    // Runtime-only state: assigned in open(), so it must never take part in serialization.
    private transient SearchRequest searchRequest;
    private transient SearchSourceBuilder searchSourceBuilder;
    private transient C client;

    /**
     * Creates the lookup function.
     *
     * @param deserializationSchema schema used to turn a search hit into a {@link RowData}
     * @param maxRetryTimes number of additional attempts after a failed search
     * @param index name of the index to query
     * @param type document type to query, or {@code null} to query without a type
     * @param producedNames names of the fields fetched from the document source
     * @param producedTypes data types of the produced fields, parallel to {@code producedNames}
     * @param lookupKeys names of the fields used as lookup keys; each must be a produced field
     * @param callBridge bridge used to create the client and execute searches
     * @throws NullPointerException if a required reference argument is {@code null}
     * @throws IllegalArgumentException if a lookup key is not among the produced fields
     */
    public ElasticsearchRowDataLookupFunction(DeserializationSchema<RowData> deserializationSchema, int maxRetryTimes, String index, String type, String[] producedNames, DataType[] producedTypes, String[] lookupKeys, ElasticsearchApiCallBridge<C> callBridge) {
        Preconditions.checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
        // maxRetryTimes is a primitive int; a null check on its boxed value can never fail,
        // so none is performed.
        Preconditions.checkNotNull(producedNames, "No fieldNames supplied.");
        Preconditions.checkNotNull(producedTypes, "No fieldTypes supplied.");
        Preconditions.checkNotNull(lookupKeys, "No keyNames supplied.");
        Preconditions.checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");
        this.deserializationSchema = deserializationSchema;
        this.maxRetryTimes = maxRetryTimes;
        this.index = index;
        this.type = type;
        this.producedNames = producedNames;
        this.lookupKeys = lookupKeys;
        this.callBridge = callBridge;
        this.converters = new DataFormatConverters.DataFormatConverter[lookupKeys.length];

        // Resolve each lookup key to its position among the produced fields so that the
        // converter matching the key's data type can be selected.
        Map<String, Integer> nameToIndex = IntStream.range(0, producedNames.length)
                .boxed()
                .collect(Collectors.toMap(i -> producedNames[i], i -> i));
        for (int i = 0; i < lookupKeys.length; ++i) {
            Integer position = nameToIndex.get(lookupKeys[i]);
            Preconditions.checkArgument(position != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
            this.converters[i] = DataFormatConverters.getConverterForDataType(producedTypes[position]);
        }
    }

    /**
     * Creates the client and prepares the reusable search request.
     *
     * @param context function context supplied by the runtime (not used here)
     * @throws Exception if the deserialization schema fails to open
     */
    @Override
    public void open(FunctionContext context) throws Exception {
        this.client = this.callBridge.createClient();
        this.searchRequest = new SearchRequest(new String[]{this.index});
        if (this.type == null) {
            this.searchRequest.types(Strings.EMPTY_ARRAY);
        } else {
            this.searchRequest.types(new String[]{this.type});
        }
        this.searchSourceBuilder = new SearchSourceBuilder();
        // Only fetch the fields that are actually produced.
        this.searchSourceBuilder.fetchSource(this.producedNames, null);
        // NOTE(review): the schema is opened with a null initialization context; this
        // presumably only works for schemas that do not dereference it - confirm.
        this.deserializationSchema.open(null);
    }

    /**
     * Closes the client created in {@link #open(FunctionContext)}, if any.
     *
     * @throws Exception if closing the client fails
     */
    @Override
    public void close() throws Exception {
        if (this.client != null) {
            try {
                this.client.close();
            } finally {
                this.client = null;
            }
        }
    }

    /**
     * Looks up all documents whose lookup-key fields equal the values in {@code keyRow}.
     *
     * <p>A search that fails with an {@link IOException} is retried up to
     * {@code maxRetryTimes} times, sleeping {@code 1000 ms * retry} before the next attempt.
     * An empty result is a valid answer and is returned immediately without retrying.
     *
     * @param keyRow row holding the lookup key values, in lookup-key order
     * @return the matching rows, or an empty collection if nothing matched
     * @throws FlinkRuntimeException if the search still fails after the last retry, or if
     *     the thread is interrupted while waiting to retry
     */
    @Override
    public Collection<RowData> lookup(RowData keyRow) {
        BoolQueryBuilder lookupCondition = new BoolQueryBuilder();
        for (int i = 0; i < this.lookupKeys.length; ++i) {
            lookupCondition.must(new TermQueryBuilder(this.lookupKeys[i], this.converters[i].toExternal(keyRow, i)));
        }
        this.searchSourceBuilder.query(lookupCondition);
        this.searchRequest.source(this.searchSourceBuilder);

        for (int retry = 0; retry <= this.maxRetryTimes; ++retry) {
            try {
                Tuple2<String, String[]> searchResponse = this.callBridge.search(this.client, this.searchRequest);
                // f1 carries the hit payloads; f0 is not used here.
                String[] hits = searchResponse.f1;
                if (hits.length == 0) {
                    // "No match" is a successful outcome; re-running the same query would
                    // only add load without any back-off in between.
                    return Collections.emptyList();
                }
                return this.toRows(hits);
            } catch (IOException e) {
                LOG.error("Elasticsearch search error, retry times = {}", retry, e);
                if (retry >= this.maxRetryTimes) {
                    throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e);
                }
                try {
                    // Linear back-off; note that the wait before the first retry is 0 ms.
                    Thread.sleep(1000L * (long) retry);
                } catch (InterruptedException e1) {
                    LOG.warn("Interrupted while waiting to retry failed elasticsearch search, aborting");
                    // Restore the interrupt flag so callers further up can observe it.
                    Thread.currentThread().interrupt();
                    throw new FlinkRuntimeException(e1);
                }
            }
        }
        return Collections.emptyList();
    }

    /** Deserializes every hit, leaving out hits that could not be turned into a row. */
    private Collection<RowData> toRows(String[] hits) {
        ArrayList<RowData> rows = new ArrayList<>(hits.length);
        for (String hit : hits) {
            RowData row = this.parseSearchResult(hit);
            if (row != null) {
                rows.add(row);
            }
        }
        rows.trimToSize();
        return rows;
    }

    /**
     * Deserializes a single search hit.
     *
     * @param result the hit payload as a string
     * @return the deserialized row, or {@code null} if deserialization failed (the failure
     *     is logged, not propagated) or the schema produced no row
     */
    private RowData parseSearchResult(String result) {
        try {
            // Encode explicitly as UTF-8 instead of relying on the platform default charset.
            return this.deserializationSchema.deserialize(result.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            LOG.error("Deserialize search hit failed: {}", e.getMessage(), e);
            return null;
        }
    }
}

