/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.AsyncLookupFunction;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.paimon.flink.lookup.NewLookupFunction;
import org.apache.paimon.utils.ExecutorThreadFactory;

public class AsyncLookupFunctionWrapper
extends AsyncLookupFunction {
    private final NewLookupFunction function;
    private final int threadNumber;
    private transient ExecutorService lazyExecutor;

    public AsyncLookupFunctionWrapper(NewLookupFunction function, int threadNumber) {
        this.function = function;
        this.threadNumber = threadNumber;
    }

    public void open(FunctionContext context) throws Exception {
        this.function.open(context);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private Collection<RowData> lookup(RowData keyRow) {
        ClassLoader cl = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(AsyncLookupFunctionWrapper.class.getClassLoader());
        try {
            NewLookupFunction newLookupFunction = this.function;
            synchronized (newLookupFunction) {
                try {
                    Collection<RowData> collection = this.function.lookup(keyRow);
                    return collection;
                }
                catch (Throwable throwable) {
                    try {
                        throw throwable;
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                }
            }
        }
        finally {
            Thread.currentThread().setContextClassLoader(cl);
        }
    }

    public CompletableFuture<Collection<RowData>> asyncLookup(RowData keyRow) {
        return CompletableFuture.supplyAsync(() -> this.lookup(keyRow), this.executor());
    }

    public void close() throws Exception {
        this.function.close();
        if (this.lazyExecutor != null) {
            this.lazyExecutor.shutdownNow();
            this.lazyExecutor = null;
        }
    }

    private ExecutorService executor() {
        if (this.lazyExecutor == null) {
            this.lazyExecutor = Executors.newFixedThreadPool(this.threadNumber, new ExecutorThreadFactory(Thread.currentThread().getName() + "-async"));
        }
        return this.lazyExecutor;
    }
}

