/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SerializerConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.java.typeutils.GenericTypeInfo;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.util.function.SerializableSupplier;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.procedure.ProcedureBase;
import org.apache.paimon.flink.sink.NoneCopyVersionedSerializerTypeSerializerProxy;
import org.apache.paimon.flink.sink.RewriteFileIndexSink;
import org.apache.paimon.flink.source.RewriteFileIndexSource;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestEntrySerializer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.ParameterUtils;
import org.apache.paimon.utils.StringUtils;

public class RewriteFileIndexProcedure
extends ProcedureBase {
    @Override
    public String identifier() {
        return "rewrite_file_index";
    }

    @ProcedureHint(argument={@ArgumentHint(name="table", type=@DataTypeHint(value="STRING")), @ArgumentHint(name="partitions", type=@DataTypeHint(value="STRING"), isOptional=true)})
    public String[] call(ProcedureContext procedureContext, String sourceTablePath, String partitions) throws Exception {
        partitions = this.notnull(partitions);
        StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
        Table table = this.catalog.getTable(Identifier.fromString(sourceTablePath));
        List<Map<String, String>> partitionList = StringUtils.isBlank(partitions) ? null : ParameterUtils.getPartitions(partitions.split(";"));
        Predicate partitionPredicate = partitionList != null ? PredicateBuilder.or((Predicate[])partitionList.stream().map(p -> PredicateBuilder.partition(p, ((FileStoreTable)table).schema().logicalPartitionType(), CoreOptions.PARTITION_DEFAULT_NAME.defaultValue())).toArray(Predicate[]::new)) : null;
        FileStoreTable storeTable = (FileStoreTable)table;
        DataStreamSource source = env.fromSource((Source)new RewriteFileIndexSource(storeTable, partitionPredicate), WatermarkStrategy.noWatermarks(), "index source", (TypeInformation)new ManifestEntryTypeInfo());
        new RewriteFileIndexSink(storeTable).sinkFrom(source);
        return this.execute(env, "Add file index for table: " + sourceTablePath);
    }

    private static class ManifestEntryTypeInfo
    extends GenericTypeInfo<ManifestEntry> {
        public ManifestEntryTypeInfo() {
            super(ManifestEntry.class);
        }

        public TypeSerializer<ManifestEntry> createSerializer(SerializerConfig config) {
            return new NoneCopyVersionedSerializerTypeSerializerProxy((SerializableSupplier & Serializable)() -> new SimpleVersionedSerializer<ManifestEntry>(){
                private final ManifestEntrySerializer manifestEntrySerializer = new ManifestEntrySerializer();

                public int getVersion() {
                    return 0;
                }

                public byte[] serialize(ManifestEntry manifestEntry) throws IOException {
                    return this.manifestEntrySerializer.serializeToBytes(manifestEntry);
                }

                public ManifestEntry deserialize(int i, byte[] bytes) throws IOException {
                    return (ManifestEntry)this.manifestEntrySerializer.deserializeFromBytes(bytes);
                }
            });
        }
    }
}

