/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.procedure;

import java.time.Duration;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.apache.flink.types.Row;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.procedure.ProcedureBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.SnapshotNotExistException;
import org.apache.paimon.utils.TimeUtils;

public class CreateTagFromWatermarkProcedure
extends ProcedureBase {
    public static final String IDENTIFIER = "create_tag_from_watermark";

    @ProcedureHint(argument={@ArgumentHint(name="table", type=@DataTypeHint(value="STRING")), @ArgumentHint(name="tag", type=@DataTypeHint(value="STRING")), @ArgumentHint(name="watermark", type=@DataTypeHint(value="bigint")), @ArgumentHint(name="time_retained", type=@DataTypeHint(value="STRING"), isOptional=true)})
    @DataTypeHint(value="ROW< tagName STRING, snapshot BIGINT, `commit_time` BIGINT, `watermark` STRING>")
    public Row[] call(ProcedureContext procedureContext, String tableId, String tagName, Long watermark, @Nullable String timeRetained) throws Catalog.TableNotExistException {
        FileStoreTable fileStoreTable = (FileStoreTable)this.table(tableId);
        SnapshotManager snapshotManager = fileStoreTable.snapshotManager();
        Snapshot snapshot = snapshotManager.laterOrEqualWatermark(watermark);
        Set<Snapshot> sortedTagsSnapshots = fileStoreTable.tagManager().tags().keySet();
        for (Snapshot tagSnapshot : sortedTagsSnapshots) {
            if (tagSnapshot.watermark() == null || watermark > tagSnapshot.watermark()) continue;
            if (snapshot != null && snapshot.watermark() != null && tagSnapshot.watermark() >= snapshot.watermark()) break;
            snapshot = tagSnapshot;
            break;
        }
        SnapshotNotExistException.checkNotNull(snapshot, String.format("Could not find any snapshot whose watermark later than %s.", watermark));
        fileStoreTable.createTag(tagName, snapshot.id(), CreateTagFromWatermarkProcedure.toDuration(timeRetained));
        return new Row[]{Row.of((Object[])new Object[]{tagName, snapshot.id(), snapshot.timeMillis(), String.valueOf(snapshot.watermark())})};
    }

    @Nullable
    private static Duration toDuration(@Nullable String s) {
        if (s == null) {
            return null;
        }
        return TimeUtils.parseDuration(s);
    }

    @Override
    public String identifier() {
        return IDENTIFIER;
    }
}

