/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.lookup.DynamicPartitionLoader;
import org.apache.paimon.flink.lookup.FullCacheLookupTable;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.lookup.PrimaryKeyPartialLookupTable;
import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.lookup.RocksDBOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileStoreLookupFunction
implements Serializable,
Closeable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);
    private final Table table;
    @Nullable
    private final DynamicPartitionLoader partitionLoader;
    private final List<String> projectFields;
    private final List<String> joinKeys;
    @Nullable
    private final Predicate predicate;
    private transient Duration refreshInterval;
    private transient File path;
    private transient LookupTable lookupTable;
    private transient long nextLoadTime;
    protected FunctionContext functionContext;

    public FileStoreLookupFunction(Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
        TableScanUtils.streamingReadingValidate(table);
        this.table = table;
        this.partitionLoader = DynamicPartitionLoader.of(table);
        this.joinKeys = Arrays.stream(joinKeyIndex).mapToObj(i -> table.rowType().getFieldNames().get(projection[i])).collect(Collectors.toList());
        this.projectFields = Arrays.stream(projection).mapToObj(i -> table.rowType().getFieldNames().get(i)).collect(Collectors.toList());
        for (String field : table.primaryKeys()) {
            if (this.projectFields.contains(field)) continue;
            this.projectFields.add(field);
        }
        if (this.partitionLoader != null) {
            this.partitionLoader.addPartitionKeysTo(this.joinKeys, this.projectFields);
        }
        this.predicate = predicate;
    }

    public void open(FunctionContext context) throws Exception {
        this.functionContext = context;
        String tmpDirectory = FileStoreLookupFunction.getTmpDirectory(context);
        this.open(tmpDirectory);
    }

    void open(String tmpDirectory) throws Exception {
        this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
        if (!this.path.mkdirs()) {
            throw new RuntimeException("Failed to create dir: " + this.path);
        }
        this.open();
    }

    private void open() throws Exception {
        if (this.partitionLoader != null) {
            this.partitionLoader.open();
        }
        this.nextLoadTime = -1L;
        Options options = Options.fromMap(this.table.options());
        this.refreshInterval = options.getOptional(RocksDBOptions.LOOKUP_CONTINUOUS_DISCOVERY_INTERVAL).orElse(options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL));
        List<String> fieldNames = this.table.rowType().getFieldNames();
        int[] projection = this.projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
        FileStoreTable storeTable = (FileStoreTable)this.table;
        if (options.get(FlinkConnectorOptions.LOOKUP_CACHE_MODE) == FlinkConnectorOptions.LookupCacheMode.AUTO && new HashSet<String>(this.table.primaryKeys()).equals(new HashSet<String>(this.joinKeys))) {
            if (RemoteTableQuery.isRemoteServiceAvailable(storeTable)) {
                this.lookupTable = PrimaryKeyPartialLookupTable.createRemoteTable(storeTable, projection, this.joinKeys);
            } else {
                try {
                    this.lookupTable = PrimaryKeyPartialLookupTable.createLocalTable(storeTable, projection, this.path, this.joinKeys, this.getRequireCachedBucketIds());
                }
                catch (UnsupportedOperationException unsupportedOperationException) {
                    // empty catch block
                }
            }
        }
        if (this.lookupTable == null) {
            FullCacheLookupTable.Context context = new FullCacheLookupTable.Context(storeTable, projection, this.predicate, this.createProjectedPredicate(projection), this.path, this.joinKeys, this.getRequireCachedBucketIds());
            this.lookupTable = FullCacheLookupTable.create(context, options.get(RocksDBOptions.LOOKUP_CACHE_ROWS));
        }
        this.refreshDynamicPartition(false);
        this.lookupTable.open();
    }

    @Nullable
    private Predicate createProjectedPredicate(int[] projection) {
        Predicate adjustedPredicate = null;
        if (this.predicate != null) {
            adjustedPredicate = PredicateBuilder.transformFieldMapping(this.predicate, IntStream.range(0, this.table.rowType().getFieldCount()).map(i -> Ints.indexOf(projection, i)).toArray()).orElse(null);
        }
        return adjustedPredicate;
    }

    public Collection<RowData> lookup(RowData keyRow) {
        try {
            this.checkRefresh();
            InternalRow key = new FlinkRowWrapper(keyRow);
            if (this.partitionLoader != null) {
                BinaryRow partition = this.refreshDynamicPartition(true);
                if (partition == null) {
                    return Collections.emptyList();
                }
                key = JoinedRow.join(key, partition);
            }
            List<InternalRow> results = this.lookupTable.get(key);
            ArrayList<RowData> rows = new ArrayList<RowData>(results.size());
            for (InternalRow matchedRow : results) {
                rows.add(new FlinkRowData(matchedRow));
            }
            return rows;
        }
        catch (OutOfRangeException e) {
            this.reopen();
            return this.lookup(keyRow);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Nullable
    private BinaryRow refreshDynamicPartition(boolean reopen) throws Exception {
        if (this.partitionLoader == null) {
            return null;
        }
        boolean partitionChanged = this.partitionLoader.checkRefresh();
        BinaryRow partition = this.partitionLoader.partition();
        if (partition == null) {
            return null;
        }
        this.lookupTable.specificPartitionFilter(this.createSpecificPartFilter(partition));
        if (partitionChanged && reopen) {
            this.lookupTable.close();
            this.lookupTable.open();
        }
        return partition;
    }

    private Predicate createSpecificPartFilter(BinaryRow partition) {
        RowType rowType = this.table.rowType();
        List<String> partitionKeys = this.table.partitionKeys();
        Object[] partitionSpec = new RowDataToObjectArrayConverter(rowType.project(partitionKeys)).convert(partition);
        HashMap<String, Object> partitionMap = new HashMap<String, Object>(partitionSpec.length);
        for (int i = 0; i < partitionSpec.length; ++i) {
            partitionMap.put(partitionKeys.get(i), partitionSpec[i]);
        }
        return PartitionPredicate.createPartitionPredicate(rowType, partitionMap);
    }

    private void reopen() {
        try {
            this.close();
            this.open();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void checkRefresh() throws Exception {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0L) {
            LOG.info("Lookup table {} has refreshed after {} second(s), refreshing", (Object)this.table.name(), (Object)(this.refreshInterval.toMillis() / 1000L));
        }
        this.refresh();
        this.nextLoadTime = System.currentTimeMillis() + this.refreshInterval.toMillis();
    }

    @VisibleForTesting
    LookupTable lookupTable() {
        return this.lookupTable;
    }

    private void refresh() throws Exception {
        this.lookupTable.refresh();
    }

    @Override
    public void close() throws IOException {
        if (this.lookupTable != null) {
            this.lookupTable.close();
            this.lookupTable = null;
        }
        if (this.path != null) {
            FileIOUtils.deleteDirectoryQuietly(this.path);
        }
    }

    private static String getTmpDirectory(FunctionContext context) {
        try {
            Field field = context.getClass().getDeclaredField("context");
            field.setAccessible(true);
            StreamingRuntimeContext runtimeContext = FileStoreLookupFunction.extractStreamingRuntimeContext(field.get(context));
            String[] tmpDirectories = runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories();
            return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    private static StreamingRuntimeContext extractStreamingRuntimeContext(Object runtimeContext) throws NoSuchFieldException, IllegalAccessException {
        if (runtimeContext instanceof StreamingRuntimeContext) {
            return (StreamingRuntimeContext)runtimeContext;
        }
        Field field = runtimeContext.getClass().getDeclaredField("runtimeContext");
        field.setAccessible(true);
        return FileStoreLookupFunction.extractStreamingRuntimeContext(field.get(runtimeContext));
    }

    protected Set<Integer> getRequireCachedBucketIds() {
        return null;
    }
}

