/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.paimon.annotation.Documentation;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.fileindex.FileIndexOptions;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.fs.Path;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.ConfigOptions;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.options.description.DescribedEnum;
import org.apache.paimon.options.description.Description;
import org.apache.paimon.options.description.InlineElement;
import org.apache.paimon.options.description.TextElement;
import org.apache.paimon.utils.MathUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;

public class CoreOptions
implements Serializable {
    // Key fragments used when composing option names.
    // NOTE(review): only the "fields.{field_name}.stats-mode" pattern is spelled out in this
    // part of the file; how the remaining fragments are combined is presumably analogous
    // (fields.{field_name}.<suffix>) — confirm against the accessor methods.

    /** Key fragment {@code default-value}. */
    public static final String DEFAULT_VALUE_SUFFIX = "default-value";
    /** Key fragment {@code fields}; leading segment of field-level option keys. */
    public static final String FIELDS_PREFIX = "fields";
    /** Key fragment {@code aggregate-function}. */
    public static final String AGG_FUNCTION = "aggregate-function";
    /** Key fragment {@code ignore-retract}. */
    public static final String IGNORE_RETRACT = "ignore-retract";
    /** Key fragment {@code nested-key}. */
    public static final String NESTED_KEY = "nested-key";
    /** Key fragment {@code distinct}. */
    public static final String DISTINCT = "distinct";
    /** Key fragment {@code file-index}. */
    public static final String FILE_INDEX = "file-index";
    /** Key fragment {@code columns}. */
    public static final String COLUMNS = "columns";
    /** Option {@code bucket}: bucket number of the file store, {@code -1} by default. */
    public static final ConfigOption<Integer> BUCKET =
            ConfigOptions.key("bucket")
                    .intType()
                    .defaultValue(-1)
                    .withDescription(
                            Description.builder()
                                    .text("Bucket number for file store.")
                                    .linebreak()
                                    .text("It should either be equal to -1 (dynamic bucket mode), or it must be greater than 0 (fixed bucket mode).")
                                    .build());

    /** Option {@code bucket-key}: fields whose hash decides the bucket; no default. */
    @Documentation.Immutable
    public static final ConfigOption<String> BUCKET_KEY =
            ConfigOptions.key("bucket-key")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text("Specify the paimon distribution policy. Data is assigned to each bucket according to the hash value of bucket-key.")
                                    .linebreak()
                                    .text("If you specify multiple fields, delimiter is ','.")
                                    .linebreak()
                                    .text("If not specified, the primary key will be used; if there is no primary key, the full row will be used.")
                                    .build());

    /** Option {@code path}: file path of the table; excluded from the generated documentation. */
    @Documentation.ExcludeFromDocumentation("Internal use only")
    public static final ConfigOption<String> PATH =
            ConfigOptions.key("path")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The file path of this table in the filesystem.");
    /** Option {@code file.format}: format of data files, {@code ORC} by default. */
    public static final ConfigOption<FileFormatType> FILE_FORMAT =
            ConfigOptions.key("file.format")
                    .enumType(FileFormatType.class)
                    .defaultValue(FileFormatType.ORC)
                    .withDescription("Specify the message format of data files, currently orc, parquet and avro are supported.");

    /**
     * Option {@code file.compression.per.level}: level-to-compression mapping, empty by default.
     *
     * <p>The default is created with the diamond operator so that it is typed as
     * {@code Map<String, String>} rather than a raw {@code HashMap}.
     */
    public static final ConfigOption<Map<String, String>> FILE_COMPRESSION_PER_LEVEL =
            ConfigOptions.key("file.compression.per.level")
                    .mapType()
                    .defaultValue(new HashMap<>())
                    .withDescription("Define different compression policies for different level, you can add the conf like this: 'file.compression.per.level' = '0:lz4,1:zstd'.");

    /**
     * Option {@code file.format.per.level}: level-to-format mapping, empty by default.
     *
     * <p>Must stay declared after {@link #FILE_FORMAT}, whose key is embedded in the description.
     */
    public static final ConfigOption<Map<String, String>> FILE_FORMAT_PER_LEVEL =
            ConfigOptions.key("file.format.per.level")
                    .mapType()
                    .defaultValue(new HashMap<>())
                    .withDescription("Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `" + FILE_FORMAT.key() + "` will be used.");

    /**
     * Option {@code file.compression}: default compression of data files; no default value.
     *
     * <p>Must stay declared after {@link #FILE_COMPRESSION_PER_LEVEL}, whose key is embedded in
     * the description.
     */
    public static final ConfigOption<String> FILE_COMPRESSION =
            ConfigOptions.key("file.compression")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Default file compression format, orc is lz4 and parquet is snappy. It can be overridden by " + FILE_COMPRESSION_PER_LEVEL.key());
    /** Option {@code file-index.in-manifest-threshold}: defaults to {@code 500 B}. */
    public static final ConfigOption<MemorySize> FILE_INDEX_IN_MANIFEST_THRESHOLD =
            ConfigOptions.key("file-index.in-manifest-threshold")
                    .memoryType()
                    .defaultValue(MemorySize.parse("500 B"))
                    .withDescription("The threshold to store file index bytes in manifest.");

    /** Option {@code file-index.read.enabled}: defaults to {@code true}. */
    public static final ConfigOption<Boolean> FILE_INDEX_READ_ENABLED =
            ConfigOptions.key("file-index.read.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether enabled read file index.");
    /** Option {@code manifest.format}: format of manifest files, {@code AVRO} by default. */
    public static final ConfigOption<FileFormatType> MANIFEST_FORMAT =
            ConfigOptions.key("manifest.format")
                    .enumType(FileFormatType.class)
                    .defaultValue(FileFormatType.AVRO)
                    .withDescription("Specify the message format of manifest files.");

    /** Option {@code manifest.target-file-size}: defaults to 8 MiB. */
    public static final ConfigOption<MemorySize> MANIFEST_TARGET_FILE_SIZE =
            ConfigOptions.key("manifest.target-file-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(8L))
                    .withDescription("Suggested file size of a manifest file.");

    /** Option {@code manifest.full-compaction-threshold-size}: defaults to 16 MiB. */
    public static final ConfigOption<MemorySize> MANIFEST_FULL_COMPACTION_FILE_SIZE =
            ConfigOptions.key("manifest.full-compaction-threshold-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(16L))
                    .withDescription("The size threshold for triggering full compaction of manifest.");

    /** Option {@code manifest.merge-min-count}: defaults to {@code 30}. */
    public static final ConfigOption<Integer> MANIFEST_MERGE_MIN_COUNT =
            ConfigOptions.key("manifest.merge-min-count")
                    .intType()
                    .defaultValue(30)
                    .withDescription("To avoid frequent manifest merges, this parameter specifies the minimum number of ManifestFileMeta to merge.");
    /** Option {@code partition.default-name}: defaults to {@code __DEFAULT_PARTITION__}. */
    public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
            ConfigOptions.key("partition.default-name")
                    .stringType()
                    .defaultValue("__DEFAULT_PARTITION__")
                    .withDescription("The default partition name in case the dynamic partition column value is null/empty string.");

    /** Option {@code snapshot.num-retained.min}: defaults to {@code 10}. */
    public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MIN =
            ConfigOptions.key("snapshot.num-retained.min")
                    .intType()
                    .defaultValue(10)
                    .withDescription("The minimum number of completed snapshots to retain. Should be greater than or equal to 1.");

    /** Option {@code snapshot.num-retained.max}: defaults to {@link Integer#MAX_VALUE}. */
    @Documentation.OverrideDefault("infinite")
    public static final ConfigOption<Integer> SNAPSHOT_NUM_RETAINED_MAX =
            ConfigOptions.key("snapshot.num-retained.max")
                    .intType()
                    .defaultValue(Integer.MAX_VALUE)
                    .withDescription("The maximum number of completed snapshots to retain. Should be greater than or equal to the minimum number.");

    /** Option {@code snapshot.time-retained}: defaults to one hour. */
    public static final ConfigOption<Duration> SNAPSHOT_TIME_RETAINED =
            ConfigOptions.key("snapshot.time-retained")
                    .durationType()
                    .defaultValue(Duration.ofHours(1))
                    .withDescription("The maximum time of completed snapshots to retain.");

    /** Option {@code changelog.num-retained.min}: no default value. */
    public static final ConfigOption<Integer> CHANGELOG_NUM_RETAINED_MIN =
            ConfigOptions.key("changelog.num-retained.min")
                    .intType()
                    .noDefaultValue()
                    .withDescription("The minimum number of completed changelog to retain. Should be greater than or equal to 1.");

    /** Option {@code changelog.num-retained.max}: no default value. */
    public static final ConfigOption<Integer> CHANGELOG_NUM_RETAINED_MAX =
            ConfigOptions.key("changelog.num-retained.max")
                    .intType()
                    .noDefaultValue()
                    .withDescription("The maximum number of completed changelog to retain. Should be greater than or equal to the minimum number.");

    /** Option {@code changelog.time-retained}: no default value. */
    public static final ConfigOption<Duration> CHANGELOG_TIME_RETAINED =
            ConfigOptions.key("changelog.time-retained")
                    .durationType()
                    .noDefaultValue()
                    .withDescription("The maximum time of completed changelog to retain.");
    /** Option {@code snapshot.expire.execution-mode}: defaults to {@code SYNC}. */
    public static final ConfigOption<ExpireExecutionMode> SNAPSHOT_EXPIRE_EXECUTION_MODE =
            ConfigOptions.key("snapshot.expire.execution-mode")
                    .enumType(ExpireExecutionMode.class)
                    .defaultValue(ExpireExecutionMode.SYNC)
                    .withDescription("Specifies the execution mode of expire.");

    /** Option {@code snapshot.expire.limit}: defaults to {@code 10}. */
    public static final ConfigOption<Integer> SNAPSHOT_EXPIRE_LIMIT =
            ConfigOptions.key("snapshot.expire.limit")
                    .intType()
                    .defaultValue(10)
                    .withDescription("The maximum number of snapshots allowed to expire at a time.");

    /** Option {@code snapshot.expire.clean-empty-directories}: defaults to {@code true}. */
    public static final ConfigOption<Boolean> SNAPSHOT_EXPIRE_CLEAN_EMPTY_DIRECTORIES =
            ConfigOptions.key("snapshot.expire.clean-empty-directories")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to try to clean empty directories when expiring snapshots. Note that trying to clean directories might throw exceptions in filesystem, but in most cases it won't cause problems.");

    /** Option {@code continuous.discovery-interval}: defaults to ten seconds. */
    public static final ConfigOption<Duration> CONTINUOUS_DISCOVERY_INTERVAL =
            ConfigOptions.key("continuous.discovery-interval")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(10))
                    .withDescription("The discovery interval of continuous reading.");

    /** Option {@code scan.max-splits-per-task}: defaults to {@code 10}. */
    public static final ConfigOption<Integer> SCAN_MAX_SPLITS_PER_TASK =
            ConfigOptions.key("scan.max-splits-per-task")
                    .intType()
                    .defaultValue(10)
                    .withDescription("Max split size should be cached for one task while scanning. If splits size cached in enumerator are greater than tasks size multiply by this value, scanner will pause scanning.");
    /** Option {@code merge-engine}: defaults to {@code DEDUPLICATE}. */
    @Documentation.Immutable
    public static final ConfigOption<MergeEngine> MERGE_ENGINE =
            ConfigOptions.key("merge-engine")
                    .enumType(MergeEngine.class)
                    .defaultValue(MergeEngine.DEDUPLICATE)
                    .withDescription("Specify the merge engine for table with primary key.");

    /** Option {@code ignore-delete}: defaults to {@code false}; three deprecated keys are also registered. */
    public static final ConfigOption<Boolean> IGNORE_DELETE =
            ConfigOptions.key("ignore-delete")
                    .booleanType()
                    .defaultValue(false)
                    .withDeprecatedKeys(
                            "first-row.ignore-delete",
                            "deduplicate.ignore-delete",
                            "partial-update.ignore-delete")
                    .withDescription("Whether to ignore delete records.");

    /** Option {@code sort-engine}: defaults to {@code LOSER_TREE}. */
    public static final ConfigOption<SortEngine> SORT_ENGINE =
            ConfigOptions.key("sort-engine")
                    .enumType(SortEngine.class)
                    .defaultValue(SortEngine.LOSER_TREE)
                    .withDescription("Specify the sort engine for table with primary key.");

    /** Option {@code sort-spill-threshold}: no default value. */
    public static final ConfigOption<Integer> SORT_SPILL_THRESHOLD =
            ConfigOptions.key("sort-spill-threshold")
                    .intType()
                    .noDefaultValue()
                    .withDescription("If the maximum number of sort readers exceeds this value, a spill will be attempted. This prevents too many readers from consuming too much memory and causing OOM.");

    /** Option {@code sort-spill-buffer-size}: defaults to {@code 64 mb}. */
    public static final ConfigOption<MemorySize> SORT_SPILL_BUFFER_SIZE =
            ConfigOptions.key("sort-spill-buffer-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("64 mb"))
                    .withDescription("Amount of data to spill records to disk in spilled sort.");

    /** Option {@code spill-compression}: defaults to {@code LZ4}. */
    public static final ConfigOption<String> SPILL_COMPRESSION =
            ConfigOptions.key("spill-compression")
                    .stringType()
                    .defaultValue("LZ4")
                    .withDescription("Compression for spill, currently lz4, lzo and zstd are supported.");

    /** Option {@code write-only}: defaults to {@code false}; deprecated key {@code write.compaction-skip}. */
    public static final ConfigOption<Boolean> WRITE_ONLY =
            ConfigOptions.key("write-only")
                    .booleanType()
                    .defaultValue(false)
                    .withDeprecatedKeys("write.compaction-skip")
                    .withDescription("If set to true, compactions and snapshot expiration will be skipped. This option is used along with dedicated compact jobs.");
    /** Option {@code source.split.target-size}: defaults to 128 MiB. */
    public static final ConfigOption<MemorySize> SOURCE_SPLIT_TARGET_SIZE =
            ConfigOptions.key("source.split.target-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(128L))
                    .withDescription("Target size of a source split when scanning a bucket.");

    /** Option {@code source.split.open-file-cost}: defaults to 4 MiB. */
    public static final ConfigOption<MemorySize> SOURCE_SPLIT_OPEN_FILE_COST =
            ConfigOptions.key("source.split.open-file-cost")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(4L))
                    .withDescription("Open file cost of a source file. It is used to avoid reading too many files with a source split, which can be very slow.");
    /** Option {@code write-buffer-size}: defaults to {@code 256 mb}. */
    public static final ConfigOption<MemorySize> WRITE_BUFFER_SIZE =
            ConfigOptions.key("write-buffer-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("256 mb"))
                    .withDescription("Amount of data to build up in memory before converting to a sorted on-disk file.");

    /** Option {@code write-buffer-spill.max-disk-size}: defaults to {@code MemorySize.MAX_VALUE}. */
    @Documentation.OverrideDefault("infinite")
    public static final ConfigOption<MemorySize> WRITE_BUFFER_MAX_DISK_SIZE =
            ConfigOptions.key("write-buffer-spill.max-disk-size")
                    .memoryType()
                    .defaultValue(MemorySize.MAX_VALUE)
                    .withDescription("The max disk to use for write buffer spill. This only work when the write buffer spill is enabled");

    /** Option {@code write-buffer-spillable}: no default value. */
    public static final ConfigOption<Boolean> WRITE_BUFFER_SPILLABLE =
            ConfigOptions.key("write-buffer-spillable")
                    .booleanType()
                    .noDefaultValue()
                    .withDescription("Whether the write buffer can be spillable. Enabled by default when using object storage.");

    /** Option {@code write-buffer-for-append}: defaults to {@code false}. */
    public static final ConfigOption<Boolean> WRITE_BUFFER_FOR_APPEND =
            ConfigOptions.key("write-buffer-for-append")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("This option only works for append-only table. Whether the write use write buffer to avoid out-of-memory error.");

    /** Option {@code write-max-writers-to-spill}: defaults to {@code 5}. */
    public static final ConfigOption<Integer> WRITE_MAX_WRITERS_TO_SPILL =
            ConfigOptions.key("write-max-writers-to-spill")
                    .intType()
                    .defaultValue(5)
                    .withDescription("When in batch append inserting, if the writer number is greater than this option, we open the buffer cache and spill function to avoid out-of-memory. ");

    /** Option {@code write-manifest-cache}: defaults to 0 MiB. */
    public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
            ConfigOptions.key("write-manifest-cache")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(0L))
                    .withDescription("Cache size for reading manifest files for write initialization.");
    /** Option {@code local-sort.max-num-file-handles}: defaults to {@code 128}. */
    public static final ConfigOption<Integer> LOCAL_SORT_MAX_NUM_FILE_HANDLES =
            ConfigOptions.key("local-sort.max-num-file-handles")
                    .intType()
                    .defaultValue(128)
                    .withDescription("The maximal fan-in for external merge sort. It limits the number of file handles. If it is too small, may cause intermediate merging. But if it is too large, it will cause too many files opened at the same time, consume memory and lead to random reading.");

    /** Option {@code page-size}: defaults to {@code 64 kb}. */
    public static final ConfigOption<MemorySize> PAGE_SIZE =
            ConfigOptions.key("page-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("64 kb"))
                    .withDescription("Memory page size.");

    /** Option {@code cache-page-size}: defaults to {@code 64 kb}. */
    public static final ConfigOption<MemorySize> CACHE_PAGE_SIZE =
            ConfigOptions.key("cache-page-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("64 kb"))
                    .withDescription("Memory page size for caching.");

    /** Option {@code target-file-size}: defaults to 128 MiB. */
    public static final ConfigOption<MemorySize> TARGET_FILE_SIZE =
            ConfigOptions.key("target-file-size")
                    .memoryType()
                    .defaultValue(MemorySize.ofMebiBytes(128L))
                    .withDescription("Target size of a file.");
    /** Option {@code num-sorted-run.compaction-trigger}: defaults to {@code 5}. */
    public static final ConfigOption<Integer> NUM_SORTED_RUNS_COMPACTION_TRIGGER =
            ConfigOptions.key("num-sorted-run.compaction-trigger")
                    .intType()
                    .defaultValue(5)
                    .withDescription("The sorted run number to trigger compaction. Includes level0 files (one file one sorted run) and high-level runs (one level one sorted run).");

    /** Option {@code num-sorted-run.stop-trigger}: no default value declared here. */
    public static final ConfigOption<Integer> NUM_SORTED_RUNS_STOP_TRIGGER =
            ConfigOptions.key("num-sorted-run.stop-trigger")
                    .intType()
                    .noDefaultValue()
                    .withDescription("The number of sorted runs that trigger the stopping of writes, the default value is 'num-sorted-run.compaction-trigger' + 3.");

    /** Option {@code num-levels}: no default value. */
    public static final ConfigOption<Integer> NUM_LEVELS =
            ConfigOptions.key("num-levels")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Total level number, for example, there are 3 levels, including 0,1,2 levels.");

    /** Option {@code commit.force-compact}: defaults to {@code false}. */
    public static final ConfigOption<Boolean> COMMIT_FORCE_COMPACT =
            ConfigOptions.key("commit.force-compact")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Whether to force a compaction before commit.");

    /** Option {@code compaction.max-size-amplification-percent}: defaults to {@code 200}. */
    public static final ConfigOption<Integer> COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT =
            ConfigOptions.key("compaction.max-size-amplification-percent")
                    .intType()
                    .defaultValue(200)
                    .withDescription("The size amplification is defined as the amount (in percentage) of additional storage needed to store a single byte of data in the merge tree for changelog mode table.");

    /** Option {@code compaction.size-ratio}: defaults to {@code 1}. */
    public static final ConfigOption<Integer> COMPACTION_SIZE_RATIO =
            ConfigOptions.key("compaction.size-ratio")
                    .intType()
                    .defaultValue(1)
                    .withDescription("Percentage flexibility while comparing sorted run size for changelog mode table. If the candidate sorted run(s) size is 1% smaller than the next sorted run's size, then include next sorted run into this candidate set.");

    /** Option {@code compaction.optimization-interval}: no default value. */
    public static final ConfigOption<Duration> COMPACTION_OPTIMIZATION_INTERVAL =
            ConfigOptions.key("compaction.optimization-interval")
                    .durationType()
                    .noDefaultValue()
                    .withDescription("Implying how often to perform an optimization compaction, this configuration is used to ensure the query timeliness of the read-optimized system table.");

    /** Option {@code compaction.min.file-num}: defaults to {@code 5}. */
    public static final ConfigOption<Integer> COMPACTION_MIN_FILE_NUM =
            ConfigOptions.key("compaction.min.file-num")
                    .intType()
                    .defaultValue(5)
                    .withDescription("For file set [f_0,...,f_N], the minimum file number which satisfies sum(size(f_i)) >= targetFileSize to trigger a compaction for append-only table. This value avoids almost-full-file to be compacted, which is not cost-effective.");

    /** Option {@code compaction.max.file-num}: defaults to {@code 50}; deprecated key {@code compaction.early-max.file-num}. */
    public static final ConfigOption<Integer> COMPACTION_MAX_FILE_NUM =
            ConfigOptions.key("compaction.max.file-num")
                    .intType()
                    .defaultValue(50)
                    .withDeprecatedKeys("compaction.early-max.file-num")
                    .withDescription("For file set [f_0,...,f_N], the maximum file number to trigger a compaction for append-only table, even if sum(size(f_i)) < targetFileSize. This value avoids pending too much small files, which slows down the performance.");
    /** Option {@code changelog-producer}: defaults to {@code NONE}. */
    public static final ConfigOption<ChangelogProducer> CHANGELOG_PRODUCER =
            ConfigOptions.key("changelog-producer")
                    .enumType(ChangelogProducer.class)
                    .defaultValue(ChangelogProducer.NONE)
                    .withDescription("Whether to double write to a changelog file. This changelog file keeps the details of data changes, it can be read directly during stream reads. This can be applied to tables with primary keys. ");

    /** Option {@code changelog-producer.row-deduplicate}: defaults to {@code false}. */
    public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_ROW_DEDUPLICATE =
            ConfigOptions.key("changelog-producer.row-deduplicate")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Whether to generate -U, +U changelog for the same record. This configuration is only valid for the changelog-producer is lookup or full-compaction.");

    /** Option {@code sequence.field}: no default value. */
    @Documentation.Immutable
    public static final ConfigOption<String> SEQUENCE_FIELD =
            ConfigOptions.key("sequence.field")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The field that generates the sequence number for primary key table, the sequence number determines which data is the most recent.");

    /** Option {@code rowkind.field}: no default value. */
    @Documentation.Immutable
    public static final ConfigOption<String> ROWKIND_FIELD =
            ConfigOptions.key("rowkind.field")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("The field that generates the row kind for primary key table, the row kind determines which data is '+I', '-U', '+U' or '-D'.");
    /** Option {@code scan.mode}: defaults to {@code DEFAULT}; deprecated key {@code log.scan}. */
    public static final ConfigOption<StartupMode> SCAN_MODE =
            ConfigOptions.key("scan.mode")
                    .enumType(StartupMode.class)
                    .defaultValue(StartupMode.DEFAULT)
                    .withDeprecatedKeys("log.scan")
                    .withDescription("Specify the scanning behavior of the source.");

    /** Option {@code scan.timestamp-millis}: no default; deprecated key {@code log.scan.timestamp-millis}. */
    public static final ConfigOption<Long> SCAN_TIMESTAMP_MILLIS =
            ConfigOptions.key("scan.timestamp-millis")
                    .longType()
                    .noDefaultValue()
                    .withDeprecatedKeys("log.scan.timestamp-millis")
                    .withDescription("Optional timestamp used in case of \"from-timestamp\" scan mode. If there is no snapshot earlier than this time, the earliest snapshot will be chosen.");

    /** Option {@code scan.watermark}: no default value. */
    public static final ConfigOption<Long> SCAN_WATERMARK =
            ConfigOptions.key("scan.watermark")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Optional watermark used in case of \"from-snapshot\" scan mode. If there is no snapshot later than this watermark, will throw an exceptions.");

    /** Option {@code scan.file-creation-time-millis}: no default value. */
    public static final ConfigOption<Long> SCAN_FILE_CREATION_TIME_MILLIS =
            ConfigOptions.key("scan.file-creation-time-millis")
                    .longType()
                    .noDefaultValue()
                    .withDescription("After configuring this time, only the data files created after this time will be read. It is independent of snapshots, but it is imprecise filtering (depending on whether or not compaction occurs).");

    /** Option {@code scan.snapshot-id}: no default value. */
    public static final ConfigOption<Long> SCAN_SNAPSHOT_ID =
            ConfigOptions.key("scan.snapshot-id")
                    .longType()
                    .noDefaultValue()
                    .withDescription("Optional snapshot id used in case of \"from-snapshot\" or \"from-snapshot-full\" scan mode");

    /** Option {@code scan.tag-name}: no default value. */
    public static final ConfigOption<String> SCAN_TAG_NAME =
            ConfigOptions.key("scan.tag-name")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Optional tag name used in case of \"from-snapshot\" scan mode.");

    /** Option {@code scan.version}: no default value; excluded from the generated documentation. */
    @Documentation.ExcludeFromDocumentation("Internal use only")
    public static final ConfigOption<String> SCAN_VERSION =
            ConfigOptions.key("scan.version")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Specify the time travel version string used in 'VERSION AS OF' syntax. We will use tag when both tag and snapshot of that version exist.");

    /** Option {@code scan.bounded.watermark}: no default value. */
    public static final ConfigOption<Long> SCAN_BOUNDED_WATERMARK =
            ConfigOptions.key("scan.bounded.watermark")
                    .longType()
                    .noDefaultValue()
                    .withDescription("End condition \"watermark\" for bounded streaming mode. Stream reading will end when a larger watermark snapshot is encountered.");

    /** Option {@code scan.manifest.parallelism}: no default value declared here. */
    public static final ConfigOption<Integer> SCAN_MANIFEST_PARALLELISM =
            ConfigOptions.key("scan.manifest.parallelism")
                    .intType()
                    .noDefaultValue()
                    .withDescription("The parallelism of scanning manifest files, default value is the size of cpu processor. Note: Scale-up this parameter will increase memory usage while scanning manifest files. We can consider downsize it when we encounter an out of memory exception while scanning");
    /** Option {@code log.consistency}: defaults to {@code TRANSACTIONAL}. */
    @Documentation.ExcludeFromDocumentation("Confused without log system")
    public static final ConfigOption<LogConsistency> LOG_CONSISTENCY =
            ConfigOptions.key("log.consistency")
                    .enumType(LogConsistency.class)
                    .defaultValue(LogConsistency.TRANSACTIONAL)
                    .withDescription("Specify the log consistency mode for table.");

    /** Option {@code log.changelog-mode}: defaults to {@code AUTO}. */
    @Documentation.ExcludeFromDocumentation("Confused without log system")
    public static final ConfigOption<LogChangelogMode> LOG_CHANGELOG_MODE =
            ConfigOptions.key("log.changelog-mode")
                    .enumType(LogChangelogMode.class)
                    .defaultValue(LogChangelogMode.AUTO)
                    .withDescription("Specify the log changelog mode for table.");

    /** Option {@code log.key.format}: defaults to {@code json}. */
    @Documentation.ExcludeFromDocumentation("Confused without log system")
    public static final ConfigOption<String> LOG_KEY_FORMAT =
            ConfigOptions.key("log.key.format")
                    .stringType()
                    .defaultValue("json")
                    .withDescription("Specify the key message format of log system with primary key.");

    /** Option {@code log.format}: defaults to {@code debezium-json}. */
    @Documentation.ExcludeFromDocumentation("Confused without log system")
    public static final ConfigOption<String> LOG_FORMAT =
            ConfigOptions.key("log.format")
                    .stringType()
                    .defaultValue("debezium-json")
                    .withDescription("Specify the message format of log system.");
    /** Option {@code auto-create}: defaults to {@code false}. */
    public static final ConfigOption<Boolean> AUTO_CREATE =
            ConfigOptions.key("auto-create")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Whether to create underlying storage when reading and writing the table.");

    /** Option {@code streaming-read-overwrite}: defaults to {@code false}. */
    public static final ConfigOption<Boolean> STREAMING_READ_OVERWRITE =
            ConfigOptions.key("streaming-read-overwrite")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Whether to read the changes from overwrite in streaming mode. Cannot be set to true when changelog producer is full-compaction or lookup because it will read duplicated changes.");

    /** Option {@code dynamic-partition-overwrite}: defaults to {@code true}. */
    public static final ConfigOption<Boolean> DYNAMIC_PARTITION_OVERWRITE =
            ConfigOptions.key("dynamic-partition-overwrite")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether only overwrite dynamic partition when overwriting a partitioned table with dynamic partition columns. Works only when the table has partition keys.");
    /**
     * Option {@code partition.expiration-time}: no default value.
     *
     * <p>NOTE(review): the description text contains a left single quotation mark (U+2018) where
     * an apostrophe would be expected; it is kept unchanged because it is user-facing output.
     */
    public static final ConfigOption<Duration> PARTITION_EXPIRATION_TIME =
            ConfigOptions.key("partition.expiration-time")
                    .durationType()
                    .noDefaultValue()
                    .withDescription("The expiration interval of a partition. A partition will be expired if it\u2018s lifetime is over this value. Partition time is extracted from the partition value.");

    /** Option {@code partition.expiration-check-interval}: defaults to one hour. */
    public static final ConfigOption<Duration> PARTITION_EXPIRATION_CHECK_INTERVAL =
            ConfigOptions.key("partition.expiration-check-interval")
                    .durationType()
                    .defaultValue(Duration.ofHours(1))
                    .withDescription("The check interval of partition expiration.");

    /** Option {@code partition.timestamp-formatter}: no default value. */
    public static final ConfigOption<String> PARTITION_TIMESTAMP_FORMATTER =
            ConfigOptions.key("partition.timestamp-formatter")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text("The formatter to format timestamp from string. It can be used with 'partition.timestamp-pattern' to create a formatter using the specified value.")
                                    .list(
                                            TextElement.text("Default formatter is 'yyyy-MM-dd HH:mm:ss' and 'yyyy-MM-dd'."),
                                            TextElement.text("Supports multiple partition fields like '$year-$month-$day $hour:00:00'."),
                                            TextElement.text("The timestamp-formatter is compatible with Java's DateTimeFormatter."))
                                    .build());

    /** Option {@code partition.timestamp-pattern}: no default value. */
    public static final ConfigOption<String> PARTITION_TIMESTAMP_PATTERN =
            ConfigOptions.key("partition.timestamp-pattern")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text("You can specify a pattern to get a timestamp from partitions. The formatter pattern is defined by 'partition.timestamp-formatter'.")
                                    .list(
                                            TextElement.text("By default, read from the first field."),
                                            TextElement.text("If the timestamp in the partition is a single field called 'dt', you can use '$dt'."),
                                            TextElement.text("If it is spread across multiple fields for year, month, day, and hour, you can use '$year-$month-$day $hour:00:00'."),
                                            TextElement.text("If the timestamp is in fields dt and hour, you can use '$dt $hour:00:00'."))
                                    .build());

    /** Option {@code scan.plan-sort-partition}: defaults to {@code false}. */
    public static final ConfigOption<Boolean> SCAN_PLAN_SORT_PARTITION =
            ConfigOptions.key("scan.plan-sort-partition")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            Description.builder()
                                    .text("Whether to sort plan files by partition fields, this allows you to read according to the partition order, even if your partition writes are out of order.")
                                    .linebreak()
                                    .text("It is recommended that you use this for streaming read of the 'append-only' table. By default, streaming read will read the full snapshot first. In order to avoid the disorder reading for partitions, you can open this option.")
                                    .build());
    /** Option {@code primary-key}: no default value. */
    @Documentation.Immutable
    public static final ConfigOption<String> PRIMARY_KEY =
            ConfigOptions.key("primary-key")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Define primary key by table options, cannot define primary key on DDL and table options at the same time.");

    /** Option {@code partition}: no default value. */
    @Documentation.Immutable
    public static final ConfigOption<String> PARTITION =
            ConfigOptions.key("partition")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Define partition by table options, cannot define partition on DDL and table options at the same time.");
    /** Option {@code lookup.hash-load-factor}: defaults to {@code 0.75}. */
    public static final ConfigOption<Float> LOOKUP_HASH_LOAD_FACTOR =
            ConfigOptions.key("lookup.hash-load-factor")
                    .floatType()
                    .defaultValue(0.75f)
                    .withDescription("The index load factor for lookup.");

    /** Option {@code lookup.cache-file-retention}: defaults to one hour. */
    public static final ConfigOption<Duration> LOOKUP_CACHE_FILE_RETENTION =
            ConfigOptions.key("lookup.cache-file-retention")
                    .durationType()
                    .defaultValue(Duration.ofHours(1))
                    .withDescription("The cached files retention time for lookup. After the file expires, if there is a need for access, it will be re-read from the DFS to build an index on the local disk.");

    /** Option {@code lookup.cache-max-disk-size}: defaults to {@code MemorySize.MAX_VALUE}. */
    @Documentation.OverrideDefault("infinite")
    public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_DISK_SIZE =
            ConfigOptions.key("lookup.cache-max-disk-size")
                    .memoryType()
                    .defaultValue(MemorySize.MAX_VALUE)
                    .withDescription("Max disk size for lookup cache, you can use this option to limit the use of local disks.");

    /** Option {@code lookup.cache-spill-compression}: defaults to {@code lz4}. */
    public static final ConfigOption<String> LOOKUP_CACHE_SPILL_COMPRESSION =
            ConfigOptions.key("lookup.cache-spill-compression")
                    .stringType()
                    .defaultValue("lz4")
                    .withDescription("Spill compression for lookup cache, currently none, lz4, lzo and zstd are supported.");

    /** Option {@code lookup.cache-max-memory-size}: defaults to {@code 256 mb}. */
    public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE =
            ConfigOptions.key("lookup.cache-max-memory-size")
                    .memoryType()
                    .defaultValue(MemorySize.parse("256 mb"))
                    .withDescription("Max memory size for lookup cache.");

    /** Option {@code lookup.cache.bloom.filter.enabled}: defaults to {@code true}. */
    public static final ConfigOption<Boolean> LOOKUP_CACHE_BLOOM_FILTER_ENABLED =
            ConfigOptions.key("lookup.cache.bloom.filter.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to enable the bloom filter for lookup cache.");

    /** Option {@code lookup.cache.bloom.filter.fpp}: defaults to {@code 0.05}. */
    public static final ConfigOption<Double> LOOKUP_CACHE_BLOOM_FILTER_FPP =
            ConfigOptions.key("lookup.cache.bloom.filter.fpp")
                    .doubleType()
                    .defaultValue(0.05)
                    .withDescription("Define the default false positive probability for lookup cache bloom filters.");
    /** Option {@code read.batch-size}: defaults to {@code 1024}. */
    public static final ConfigOption<Integer> READ_BATCH_SIZE =
            ConfigOptions.key("read.batch-size")
                    .intType()
                    .defaultValue(1024)
                    .withDescription("Read batch size for orc and parquet.");

    /** Option {@code consumer-id}: no default value. */
    public static final ConfigOption<String> CONSUMER_ID =
            ConfigOptions.key("consumer-id")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Consumer id for recording the offset of consumption in the storage.");

    /** Option {@code full-compaction.delta-commits}: no default value. */
    public static final ConfigOption<Integer> FULL_COMPACTION_DELTA_COMMITS =
            ConfigOptions.key("full-compaction.delta-commits")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Full compaction will be constantly triggered after delta commits.");

    /** Option {@code stream-scan-mode}: defaults to {@code NONE}; excluded from the generated documentation. */
    @Documentation.ExcludeFromDocumentation("Internal use only")
    public static final ConfigOption<StreamScanMode> STREAM_SCAN_MODE =
            ConfigOptions.key("stream-scan-mode")
                    .enumType(StreamScanMode.class)
                    .defaultValue(StreamScanMode.NONE)
                    .withDescription("Only used to force TableScan to construct suitable 'StartingUpScanner' and 'FollowUpScanner' dedicated internal streaming scan.");

    /** Option {@code streaming-read-mode}: no default value; the description lists the FILE and LOG values. */
    public static final ConfigOption<StreamingReadMode> STREAMING_READ_MODE =
            ConfigOptions.key("streaming-read-mode")
                    .enumType(StreamingReadMode.class)
                    .noDefaultValue()
                    .withDescription(
                            Description.builder()
                                    .text("The mode of streaming read that specifies to read the data of table file or log")
                                    .linebreak()
                                    .linebreak()
                                    .text("Possible values:")
                                    .linebreak()
                                    .list(TextElement.text(StreamingReadMode.FILE.getValue() + ": Reads from the data of table file store."))
                                    .list(TextElement.text(StreamingReadMode.LOG.getValue() + ": Read from the data of table log store."))
                                    .build());

    /** Option {@code consumer.expiration-time}: no default value. */
    public static final ConfigOption<Duration> CONSUMER_EXPIRATION_TIME =
            ConfigOptions.key("consumer.expiration-time")
                    .durationType()
                    .noDefaultValue()
                    .withDescription("The expiration interval of consumer files. A consumer file will be expired if it's lifetime after last modification is over this value.");

    /** Option {@code consumer.mode}: defaults to {@code EXACTLY_ONCE}. */
    public static final ConfigOption<ConsumerMode> CONSUMER_CONSISTENCY_MODE =
            ConfigOptions.key("consumer.mode")
                    .enumType(ConsumerMode.class)
                    .defaultValue(ConsumerMode.EXACTLY_ONCE)
                    .withDescription("Specify the consumer consistency mode for table.");

    /** Option {@code consumer.ignore-progress}: defaults to {@code false}. */
    public static final ConfigOption<Boolean> CONSUMER_IGNORE_PROGRESS =
            ConfigOptions.key("consumer.ignore-progress")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription("Whether to ignore consumer progress for the newly started job.");
    /** Option {@code dynamic-bucket.target-row-num}: defaults to two million rows. */
    public static final ConfigOption<Long> DYNAMIC_BUCKET_TARGET_ROW_NUM =
            ConfigOptions.key("dynamic-bucket.target-row-num")
                    .longType()
                    .defaultValue(2_000_000L)
                    .withDescription("If the bucket is -1, for primary key table, is dynamic bucket mode, this option controls the target row number for one bucket.");

    /** Option {@code dynamic-bucket.initial-buckets}: no default value. */
    public static final ConfigOption<Integer> DYNAMIC_BUCKET_INITIAL_BUCKETS =
            ConfigOptions.key("dynamic-bucket.initial-buckets")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Initial buckets for a partition in assigner operator for dynamic bucket mode.");

    /** Option {@code dynamic-bucket.assigner-parallelism}: no default value. */
    public static final ConfigOption<Integer> DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
            ConfigOptions.key("dynamic-bucket.assigner-parallelism")
                    .intType()
                    .noDefaultValue()
                    .withDescription("Parallelism of assigner operator for dynamic bucket mode, it is related to the number of initialized bucket, too small will lead to insufficient processing speed of assigner.");
    /** Option {@code incremental-between}: no default value. */
    public static final ConfigOption<String> INCREMENTAL_BETWEEN =
            ConfigOptions.key("incremental-between")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Read incremental changes between start snapshot (exclusive) and end snapshot, for example, '5,10' means changes between snapshot 5 and snapshot 10.");

    /** Option {@code incremental-between-scan-mode}: defaults to {@code AUTO}. */
    public static final ConfigOption<IncrementalBetweenScanMode> INCREMENTAL_BETWEEN_SCAN_MODE =
            ConfigOptions.key("incremental-between-scan-mode")
                    .enumType(IncrementalBetweenScanMode.class)
                    .defaultValue(IncrementalBetweenScanMode.AUTO)
                    .withDescription("Scan kind when Read incremental changes between start snapshot (exclusive) and end snapshot. ");

    /** Option {@code incremental-between-timestamp}: no default value. */
    public static final ConfigOption<String> INCREMENTAL_BETWEEN_TIMESTAMP =
            ConfigOptions.key("incremental-between-timestamp")
                    .stringType()
                    .noDefaultValue()
                    .withDescription("Read incremental changes between start timestamp (exclusive) and end timestamp, for example, 't1,t2' means changes between timestamp t1 and timestamp t2.");
    /** Suffix text {@code stats-mode}; the description of {@link #METADATA_STATS_MODE} mentions keys of the form {@code fields.{field_name}.stats-mode}. */
    public static final String STATS_MODE_SUFFIX = "stats-mode";
    /** Key {@code metadata.stats-mode}: mode of metadata stats collection; defaults to {@code truncate(16)}. */
    public static final ConfigOption<String> METADATA_STATS_MODE = ConfigOptions.key("metadata.stats-mode").stringType().defaultValue("truncate(16)").withDescription(Description.builder().text("The mode of metadata stats collection. none, counts, truncate(16), full is available.").linebreak().list(TextElement.text("\"none\": means disable the metadata stats collection.")).list(TextElement.text("\"counts\" means only collect the null count.")).list(TextElement.text("\"full\": means collect the null count, min/max value.")).list(TextElement.text("\"truncate(16)\": means collect the null count, min/max value with truncated length of 16.")).list(TextElement.text("Field level stats mode can be specified by fields.{field_name}.stats-mode")).build());
    /** Key {@code commit.callbacks}: comma-connected commit callback class names; defaults to the empty string. */
    public static final ConfigOption<String> COMMIT_CALLBACKS = ConfigOptions.key("commit.callbacks").stringType().defaultValue("").withDescription("A list of commit callback classes to be called after a successful commit. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).");
    /** Key template {@code commit.callback.#.param}: constructor parameter string, where {@code #} stands for the callback class. */
    public static final ConfigOption<String> COMMIT_CALLBACK_PARAM = ConfigOptions.key("commit.callback.#.param").stringType().noDefaultValue().withDescription("Parameter string for the constructor of class #. Callback class should parse the parameter by itself.");
    /** Key {@code tag.callbacks}: comma-connected tag callback class names; defaults to the empty string. */
    public static final ConfigOption<String> TAG_CALLBACKS = ConfigOptions.key("tag.callbacks").stringType().defaultValue("").withDescription("A list of commit callback classes to be called after a successful tag. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).");
    /** Key template {@code tag.callback.#.param}: constructor parameter string, where {@code #} stands for the callback class. */
    public static final ConfigOption<String> TAG_CALLBACK_PARAM = ConfigOptions.key("tag.callback.#.param").stringType().noDefaultValue().withDescription("Parameter string for the constructor of class #. Callback class should parse the parameter by itself.");
    /** Key {@code metastore.partitioned-table}: whether to create the table as a partitioned table in metastore; defaults to {@code false}. */
    public static final ConfigOption<Boolean> METASTORE_PARTITIONED_TABLE = ConfigOptions.key("metastore.partitioned-table").booleanType().defaultValue(false).withDescription("Whether to create this table as a partitioned table in metastore.\nFor example, if you want to list all partitions of a Paimon table in Hive, you need to create this table as a partitioned table in Hive metastore.\nThis config option does not affect the default filesystem metastore.");
    /** Key {@code metastore.tag-to-partition}: string option with no default value, used for mapping tags to partitions in metastore. */
    public static final ConfigOption<String> METASTORE_TAG_TO_PARTITION = ConfigOptions.key("metastore.tag-to-partition").stringType().noDefaultValue().withDescription("Whether to create this table as a partitioned table for mapping non-partitioned table tags in metastore. This allows the Hive engine to view this table in a partitioned table view and use partitioning field to read specific partitions (specific tags).");
    /** Key {@code metastore.tag-to-partition.preview}: tag preview mode in metastore; defaults to {@code NONE}. */
    public static final ConfigOption<TagCreationMode> METASTORE_TAG_TO_PARTITION_PREVIEW = ConfigOptions.key("metastore.tag-to-partition.preview").enumType(TagCreationMode.class).defaultValue(TagCreationMode.NONE).withDescription("Whether to preview tag of generated snapshots in metastore. This allows the Hive engine to query specific tag before creation.");
    /** Key {@code tag.automatic-creation}: whether and how tags are created automatically; defaults to {@code NONE}. */
    public static final ConfigOption<TagCreationMode> TAG_AUTOMATIC_CREATION = ConfigOptions.key("tag.automatic-creation").enumType(TagCreationMode.class).defaultValue(TagCreationMode.NONE).withDescription("Whether to create tag automatically. And how to generate tags.");
    /** Key {@code tag.creation-period}: frequency used to generate tags; defaults to {@code DAILY}. */
    public static final ConfigOption<TagCreationPeriod> TAG_CREATION_PERIOD = ConfigOptions.key("tag.creation-period").enumType(TagCreationPeriod.class).defaultValue(TagCreationPeriod.DAILY).withDescription("What frequency is used to generate tags.");
    /** Key {@code tag.creation-delay}: delay after the period ends before a tag is created; defaults to zero. */
    public static final ConfigOption<Duration> TAG_CREATION_DELAY = ConfigOptions.key("tag.creation-delay").durationType().defaultValue(Duration.ofMillis(0L)).withDescription("How long is the delay after the period ends before creating a tag. This can allow some late data to enter the Tag.");
    /** Key {@code tag.period-formatter}: date format for tag periods; defaults to {@code WITH_DASHES}. */
    public static final ConfigOption<TagPeriodFormatter> TAG_PERIOD_FORMATTER = ConfigOptions.key("tag.period-formatter").enumType(TagPeriodFormatter.class).defaultValue(TagPeriodFormatter.WITH_DASHES).withDescription("The date format for tag periods.");
    /** Key {@code tag.num-retained-max}: maximum number of tags to retain; declares no default value. */
    public static final ConfigOption<Integer> TAG_NUM_RETAINED_MAX = ConfigOptions.key("tag.num-retained-max").intType().noDefaultValue().withDescription("The maximum number of tags to retain. It only affects auto-created tags.");
    /** Key {@code tag.default-time-retained}: default maximum retention time for newly created tags; declares no default value. */
    public static final ConfigOption<Duration> TAG_DEFAULT_TIME_RETAINED = ConfigOptions.key("tag.default-time-retained").durationType().noDefaultValue().withDescription("The default maximum time retained for newly created tags. It affects both auto-created tags and manually created (by procedure) tags.");
    /** Key {@code snapshot.watermark-idle-timeout}: idle timeout of a source in watermarking; declares no default value. */
    public static final ConfigOption<Duration> SNAPSHOT_WATERMARK_IDLE_TIMEOUT = ConfigOptions.key("snapshot.watermark-idle-timeout").durationType().noDefaultValue().withDescription("In watermarking, if a source remains idle beyond the specified timeout duration, it triggers snapshot advancement and facilitates tag creation.");
    /**
     * Key {@code parquet.enable.dictionary}; declares no default value.
     * NOTE(review): declared as an int option although the description reads like an on/off switch — confirm intended type.
     */
    public static final ConfigOption<Integer> PARQUET_ENABLE_DICTIONARY = ConfigOptions.key("parquet.enable.dictionary").intType().noDefaultValue().withDescription("Turn off the dictionary encoding for all fields in parquet.");
    /** Key {@code sink.watermark-time-zone}: time zone used to parse the long watermark value; defaults to {@code UTC}. */
    public static final ConfigOption<String> SINK_WATERMARK_TIME_ZONE = ConfigOptions.key("sink.watermark-time-zone").stringType().defaultValue("UTC").withDescription("The time zone to parse the long watermark value to TIMESTAMP value. The default value is 'UTC', which means the watermark is defined on TIMESTAMP column or not defined. If the watermark is defined on TIMESTAMP_LTZ column, the time zone of watermark is user configured time zone, the value should be the user configured local time zone. The option value is either a full name such as 'America/Los_Angeles', or a custom timezone id such as 'GMT-08:00'.");
    /** Key {@code local-merge-buffer-size}: buffer size for local merge; declares no default value. */
    public static final ConfigOption<MemorySize> LOCAL_MERGE_BUFFER_SIZE = ConfigOptions.key("local-merge-buffer-size").memoryType().noDefaultValue().withDescription("Local merge will buffer and merge input records before they're shuffled by bucket and written into sink. The buffer will be flushed when it is full.\nMainly to resolve data skew on primary keys. We recommend starting with 64 mb when trying out this feature.");
    /** Key {@code cross-partition-upsert.index-ttl}: TTL of the rocksdb index for cross partition upsert; declares no default value. */
    public static final ConfigOption<Duration> CROSS_PARTITION_UPSERT_INDEX_TTL = ConfigOptions.key("cross-partition-upsert.index-ttl").durationType().noDefaultValue().withDescription("The TTL in rocksdb index for cross partition upsert (primary keys not contain all partition fields), this can avoid maintaining too many indexes and lead to worse and worse performance, but please note that this may also cause data duplication.");
    /** Key {@code cross-partition-upsert.bootstrap-parallelism}: bootstrap parallelism in a single task; defaults to 10. */
    public static final ConfigOption<Integer> CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM = ConfigOptions.key("cross-partition-upsert.bootstrap-parallelism").intType().defaultValue(10).withDescription("The parallelism for bootstrap in a single task for cross partition upsert.");
    /** Key {@code zorder.var-length-contribution}: bytes that variable-length types contribute to the zorder sort; defaults to 8. */
    public static final ConfigOption<Integer> ZORDER_VAR_LENGTH_CONTRIBUTION = ConfigOptions.key("zorder.var-length-contribution").intType().defaultValue(8).withDescription("The bytes of types (CHAR, VARCHAR, BINARY, VARBINARY) devote to the zorder sort.");
    /** Key {@code file-reader-async-threshold}: threshold for reading a file asynchronously; defaults to 10 MiB. */
    public static final ConfigOption<MemorySize> FILE_READER_ASYNC_THRESHOLD = ConfigOptions.key("file-reader-async-threshold").memoryType().defaultValue(MemorySize.ofMebiBytes(10L)).withDescription("The threshold for read file async.");
    /** Key {@code commit.force-create-snapshot}: whether to force creating a snapshot on commit; defaults to {@code false}. */
    public static final ConfigOption<Boolean> COMMIT_FORCE_CREATE_SNAPSHOT = ConfigOptions.key("commit.force-create-snapshot").booleanType().defaultValue(false).withDescription("Whether to force create snapshot on commit.");
    /** Key {@code deletion-vectors.enabled}: whether deletion vectors mode is enabled; defaults to {@code false}. */
    public static final ConfigOption<Boolean> DELETION_VECTORS_ENABLED = ConfigOptions.key("deletion-vectors.enabled").booleanType().defaultValue(false).withDescription("Whether to enable deletion vectors mode. In this mode, index files containing deletion vectors are generated when data is written, which marks the data for deletion. During read operations, by applying these index files, merging can be avoided.");
    /** Key {@code delete.force-produce-changelog}: whether delete sql is forced to produce changelog; defaults to {@code false}. */
    public static final ConfigOption<Boolean> DELETION_FORCE_PRODUCE_CHANGELOG = ConfigOptions.key("delete.force-produce-changelog").booleanType().defaultValue(false).withDescription("Force produce changelog in delete sql, or you can use 'streaming-read-overwrite' to read changelog from overwrite commit.");
    /** Key {@code sort-compaction.range-strategy}: range strategy of sort compaction; defaults to {@link RangeStrategy#QUANTITY}. */
    public static final ConfigOption<RangeStrategy> SORT_RANG_STRATEGY = ConfigOptions.key("sort-compaction.range-strategy").enumType(RangeStrategy.class).defaultValue(RangeStrategy.QUANTITY).withDescription("The range strategy of sort compaction, the default value is quantity.\nIf the data size allocated for the sorting task is uneven,which may lead to performance bottlenecks, the config can be set to size.");
    /** Key {@code sort-compaction.local-sample.magnification}: magnification of the local sample for sort compaction; defaults to 1000. */
    public static final ConfigOption<Integer> SORT_COMPACTION_SAMPLE_MAGNIFICATION = ConfigOptions.key("sort-compaction.local-sample.magnification").intType().defaultValue(1000).withDescription("The magnification of local sample for sort-compaction.The size of local sample is sink parallelism * magnification.");
    /** Key {@code record-level.expire-time}: record level expire time for primary key tables; declares no default value. */
    public static final ConfigOption<Duration> RECORD_LEVEL_EXPIRE_TIME = ConfigOptions.key("record-level.expire-time").durationType().noDefaultValue().withDescription("Record level expire time for primary key table, expiration happens in compaction, there is no strong guarantee to expire records in time. You must specific 'record-level.time-field' too.");
    /** Key {@code record-level.time-field}: time field used for record level expire; declares no default value. */
    public static final ConfigOption<String> RECORD_LEVEL_TIME_FIELD = ConfigOptions.key("record-level.time-field").stringType().noDefaultValue().withDescription("Time field for record level expire, it should be a seconds INT.");
    /** Backing option store that every accessor of this class reads from; assigned once in the constructor. */
    private final Options options;

    /**
     * Creates core options from a plain string map by converting it with {@code Options.fromMap}.
     *
     * @param options raw key/value options
     */
    public CoreOptions(Map<String, String> options) {
        this(Options.fromMap(options));
    }

    /**
     * Creates core options backed by the given {@link Options} instance (stored by reference, not copied).
     *
     * @param options option store to read from
     */
    public CoreOptions(Options options) {
        this.options = options;
    }

    /**
     * Static factory equivalent to {@code new CoreOptions(options)}.
     *
     * @param options raw key/value options
     * @return a new {@link CoreOptions} instance
     */
    public static CoreOptions fromMap(Map<String, String> options) {
        return new CoreOptions(options);
    }

    /**
     * Exposes the backing option store.
     *
     * @return the same {@link Options} instance this object was built with (no copy is made here)
     */
    public Options toConfiguration() {
        return options;
    }

    /**
     * Returns the options as a string map, as produced by {@code Options.toMap()}.
     *
     * @return key/value view of the options
     */
    public Map<String, String> toMap() {
        return options.toMap();
    }

    /**
     * Returns the configured {@link #BUCKET} value.
     *
     * @return the bucket number
     */
    public int bucket() {
        return options.get(BUCKET);
    }

    /**
     * Resolves the table path from this instance's options via {@link #path(Map)}.
     *
     * @return the path built from the {@code PATH} option
     */
    public Path path() {
        Map<String, String> asMap = options.toMap();
        return CoreOptions.path(asMap);
    }

    /**
     * Builds a {@link Path} from the value stored under the {@code PATH} option key.
     *
     * @param options raw key/value options
     * @return the path constructed from the stored value
     */
    public static Path path(Map<String, String> options) {
        String location = options.get(PATH.key());
        return new Path(location);
    }

    /**
     * Builds a {@link Path} from the {@code PATH} option of the given option store.
     *
     * @param options option store to read from
     * @return the path constructed from the option value
     */
    public static Path path(Options options) {
        return new Path(options.get(PATH));
    }

    /**
     * Returns the value of the {@code FILE_FORMAT} option.
     *
     * @return the configured file format type
     */
    public FileFormatType formatType() {
        return options.get(FILE_FORMAT);
    }

    /**
     * Creates the {@link FileFormat} selected by the {@code FILE_FORMAT} option.
     *
     * @return the file format built by {@link #createFileFormat(Options, ConfigOption)}
     */
    public FileFormat fileFormat() {
        return createFileFormat(options, FILE_FORMAT);
    }

    /**
     * Creates the {@link FileFormat} selected by the {@code MANIFEST_FORMAT} option.
     *
     * @return the file format built by {@link #createFileFormat(Options, ConfigOption)}
     */
    public FileFormat manifestFormat() {
        return createFileFormat(options, MANIFEST_FORMAT);
    }

    /**
     * Returns the value of the {@code MANIFEST_TARGET_FILE_SIZE} option.
     *
     * @return the configured size
     */
    public MemorySize manifestTargetSize() {
        return options.get(MANIFEST_TARGET_FILE_SIZE);
    }

    /**
     * Returns the value of the {@code MANIFEST_FULL_COMPACTION_FILE_SIZE} option.
     *
     * @return the configured size
     */
    public MemorySize manifestFullCompactionThresholdSize() {
        return options.get(MANIFEST_FULL_COMPACTION_FILE_SIZE);
    }

    /**
     * Returns the value of the {@code WRITE_MANIFEST_CACHE} option.
     *
     * @return the configured size
     */
    public MemorySize writeManifestCache() {
        return options.get(WRITE_MANIFEST_CACHE);
    }

    /**
     * Returns the value of the {@code PARTITION_DEFAULT_NAME} option.
     *
     * @return the configured default partition name
     */
    public String partitionDefaultName() {
        return options.get(PARTITION_DEFAULT_NAME);
    }

    /**
     * Tells whether {@link #SORT_RANG_STRATEGY} is set to {@link RangeStrategy#SIZE}.
     *
     * @return {@code true} only for the {@code SIZE} strategy
     */
    public boolean sortBySize() {
        RangeStrategy strategy = options.get(SORT_RANG_STRATEGY);
        return strategy == RangeStrategy.SIZE;
    }

    /**
     * Returns the value of the {@link #SORT_COMPACTION_SAMPLE_MAGNIFICATION} option.
     *
     * @return the configured local sample magnification
     */
    public Integer getLocalSampleMagnification() {
        return options.get(SORT_COMPACTION_SAMPLE_MAGNIFICATION);
    }

    /**
     * Looks up a {@link FileFormat} using the string form of the configured format type as identifier.
     *
     * @param options option store to read the format type from
     * @param formatOption option that holds the format type
     * @return the result of {@code FileFormat.getFileFormat} for that identifier
     */
    public static FileFormat createFileFormat(Options options, ConfigOption<FileFormatType> formatOption) {
        FileFormatType formatType = options.get(formatOption);
        return FileFormat.getFileFormat(options, formatType.toString());
    }

    /**
     * Converts the {@code FILE_COMPRESSION_PER_LEVEL} map so that its keys are parsed as integers.
     *
     * @return map from parsed key to the configured value
     */
    public Map<Integer, String> fileCompressionPerLevel() {
        Map<String, String> perLevel = options.get(FILE_COMPRESSION_PER_LEVEL);
        return perLevel.entrySet().stream()
                .collect(Collectors.toMap(entry -> Integer.valueOf(entry.getKey()), Map.Entry::getValue));
    }

    /**
     * Converts the {@code FILE_FORMAT_PER_LEVEL} map so that its keys are parsed as integers.
     *
     * @return map from parsed key to the configured value
     */
    public Map<Integer, String> fileFormatPerLevel() {
        Map<String, String> perLevel = options.get(FILE_FORMAT_PER_LEVEL);
        return perLevel.entrySet().stream()
                .collect(Collectors.toMap(entry -> Integer.valueOf(entry.getKey()), Map.Entry::getValue));
    }

    /**
     * Checks whether any option key starts with {@link #FIELDS_PREFIX} and ends with {@link #AGG_FUNCTION}.
     *
     * @return {@code true} if at least one such key exists
     */
    public boolean definedAggFunc() {
        return options.toMap().keySet().stream()
                .anyMatch(key -> key.startsWith(FIELDS_PREFIX) && key.endsWith(AGG_FUNCTION));
    }

    /**
     * Reads the string option {@code fields.<fieldName>.aggregate-function}, which has no default value.
     *
     * @param fieldName name of the field
     * @return the configured value for that key
     */
    public String fieldAggFunc(String fieldName) {
        String optionKey = "fields." + fieldName + "." + AGG_FUNCTION;
        return options.get(ConfigOptions.key(optionKey).stringType().noDefaultValue());
    }

    /**
     * Reads the boolean option {@code fields.<fieldName>.ignore-retract}, defaulting to {@code false}.
     *
     * @param fieldName name of the field
     * @return the configured flag
     */
    public boolean fieldAggIgnoreRetract(String fieldName) {
        String optionKey = "fields." + fieldName + "." + IGNORE_RETRACT;
        return options.get(ConfigOptions.key(optionKey).booleanType().defaultValue(false));
    }

    /**
     * Reads {@code fields.<fieldName>.nested-key} and splits it on commas.
     *
     * @param fieldName name of the field
     * @return the comma-separated parts, or an empty list when the option value is {@code null}
     */
    public List<String> fieldNestedUpdateAggNestedKey(String fieldName) {
        String optionKey = "fields." + fieldName + "." + NESTED_KEY;
        String joinedKeys = options.get(ConfigOptions.key(optionKey).stringType().noDefaultValue());
        return joinedKeys == null ? Collections.emptyList() : Arrays.asList(joinedKeys.split(","));
    }

    /**
     * Reads the boolean option {@code fields.<fieldName>.distinct}, defaulting to {@code false}.
     *
     * @param fieldName name of the field
     * @return the configured flag
     */
    public boolean fieldCollectAggDistinct(String fieldName) {
        String optionKey = "fields." + fieldName + "." + DISTINCT;
        return options.get(ConfigOptions.key(optionKey).booleanType().defaultValue(false));
    }

    /**
     * Returns the value of the {@code FILE_COMPRESSION} option.
     *
     * @return the configured compression
     */
    public String fileCompression() {
        return options.get(FILE_COMPRESSION);
    }

    /**
     * Returns the value of the {@link #FILE_READER_ASYNC_THRESHOLD} option.
     *
     * @return the configured threshold
     */
    public MemorySize fileReaderAsyncThreshold() {
        return options.get(FILE_READER_ASYNC_THRESHOLD);
    }

    /**
     * Returns the value of the {@code SNAPSHOT_NUM_RETAINED_MIN} option.
     *
     * @return the configured minimum
     */
    public int snapshotNumRetainMin() {
        return options.get(SNAPSHOT_NUM_RETAINED_MIN);
    }

    /**
     * Returns the value of the {@code SNAPSHOT_NUM_RETAINED_MAX} option.
     *
     * @return the configured maximum
     */
    public int snapshotNumRetainMax() {
        return options.get(SNAPSHOT_NUM_RETAINED_MAX);
    }

    /**
     * Returns the value of the {@code SNAPSHOT_TIME_RETAINED} option.
     *
     * @return the configured retention time
     */
    public Duration snapshotTimeRetain() {
        return options.get(SNAPSHOT_TIME_RETAINED);
    }

    /**
     * Returns {@code CHANGELOG_NUM_RETAINED_MIN} when it is set, otherwise the value of
     * {@code SNAPSHOT_NUM_RETAINED_MIN}.
     *
     * @return the effective minimum for changelogs
     */
    public int changelogNumRetainMin() {
        return options
                .getOptional(CHANGELOG_NUM_RETAINED_MIN)
                .orElse(options.get(SNAPSHOT_NUM_RETAINED_MIN));
    }

    /**
     * Returns {@code CHANGELOG_NUM_RETAINED_MAX} when it is set, otherwise the value of
     * {@code SNAPSHOT_NUM_RETAINED_MAX}.
     *
     * @return the effective maximum for changelogs
     */
    public int changelogNumRetainMax() {
        return options
                .getOptional(CHANGELOG_NUM_RETAINED_MAX)
                .orElse(options.get(SNAPSHOT_NUM_RETAINED_MAX));
    }

    /**
     * Returns {@code CHANGELOG_TIME_RETAINED} when it is set, otherwise the value of
     * {@code SNAPSHOT_TIME_RETAINED}.
     *
     * @return the effective retention time for changelogs
     */
    public Duration changelogTimeRetain() {
        return options
                .getOptional(CHANGELOG_TIME_RETAINED)
                .orElse(options.get(SNAPSHOT_TIME_RETAINED));
    }

    /**
     * Tells whether any changelog retention setting exceeds its snapshot counterpart.
     *
     * <p>The checks run in this order and stop at the first hit: max count, retention time, min count.
     *
     * @return {@code true} if at least one changelog setting is strictly greater than the snapshot one
     */
    public boolean changelogLifecycleDecoupled() {
        if (changelogNumRetainMax() > snapshotNumRetainMax()) {
            return true;
        }
        if (changelogTimeRetain().compareTo(snapshotTimeRetain()) > 0) {
            return true;
        }
        return changelogNumRetainMin() > snapshotNumRetainMin();
    }

    /**
     * Returns the value of the {@code SNAPSHOT_EXPIRE_EXECUTION_MODE} option.
     *
     * @return the configured execution mode
     */
    public ExpireExecutionMode snapshotExpireExecutionMode() {
        return options.get(SNAPSHOT_EXPIRE_EXECUTION_MODE);
    }

    /**
     * Returns the value of the {@code SNAPSHOT_EXPIRE_LIMIT} option.
     *
     * @return the configured limit
     */
    public int snapshotExpireLimit() {
        return options.get(SNAPSHOT_EXPIRE_LIMIT);
    }

    /**
     * Returns the value of the {@code SNAPSHOT_EXPIRE_CLEAN_EMPTY_DIRECTORIES} option.
     *
     * @return the configured flag
     */
    public boolean snapshotExpireCleanEmptyDirectories() {
        return options.get(SNAPSHOT_EXPIRE_CLEAN_EMPTY_DIRECTORIES);
    }

    /**
     * Returns the value of the {@code MANIFEST_MERGE_MIN_COUNT} option.
     *
     * @return the configured count
     */
    public int manifestMergeMinCount() {
        return options.get(MANIFEST_MERGE_MIN_COUNT);
    }

    /**
     * Returns the value of the {@code MERGE_ENGINE} option.
     *
     * @return the configured merge engine
     */
    public MergeEngine mergeEngine() {
        return options.get(MERGE_ENGINE);
    }

    /**
     * Returns the value of the {@code IGNORE_DELETE} option.
     *
     * @return the configured flag
     */
    public boolean ignoreDelete() {
        return options.get(IGNORE_DELETE);
    }

    /**
     * Returns the value of the {@code SORT_ENGINE} option.
     *
     * @return the configured sort engine
     */
    public SortEngine sortEngine() {
        return options.get(SORT_ENGINE);
    }

    /**
     * Resolves the sort spill threshold.
     *
     * <p>Uses {@code SORT_SPILL_THRESHOLD} when set; otherwise derives it by safely incrementing
     * {@link #numSortedRunStopTrigger()}.
     *
     * @return the effective threshold, always greater than 1
     * @throws IllegalArgumentException presumably raised by {@code Preconditions.checkArgument} when the
     *     resolved value is not greater than 1 (verify against that helper)
     */
    public int sortSpillThreshold() {
        Integer configured = options.get(SORT_SPILL_THRESHOLD);
        int threshold =
                configured != null ? configured : MathUtils.incrementSafely(numSortedRunStopTrigger());
        Preconditions.checkArgument(threshold > 1, "The sort spill threshold cannot be smaller than 2.");
        return threshold;
    }

    /**
     * Returns the {@code SOURCE_SPLIT_TARGET_SIZE} option in bytes.
     *
     * @return the configured size in bytes
     */
    public long splitTargetSize() {
        return options.get(SOURCE_SPLIT_TARGET_SIZE).getBytes();
    }

    /**
     * Returns the {@code SOURCE_SPLIT_OPEN_FILE_COST} option in bytes.
     *
     * @return the configured cost in bytes
     */
    public long splitOpenFileCost() {
        return options.get(SOURCE_SPLIT_OPEN_FILE_COST).getBytes();
    }

    /**
     * Returns the {@code WRITE_BUFFER_SIZE} option in bytes.
     *
     * @return the configured size in bytes
     */
    public long writeBufferSize() {
        return options.get(WRITE_BUFFER_SIZE).getBytes();
    }

    /**
     * Resolves whether the write buffer is spillable.
     *
     * <p>An explicitly set {@code WRITE_BUFFER_SPILLABLE} wins; otherwise the answer is
     * {@code usingObjectStore || !isStreaming}.
     *
     * @param usingObjectStore whether an object store is in use
     * @param isStreaming whether the job is streaming
     * @return the effective flag
     */
    public boolean writeBufferSpillable(boolean usingObjectStore, boolean isStreaming) {
        boolean spillWhenUnset = usingObjectStore || !isStreaming;
        return options.getOptional(WRITE_BUFFER_SPILLABLE).orElse(spillWhenUnset);
    }

    /**
     * Returns the value of the {@code WRITE_BUFFER_MAX_DISK_SIZE} option.
     *
     * @return the configured size
     */
    public MemorySize writeBufferSpillDiskSize() {
        return options.get(WRITE_BUFFER_MAX_DISK_SIZE);
    }

    /**
     * Returns the value of the {@code WRITE_BUFFER_FOR_APPEND} option.
     *
     * @return the configured flag
     */
    public boolean useWriteBufferForAppend() {
        return options.get(WRITE_BUFFER_FOR_APPEND);
    }

    /**
     * Returns the value of the {@code WRITE_MAX_WRITERS_TO_SPILL} option.
     *
     * @return the configured number
     */
    public int writeMaxWritersToSpill() {
        return options.get(WRITE_MAX_WRITERS_TO_SPILL);
    }

    /**
     * Returns the {@code SORT_SPILL_BUFFER_SIZE} option in bytes.
     *
     * @return the configured size in bytes
     */
    public long sortSpillBufferSize() {
        return options.get(SORT_SPILL_BUFFER_SIZE).getBytes();
    }

    /**
     * Returns the value of the {@code SPILL_COMPRESSION} option.
     *
     * @return the configured compression
     */
    public String spillCompression() {
        return options.get(SPILL_COMPRESSION);
    }

    /**
     * Returns the value of the {@code CONTINUOUS_DISCOVERY_INTERVAL} option.
     *
     * @return the configured interval
     */
    public Duration continuousDiscoveryInterval() {
        return options.get(CONTINUOUS_DISCOVERY_INTERVAL);
    }

    /**
     * Returns the value of the {@code SCAN_MAX_SPLITS_PER_TASK} option.
     *
     * @return the configured maximum
     */
    public int scanSplitMaxPerTask() {
        return options.get(SCAN_MAX_SPLITS_PER_TASK);
    }

    /**
     * Returns the value of the {@code LOCAL_SORT_MAX_NUM_FILE_HANDLES} option.
     *
     * @return the configured maximum
     */
    public int localSortMaxNumFileHandles() {
        return options.get(LOCAL_SORT_MAX_NUM_FILE_HANDLES);
    }

    /**
     * Returns the {@code PAGE_SIZE} option in bytes, narrowed to {@code int} with a plain cast.
     *
     * @return the configured page size in bytes
     */
    public int pageSize() {
        return (int) options.get(PAGE_SIZE).getBytes();
    }

    /**
     * Returns the {@code CACHE_PAGE_SIZE} option in bytes, narrowed to {@code int} with a plain cast.
     *
     * @return the configured cache page size in bytes
     */
    public int cachePageSize() {
        return (int) options.get(CACHE_PAGE_SIZE).getBytes();
    }

    /**
     * Returns the value of the {@code LOOKUP_CACHE_MAX_MEMORY_SIZE} option.
     *
     * @return the configured size
     */
    public MemorySize lookupCacheMaxMemory() {
        return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
    }

    /**
     * Returns the {@code TARGET_FILE_SIZE} option in bytes.
     *
     * @return the configured size in bytes
     */
    public long targetFileSize() {
        return options.get(TARGET_FILE_SIZE).getBytes();
    }

    /**
     * Derives the compaction file size from {@code TARGET_FILE_SIZE}.
     *
     * <p>The byte count is divided by 10 first (integer division) and then multiplied by 7.
     *
     * @return the derived size in bytes
     */
    public long compactionFileSize() {
        long targetBytes = options.get(TARGET_FILE_SIZE).getBytes();
        return targetBytes / 10L * 7L;
    }

    /**
     * Returns the value of the {@code NUM_SORTED_RUNS_COMPACTION_TRIGGER} option.
     *
     * @return the configured trigger
     */
    public int numSortedRunCompactionTrigger() {
        return options.get(NUM_SORTED_RUNS_COMPACTION_TRIGGER);
    }

    /**
     * Returns the value of the {@code COMPACTION_OPTIMIZATION_INTERVAL} option.
     *
     * @return the configured interval; annotated as possibly {@code null}
     */
    @Nullable
    public Duration optimizedCompactionInterval() {
        return options.get(COMPACTION_OPTIMIZATION_INTERVAL);
    }

    /**
     * Resolves the sorted-run stop trigger.
     *
     * <p>When {@code NUM_SORTED_RUNS_STOP_TRIGGER} is unset, the compaction trigger plus 3 (added
     * safely) is used. The result is never below {@link #numSortedRunCompactionTrigger()}.
     *
     * @return the effective stop trigger
     */
    public int numSortedRunStopTrigger() {
        Integer configured = options.get(NUM_SORTED_RUNS_STOP_TRIGGER);
        int stopTrigger =
                configured != null ? configured : MathUtils.addSafely(numSortedRunCompactionTrigger(), 3);
        return Math.max(numSortedRunCompactionTrigger(), stopTrigger);
    }

    /**
     * Resolves the number of levels.
     *
     * <p>Uses {@code NUM_LEVELS} when set; otherwise the compaction trigger incremented safely by one.
     *
     * @return the effective number of levels
     */
    public int numLevels() {
        Integer configured = options.get(NUM_LEVELS);
        if (configured != null) {
            return configured;
        }
        return MathUtils.incrementSafely(numSortedRunCompactionTrigger());
    }

    /**
     * Returns the value of the {@code COMMIT_FORCE_COMPACT} option.
     *
     * @return the configured flag
     */
    public boolean commitForceCompact() {
        return options.get(COMMIT_FORCE_COMPACT);
    }

    /**
     * Returns the value of the {@code COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT} option.
     *
     * @return the configured percentage
     */
    public int maxSizeAmplificationPercent() {
        return options.get(COMPACTION_MAX_SIZE_AMPLIFICATION_PERCENT);
    }

    /**
     * Returns the value of the {@code COMPACTION_SIZE_RATIO} option.
     *
     * @return the configured ratio
     */
    public int sortedRunSizeRatio() {
        return options.get(COMPACTION_SIZE_RATIO);
    }

    /**
     * Returns the value of the {@code COMPACTION_MIN_FILE_NUM} option.
     *
     * @return the configured minimum
     */
    public int compactionMinFileNum() {
        return options.get(COMPACTION_MIN_FILE_NUM);
    }

    /**
     * Returns the value of the {@code COMPACTION_MAX_FILE_NUM} option.
     *
     * @return the configured maximum
     */
    public int compactionMaxFileNum() {
        return options.get(COMPACTION_MAX_FILE_NUM);
    }

    /**
     * Returns the value of the {@link #DYNAMIC_BUCKET_TARGET_ROW_NUM} option.
     *
     * @return the configured target row number
     */
    public long dynamicBucketTargetRowNum() {
        return options.get(DYNAMIC_BUCKET_TARGET_ROW_NUM);
    }

    /**
     * Returns the value of the {@code CHANGELOG_PRODUCER} option.
     *
     * @return the configured changelog producer
     */
    public ChangelogProducer changelogProducer() {
        return options.get(CHANGELOG_PRODUCER);
    }

    /**
     * Reads the {@code needLookup} field of the strategy returned by {@link #lookupStrategy()}.
     *
     * @return the strategy's {@code needLookup} value
     */
    public boolean needLookup() {
        LookupStrategy strategy = lookupStrategy();
        return strategy.needLookup;
    }

    /**
     * Builds the {@link LookupStrategy} from three flags, in this order: whether the merge engine is
     * {@code FIRST_ROW}, whether the changelog producer is {@code LOOKUP}, and whether deletion vectors
     * are enabled.
     *
     * @return the strategy produced by {@code LookupStrategy.from}
     */
    public LookupStrategy lookupStrategy() {
        boolean firstRowEngine = mergeEngine().equals(MergeEngine.FIRST_ROW);
        boolean lookupProducer = changelogProducer().equals(ChangelogProducer.LOOKUP);
        boolean deletionVectors = deletionVectorsEnabled();
        return LookupStrategy.from(firstRowEngine, lookupProducer, deletionVectors);
    }

    /**
     * Returns the value of the {@code CHANGELOG_PRODUCER_ROW_DEDUPLICATE} option.
     *
     * @return the configured flag
     */
    public boolean changelogRowDeduplicate() {
        return options.get(CHANGELOG_PRODUCER_ROW_DEDUPLICATE);
    }

    /**
     * Returns the value of the {@code SCAN_PLAN_SORT_PARTITION} option.
     *
     * @return the configured flag
     */
    public boolean scanPlanSortPartition() {
        return options.get(SCAN_PLAN_SORT_PARTITION);
    }

    /**
     * Resolves the startup mode for this instance's options via {@link #startupMode(Options)}.
     *
     * @return the effective startup mode
     */
    public StartupMode startupMode() {
        return startupMode(options);
    }

    /**
     * Resolves the effective startup mode from {@code SCAN_MODE} and related scan options.
     *
     * <p>{@code FULL} maps to {@code LATEST_FULL}. Any mode other than {@code DEFAULT} is returned as
     * configured. For {@code DEFAULT} the mode is inferred from which options are present, checked in
     * this order: scan timestamp, then snapshot id / tag name / watermark / version, then file creation
     * time, then incremental-between (snapshot or timestamp); if none is present the result is
     * {@code LATEST_FULL}.
     *
     * @param options option store to inspect
     * @return the effective startup mode
     */
    public static StartupMode startupMode(Options options) {
        StartupMode configured = options.get(SCAN_MODE);
        if (configured == StartupMode.FULL) {
            return StartupMode.LATEST_FULL;
        }
        if (configured != StartupMode.DEFAULT) {
            return configured;
        }

        // DEFAULT: infer the mode from whichever scan option is present.
        if (options.getOptional(SCAN_TIMESTAMP_MILLIS).isPresent()) {
            return StartupMode.FROM_TIMESTAMP;
        }
        boolean snapshotLike =
                options.getOptional(SCAN_SNAPSHOT_ID).isPresent()
                        || options.getOptional(SCAN_TAG_NAME).isPresent()
                        || options.getOptional(SCAN_WATERMARK).isPresent()
                        || options.getOptional(SCAN_VERSION).isPresent();
        if (snapshotLike) {
            return StartupMode.FROM_SNAPSHOT;
        }
        if (options.getOptional(SCAN_FILE_CREATION_TIME_MILLIS).isPresent()) {
            return StartupMode.FROM_FILE_CREATION_TIME;
        }
        boolean incremental =
                options.getOptional(INCREMENTAL_BETWEEN).isPresent()
                        || options.getOptional(INCREMENTAL_BETWEEN_TIMESTAMP).isPresent();
        return incremental ? StartupMode.INCREMENTAL : StartupMode.LATEST_FULL;
    }

    /**
     * Returns the value of the {@code SCAN_TIMESTAMP_MILLIS} option.
     *
     * @return the configured timestamp as a boxed {@code Long}
     */
    public Long scanTimestampMills() {
        return options.get(SCAN_TIMESTAMP_MILLIS);
    }

    /**
     * Returns the value of the {@code SCAN_WATERMARK} option.
     *
     * @return the configured watermark as a boxed {@code Long}
     */
    public Long scanWatermark() {
        return options.get(SCAN_WATERMARK);
    }

    /**
     * Returns the value of the {@code SCAN_FILE_CREATION_TIME_MILLIS} option.
     *
     * @return the configured file creation time as a boxed {@code Long}
     */
    public Long scanFileCreationTimeMills() {
        return options.get(SCAN_FILE_CREATION_TIME_MILLIS);
    }

    /**
     * Returns the value of the {@code SCAN_BOUNDED_WATERMARK} option.
     *
     * @return the configured bounded watermark as a boxed {@code Long}
     */
    public Long scanBoundedWatermark() {
        return options.get(SCAN_BOUNDED_WATERMARK);
    }

    /**
     * Returns the value of the {@code SCAN_SNAPSHOT_ID} option.
     *
     * @return the configured snapshot id as a boxed {@code Long}
     */
    public Long scanSnapshotId() {
        return options.get(SCAN_SNAPSHOT_ID);
    }

    /**
     * Returns the value of the {@code SCAN_TAG_NAME} option.
     *
     * @return the configured tag name
     */
    public String scanTagName() {
        return options.get(SCAN_TAG_NAME);
    }

    /**
     * Returns the value of the {@code SCAN_VERSION} option.
     *
     * @return the configured version
     */
    public String scanVersion() {
        return options.get(SCAN_VERSION);
    }

    /**
     * Parses the incremental range into a (start, end) pair.
     *
     * <p>{@link #INCREMENTAL_BETWEEN} is preferred; {@link #INCREMENTAL_BETWEEN_TIMESTAMP} is consulted
     * only when the former yields {@code null}. The value is split on commas and must yield exactly
     * two parts.
     *
     * @return the two parts as a pair, or {@code null} when neither option has a value
     * @throws IllegalArgumentException if splitting the value does not yield exactly two parts
     */
    public Pair<String, String> incrementalBetween() {
        String range = options.get(INCREMENTAL_BETWEEN);
        if (range == null) {
            range = options.get(INCREMENTAL_BETWEEN_TIMESTAMP);
        }
        if (range == null) {
            return null;
        }
        String[] bounds = range.split(",");
        if (bounds.length == 2) {
            return Pair.of(bounds[0], bounds[1]);
        }
        throw new IllegalArgumentException("The incremental-between or incremental-between-timestamp  must specific start(exclusive) and end snapshot or timestamp, for example, 'incremental-between'='5,10' means changes between snapshot 5 and snapshot 10. But is: " + range);
    }

    /**
     * Returns the value of the {@link #INCREMENTAL_BETWEEN_SCAN_MODE} option.
     *
     * @return the configured scan mode
     */
    public IncrementalBetweenScanMode incrementalBetweenScanMode() {
        return options.get(INCREMENTAL_BETWEEN_SCAN_MODE);
    }

    /**
     * Returns the value of the {@code SCAN_MANIFEST_PARALLELISM} option.
     *
     * @return the configured parallelism as a boxed {@code Integer}
     */
    public Integer scanManifestParallelism() {
        return options.get(SCAN_MANIFEST_PARALLELISM);
    }

    /**
     * Returns the value of the {@link #DYNAMIC_BUCKET_INITIAL_BUCKETS} option.
     *
     * @return the configured value; the option declares no default, so this is presumably
     *     {@code null} when unset
     */
    public Integer dynamicBucketInitialBuckets() {
        return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
    }

    /**
     * Returns the value of the {@link #DYNAMIC_BUCKET_ASSIGNER_PARALLELISM} option.
     *
     * @return the configured value; the option declares no default, so this is presumably
     *     {@code null} when unset
     */
    public Integer dynamicBucketAssignerParallelism() {
        return options.get(DYNAMIC_BUCKET_ASSIGNER_PARALLELISM);
    }

    /**
     * Splits the {@code SEQUENCE_FIELD} option on commas.
     *
     * @return the comma-separated parts, or an empty list when the option is absent
     */
    public List<String> sequenceField() {
        Optional<String> configured = options.getOptional(SEQUENCE_FIELD);
        if (!configured.isPresent()) {
            return Collections.emptyList();
        }
        return Arrays.asList(configured.get().split(","));
    }

    /**
     * Returns the {@code ROWKIND_FIELD} option as an {@link Optional}.
     *
     * @return the optional value as produced by {@code Options.getOptional}
     */
    public Optional<String> rowkindField() {
        return options.getOptional(ROWKIND_FIELD);
    }

    /**
     * Returns the value of the {@code WRITE_ONLY} option.
     *
     * @return the configured flag
     */
    public boolean writeOnly() {
        return options.get(WRITE_ONLY);
    }

    /**
     * Returns the value of the {@code STREAMING_READ_OVERWRITE} option.
     *
     * @return the configured flag
     */
    public boolean streamingReadOverwrite() {
        return options.get(STREAMING_READ_OVERWRITE);
    }

    /**
     * Returns the value of the {@code DYNAMIC_PARTITION_OVERWRITE} option.
     *
     * @return the configured flag
     */
    public boolean dynamicPartitionOverwrite() {
        return options.get(DYNAMIC_PARTITION_OVERWRITE);
    }

    /**
     * Returns the value of the {@code PARTITION_EXPIRATION_TIME} option.
     *
     * @return the configured expiration time
     */
    public Duration partitionExpireTime() {
        return options.get(PARTITION_EXPIRATION_TIME);
    }

    /**
     * Returns the value of the {@code PARTITION_EXPIRATION_CHECK_INTERVAL} option.
     *
     * @return the configured check interval
     */
    public Duration partitionExpireCheckInterval() {
        return options.get(PARTITION_EXPIRATION_CHECK_INTERVAL);
    }

    /**
     * Returns the value of the {@code PARTITION_TIMESTAMP_FORMATTER} option.
     *
     * @return the configured formatter string
     */
    public String partitionTimestampFormatter() {
        return options.get(PARTITION_TIMESTAMP_FORMATTER);
    }

    /**
     * Returns the value of the {@code PARTITION_TIMESTAMP_PATTERN} option.
     *
     * @return the configured pattern string
     */
    public String partitionTimestampPattern() {
        return options.get(PARTITION_TIMESTAMP_PATTERN);
    }

    /**
     * Returns the value of the {@code READ_BATCH_SIZE} option.
     *
     * @return the configured batch size
     */
    public int readBatchSize() {
        return options.get(READ_BATCH_SIZE);
    }

    /**
     * Returns the value of the {@code CONSUMER_ID} option.
     *
     * @return the configured consumer id
     */
    public String consumerId() {
        return options.get(CONSUMER_ID);
    }

    /**
     * Returns the value of the {@code STREAMING_READ_MODE} option from the given option store.
     *
     * @param options option store to read from
     * @return the configured streaming read mode
     */
    public static StreamingReadMode streamReadType(Options options) {
        return options.get(STREAMING_READ_MODE);
    }

    /**
     * Returns the value of the {@link #CONSUMER_EXPIRATION_TIME} option.
     *
     * @return the configured value; the option declares no default, so this is presumably
     *     {@code null} when unset
     */
    public Duration consumerExpireTime() {
        return options.get(CONSUMER_EXPIRATION_TIME);
    }

    /**
     * Returns the value of the {@link #CONSUMER_IGNORE_PROGRESS} option.
     *
     * @return the configured flag
     */
    public boolean consumerIgnoreProgress() {
        return options.get(CONSUMER_IGNORE_PROGRESS);
    }

    /**
     * Returns the value of the {@link #METASTORE_PARTITIONED_TABLE} option.
     *
     * @return the configured flag
     */
    public boolean partitionedTableInMetastore() {
        return options.get(METASTORE_PARTITIONED_TABLE);
    }

    /**
     * Returns the value of the {@link #METASTORE_TAG_TO_PARTITION} option.
     *
     * @return the configured value; annotated as possibly {@code null}
     */
    @Nullable
    public String tagToPartitionField() {
        return options.get(METASTORE_TAG_TO_PARTITION);
    }

    /**
     * Returns the value of the {@link #METASTORE_TAG_TO_PARTITION_PREVIEW} option.
     *
     * @return the configured preview mode
     */
    public TagCreationMode tagToPartitionPreview() {
        return options.get(METASTORE_TAG_TO_PARTITION_PREVIEW);
    }

    /**
     * Returns the value of the {@link #TAG_AUTOMATIC_CREATION} option.
     *
     * @return the configured tag creation mode
     */
    public TagCreationMode tagCreationMode() {
        return options.get(TAG_AUTOMATIC_CREATION);
    }

    /**
     * Returns the value of the {@link #TAG_CREATION_PERIOD} option.
     *
     * @return the configured tag creation period
     */
    public TagCreationPeriod tagCreationPeriod() {
        return options.get(TAG_CREATION_PERIOD);
    }

    /**
     * Returns the value of the {@link #TAG_CREATION_DELAY} option.
     *
     * @return the configured delay
     */
    public Duration tagCreationDelay() {
        return options.get(TAG_CREATION_DELAY);
    }

    /**
     * Returns the value of the {@link #TAG_PERIOD_FORMATTER} option.
     *
     * @return the configured period formatter
     */
    public TagPeriodFormatter tagPeriodFormatter() {
        return options.get(TAG_PERIOD_FORMATTER);
    }

    /**
     * Returns the value of the {@link #TAG_NUM_RETAINED_MAX} option.
     *
     * @return the configured maximum; annotated as possibly {@code null}
     */
    @Nullable
    public Integer tagNumRetainedMax() {
        return options.get(TAG_NUM_RETAINED_MAX);
    }

    /**
     * Returns the value of the {@link #TAG_DEFAULT_TIME_RETAINED} option.
     *
     * @return the configured value; the option declares no default, so this is presumably
     *     {@code null} when unset
     */
    public Duration tagDefaultTimeRetained() {
        return options.get(TAG_DEFAULT_TIME_RETAINED);
    }

    /**
     * Returns the value of the {@link #SNAPSHOT_WATERMARK_IDLE_TIMEOUT} option.
     *
     * @return the configured value; the option declares no default, so this is presumably
     *     {@code null} when unset
     */
    public Duration snapshotWatermarkIdleTimeout() {
        return options.get(SNAPSHOT_WATERMARK_IDLE_TIMEOUT);
    }

    /**
     * Returns the value of the {@link #SINK_WATERMARK_TIME_ZONE} option.
     *
     * @return the configured time zone string
     */
    public String sinkWatermarkTimeZone() {
        return options.get(SINK_WATERMARK_TIME_ZONE);
    }

    /**
     * Returns the value of the {@link #COMMIT_FORCE_CREATE_SNAPSHOT} option.
     *
     * @return the configured flag
     */
    public boolean forceCreatingSnapshot() {
        return options.get(COMMIT_FORCE_CREATE_SNAPSHOT);
    }

    /**
     * Collects per-field default values declared as {@code fields.<field-name>.default-value}.
     *
     * <p>The field name is obtained by stripping exactly the leading {@code fields.} and the trailing
     * {@code .default-value}. Occurrences of those texts inside the field name are preserved, so
     * {@code fields.a.fields.b.default-value} maps to the field {@code a.fields.b}.
     *
     * @return a new mutable map from field name to its configured default value
     */
    public Map<String, String> getFieldDefaultValues() {
        Map<String, String> defaultValues = new HashMap<>();
        String fieldPrefix = "fields.";
        String defaultValueSuffix = ".default-value";
        int affixLength = fieldPrefix.length() + defaultValueSuffix.length();
        for (Map.Entry<String, String> option : options.toMap().entrySet()) {
            String key = option.getKey();
            // The length check rejects keys such as "fields.default-value", where prefix and suffix
            // share the same dot and no field name sits between them.
            if (key == null
                    || key.length() < affixLength
                    || !key.startsWith(fieldPrefix)
                    || !key.endsWith(defaultValueSuffix)) {
                continue;
            }
            // Cut by position instead of String.replace, which would also remove matching text that
            // happens to appear inside the field name.
            String fieldName =
                    key.substring(fieldPrefix.length(), key.length() - defaultValueSuffix.length());
            defaultValues.put(fieldName, option.getValue());
        }
        return defaultValues;
    }

    /**
     * Resolves the commit callbacks declared in {@link #COMMIT_CALLBACKS}, each paired with its parameter
     * looked up via the {@link #COMMIT_CALLBACK_PARAM} key template.
     *
     * @return map from callback class name to its parameter string
     */
    public Map<String, String> commitCallbacks() {
        return callbacks(COMMIT_CALLBACKS, COMMIT_CALLBACK_PARAM);
    }

    /**
     * Resolves the tag callbacks declared in {@link #TAG_CALLBACKS}, each paired with its parameter
     * looked up via the {@link #TAG_CALLBACK_PARAM} key template.
     *
     * @return map from callback class name to its parameter string
     */
    public Map<String, String> tagCallbacks() {
        return callbacks(TAG_CALLBACKS, TAG_CALLBACK_PARAM);
    }

    /**
     * Splits a comma-separated list of class names and looks up a parameter string for each.
     *
     * <p>Each name is trimmed and blank entries are skipped. The parameter key is the template key of
     * {@code callbackParam} with {@code #} replaced by the class name.
     *
     * @param callbacks option holding the comma-separated class names
     * @param callbackParam option whose key is the parameter key template
     * @return a new map from class name to the value stored under its parameter key
     */
    private Map<String, String> callbacks(ConfigOption<String> callbacks, ConfigOption<String> callbackParam) {
        Map<String, String> paramsByClass = new HashMap<>();
        String[] rawNames = options.get(callbacks).split(",");
        for (String rawName : rawNames) {
            String className = rawName.trim();
            if (!className.isEmpty()) {
                String paramKey = callbackParam.key().replace("#", className);
                paramsByClass.put(className, options.get(paramKey));
            }
        }
        return paramsByClass;
    }

    /**
     * Tells whether {@link #LOCAL_MERGE_BUFFER_SIZE} has a value.
     *
     * @return {@code true} if the option value is not {@code null}
     */
    public boolean localMergeEnabled() {
        MemorySize bufferSize = options.get(LOCAL_MERGE_BUFFER_SIZE);
        return bufferSize != null;
    }

    /**
     * Returns the {@link #LOCAL_MERGE_BUFFER_SIZE} option in bytes.
     *
     * <p>The value is dereferenced without a null check; see {@link #localMergeEnabled()} for the
     * presence test.
     *
     * @return the configured buffer size in bytes
     */
    public long localMergeBufferSize() {
        MemorySize bufferSize = options.get(LOCAL_MERGE_BUFFER_SIZE);
        return bufferSize.getBytes();
    }

    /**
     * Returns the value of the {@link #CROSS_PARTITION_UPSERT_INDEX_TTL} option.
     *
     * @return the configured value; the option declares no default, so this is presumably
     *     {@code null} when unset
     */
    public Duration crossPartitionUpsertIndexTtl() {
        return options.get(CROSS_PARTITION_UPSERT_INDEX_TTL);
    }

    /**
     * Returns the value of the {@link #CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM} option.
     *
     * @return the configured parallelism
     */
    public int crossPartitionUpsertBootstrapParallelism() {
        return options.get(CROSS_PARTITION_UPSERT_BOOTSTRAP_PARALLELISM);
    }

    /**
     * Returns the value of the {@link #ZORDER_VAR_LENGTH_CONTRIBUTION} option.
     *
     * @return the configured number of bytes
     */
    public int varTypeSize() {
        return options.get(ZORDER_VAR_LENGTH_CONTRIBUTION);
    }

    /**
     * Returns the value of the {@link #DELETION_VECTORS_ENABLED} option.
     *
     * @return the configured flag
     */
    public boolean deletionVectorsEnabled() {
        return options.get(DELETION_VECTORS_ENABLED);
    }

    /**
     * Creates a new {@link FileIndexOptions} from this instance.
     *
     * @return a freshly constructed {@link FileIndexOptions}
     */
    public FileIndexOptions indexColumnsOptions() {
        return new FileIndexOptions(this);
    }

    /**
     * Returns the {@code FILE_INDEX_IN_MANIFEST_THRESHOLD} option in bytes.
     *
     * @return the configured threshold in bytes
     */
    public long fileIndexInManifestThreshold() {
        return options.get(FILE_INDEX_IN_MANIFEST_THRESHOLD).getBytes();
    }

    /**
     * Returns the value of the {@code FILE_INDEX_READ_ENABLED} option.
     *
     * @return the configured flag
     */
    public boolean fileIndexReadEnabled() {
        return options.get(FILE_INDEX_READ_ENABLED);
    }

    /**
     * Returns the value of the {@link #DELETION_FORCE_PRODUCE_CHANGELOG} option.
     *
     * @return the configured flag
     */
    public boolean deleteForceProduceChangelog() {
        return options.get(DELETION_FORCE_PRODUCE_CHANGELOG);
    }

    /**
     * Returns the value of the {@link #RECORD_LEVEL_EXPIRE_TIME} option.
     *
     * @return the configured expire time; annotated as possibly {@code null}
     */
    @Nullable
    public Duration recordLevelExpireTime() {
        return options.get(RECORD_LEVEL_EXPIRE_TIME);
    }

    /**
     * Returns the value of the {@link #RECORD_LEVEL_TIME_FIELD} option.
     *
     * @return the configured time field; annotated as possibly {@code null}
     */
    @Nullable
    public String recordLevelTimeField() {
        return options.get(RECORD_LEVEL_TIME_FIELD);
    }

    /**
     * Fills in {@code SCAN_MODE} when it is not set, based on which scan-related option is present.
     *
     * <p>Nothing is changed when {@code SCAN_MODE} is already present. Otherwise the first match wins,
     * in this order: scan timestamp, file creation time, snapshot id, incremental-between (timestamp or
     * snapshot).
     *
     * @param options option store to update in place
     */
    public static void setDefaultValues(Options options) {
        if (options.contains(SCAN_MODE)) {
            return;
        }
        if (options.contains(SCAN_TIMESTAMP_MILLIS)) {
            options.set(SCAN_MODE, StartupMode.FROM_TIMESTAMP);
        } else if (options.contains(SCAN_FILE_CREATION_TIME_MILLIS)) {
            options.set(SCAN_MODE, StartupMode.FROM_FILE_CREATION_TIME);
        } else if (options.contains(SCAN_SNAPSHOT_ID)) {
            options.set(SCAN_MODE, StartupMode.FROM_SNAPSHOT);
        } else if (options.contains(INCREMENTAL_BETWEEN_TIMESTAMP) || options.contains(INCREMENTAL_BETWEEN)) {
            options.set(SCAN_MODE, StartupMode.INCREMENTAL);
        }
    }

    /**
     * Collects, via reflection, every public field of {@link CoreOptions} whose declared type is
     * assignable to {@link ConfigOption}.
     *
     * @return a new list holding the values of those fields, in the order {@code Class.getFields()}
     *     reports them
     * @throws RuntimeException wrapping the {@link IllegalAccessException} if a field cannot be read
     */
    public static List<ConfigOption<?>> getOptions() {
        Field[] fields = CoreOptions.class.getFields();
        List<ConfigOption<?>> list = new ArrayList<>(fields.length);
        for (Field field : fields) {
            if (!ConfigOption.class.isAssignableFrom(field.getType())) {
                continue;
            }
            try {
                list.add((ConfigOption<?>) field.get(CoreOptions.class));
            } catch (IllegalAccessException e) {
                throw new RuntimeException(e);
            }
        }
        return list;
    }

    /**
     * Collects the keys of every public {@link ConfigOption} field of {@link CoreOptions} that carries
     * the {@code Documentation.Immutable} annotation.
     *
     * @return a new set of option keys
     * @throws RuntimeException wrapping the {@link IllegalAccessException} if a field cannot be read
     */
    public static Set<String> getImmutableOptionKeys() {
        Field[] declared = CoreOptions.class.getFields();
        Set<String> keys = new HashSet<>(declared.length);
        for (Field candidate : declared) {
            boolean isOption = ConfigOption.class.isAssignableFrom(candidate.getType());
            if (isOption && candidate.isAnnotationPresent(Documentation.Immutable.class)) {
                try {
                    ConfigOption<?> option = (ConfigOption<?>) candidate.get(CoreOptions.class);
                    keys.add(option.key());
                } catch (IllegalAccessException e) {
                    throw new RuntimeException(e);
                }
            }
        }
        return keys;
    }

    /**
     * Consumer modes. Every constant carries the string returned by {@link #toString()} and a
     * plain-text description exposed through {@link #getDescription()}.
     */
    public static enum ConsumerMode implements DescribedEnum {
        EXACTLY_ONCE("exactly-once", "Readers consume data at snapshot granularity, and strictly ensure that the snapshot-id recorded in the consumer is the snapshot-id + 1 that all readers have exactly consumed."),
        AT_LEAST_ONCE("at-least-once", "Each reader consumes snapshots at a different rate, and the snapshot with the slowest consumption progress among all readers will be recorded in the consumer.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        ConsumerMode(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Range strategy selector with two choices, {@code SIZE} and {@code QUANTITY}. How each
     * choice is interpreted is decided by the code that consumes this enum.
     */
    public static enum RangeStrategy {
        SIZE,
        QUANTITY
    }

    /**
     * Execution modes for expire: synchronous or asynchronous. Every constant carries the string
     * returned by {@link #toString()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum ExpireExecutionMode implements DescribedEnum {
        SYNC("sync", "Execute expire synchronously. If there are too many files, it may take a long time and block stream processing."),
        ASYNC("async", "Execute expire asynchronously. If the generation of snapshots is greater than the deletion, there will be a backlog of files.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        ExpireExecutionMode(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Periods at which a tag is generated: daily, hourly or every two hours. Every constant
     * carries the string returned by {@link #toString()} and a plain-text description exposed
     * through {@link #getDescription()}.
     */
    public static enum TagCreationPeriod implements DescribedEnum {
        DAILY("daily", "Generate a tag every day."),
        HOURLY("hourly", "Generate a tag every hour."),
        TWO_HOURS("two-hours", "Generate a tag every two hours.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        TagCreationPeriod(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Tag period formatters, with or without dashes in the date part. Every constant carries the
     * string returned by {@link #toString()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum TagPeriodFormatter implements DescribedEnum {
        WITH_DASHES("with_dashes", "Dates and hours with dashes, e.g., 'yyyy-MM-dd HH'"),
        WITHOUT_DASHES("without_dashes", "Dates and hours without dashes, e.g., 'yyyyMMdd HH'");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        TagPeriodFormatter(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Modes of automatic tag creation. Every constant carries the string returned by
     * {@link #toString()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum TagCreationMode implements DescribedEnum {
        NONE("none", "No automatically created tags."),
        PROCESS_TIME("process-time", "Based on the time of the machine, create TAG once the processing time passes period time plus delay."),
        WATERMARK("watermark", "Based on the watermark of the input, create TAG once the watermark passes period time plus delay."),
        BATCH("batch", "In the batch processing scenario, the tag corresponding to the current snapshot is generated after the task is completed.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        TagCreationMode(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Sort engines for multiway sorting: min-heap or loser-tree. Every constant carries the
     * string returned by {@link #toString()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum SortEngine implements DescribedEnum {
        MIN_HEAP("min-heap", "Use min-heap for multiway sorting."),
        LOSER_TREE("loser-tree", "Use loser-tree for multiway sorting. Compared with heapsort, loser-tree has fewer comparisons and is more efficient.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        SortEngine(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Scan modes for incremental-between reads. Every constant carries the string returned by
     * {@link #toString()} / {@link #getValue()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum IncrementalBetweenScanMode implements DescribedEnum
    {
        AUTO("auto", "Scan changelog files for the table which produces changelog files. Otherwise, scan newly changed files."),
        DELTA("delta", "Scan newly changed files between snapshots."),
        CHANGELOG("changelog", "Scan changelog files between snapshots.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        private IncrementalBetweenScanMode(String value, String description) {
            this.value = value;
            this.description = description;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return this.value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(this.description);
        }

        /** Returns the string form of this constant. */
        public String getValue() {
            return this.value;
        }

        /**
         * Looks up the constant whose string form equals {@code value}.
         *
         * @param value the string form to look for; matching is exact and case-sensitive
         * @return the matching constant
         * @throws IllegalArgumentException if no constant has that string form
         */
        @VisibleForTesting
        public static IncrementalBetweenScanMode fromValue(String value) {
            for (IncrementalBetweenScanMode mode : IncrementalBetweenScanMode.values()) {
                if (mode.value.equals(value)) {
                    return mode;
                }
            }
            // The message names this enum; it previously said "format type", copied from
            // FileFormatType.
            throw new IllegalArgumentException(String.format("Invalid incremental between scan mode %s, only support [%s]", value, StringUtils.join(Arrays.stream(IncrementalBetweenScanMode.values()).iterator(), ",")));
        }
    }

    /**
     * Stream scan modes. Every constant carries the string returned by {@link #toString()} /
     * {@link #getValue()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum StreamScanMode implements DescribedEnum {
        NONE("none", "No requirement."),
        COMPACT_BUCKET_TABLE("compact-bucket-table", "Compaction for traditional bucket table."),
        COMPACT_APPEND_NO_BUCKET("compact-append-no-bucket", "Compaction for append table with bucket unaware."),
        FILE_MONITOR("file-monitor", "Monitor data file changes.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        StreamScanMode(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }

        /** Returns the string form of this constant. */
        public String getValue() {
            return value;
        }
    }

    /**
     * Streaming read modes: read from the log store or from the file store. Every constant
     * carries the string returned by {@link #toString()} / {@link #getValue()} and a plain-text
     * description exposed through {@link #getDescription()}.
     */
    public static enum StreamingReadMode implements DescribedEnum
    {
        LOG("log", "Reads from the log store."),
        FILE("file", "Reads from the file store.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        private StreamingReadMode(String value, String description) {
            this.value = value;
            this.description = description;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return this.value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(this.description);
        }

        /** Returns the string form of this constant. */
        public String getValue() {
            return this.value;
        }

        /**
         * Looks up the constant whose string form equals {@code value}.
         *
         * @param value the string form to look for; matching is exact and case-sensitive
         * @return the matching constant
         * @throws IllegalArgumentException if no constant has that string form
         */
        @VisibleForTesting
        public static StreamingReadMode fromValue(String value) {
            for (StreamingReadMode mode : StreamingReadMode.values()) {
                if (mode.value.equals(value)) {
                    return mode;
                }
            }
            // The message names this enum; it previously said "format type", copied from
            // FileFormatType.
            throw new IllegalArgumentException(String.format("Invalid streaming read mode %s, only support [%s]", value, StringUtils.join(Arrays.stream(StreamingReadMode.values()).iterator(), ",")));
        }
    }

    /**
     * File format types: ORC, Parquet and Avro. Every constant carries the string returned by
     * {@link #toString()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum FileFormatType implements DescribedEnum {
        ORC("orc", "ORC file format."),
        PARQUET("parquet", "Parquet file format."),
        AVRO("avro", "Avro file format.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        FileFormatType(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }

        /**
         * Looks up the constant whose string form equals {@code value}.
         *
         * @param value the string form to look for; matching is exact and case-sensitive
         * @return the first constant, in declaration order, with that string form
         * @throws IllegalArgumentException if no constant has that string form
         */
        @VisibleForTesting
        public static FileFormatType fromValue(String value) {
            return Arrays.stream(FileFormatType.values())
                    .filter(candidate -> candidate.value.equals(value))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException(String.format("Invalid format type %s, only support [%s]", value, StringUtils.join(Arrays.stream(FileFormatType.values()).iterator(), ","))));
        }
    }

    /**
     * Changelog producers, describing if and how changelog files are generated. Every constant
     * carries the string returned by {@link #toString()} and a plain-text description exposed
     * through {@link #getDescription()}.
     */
    public static enum ChangelogProducer implements DescribedEnum {
        NONE("none", "No changelog file."),
        INPUT("input", "Double write to a changelog file when flushing memory table, the changelog is from input."),
        FULL_COMPACTION("full-compaction", "Generate changelog files with each full compaction."),
        LOOKUP("lookup", "Generate changelog files through 'lookup' before committing the data writing.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        ChangelogProducer(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Changelog modes of the log system. Every constant carries the string returned by
     * {@link #toString()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum LogChangelogMode implements DescribedEnum {
        AUTO("auto", "Upsert for table with primary key, all for table without primary key."),
        ALL("all", "The log system stores all changes including UPDATE_BEFORE."),
        UPSERT("upsert", "The log system does not store the UPDATE_BEFORE changes, the log consumed job will automatically add the normalized node, relying on the state to generate the required update_before.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        LogChangelogMode(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Log consistency levels: transactional or eventual. Every constant carries the string
     * returned by {@link #toString()} and a plain-text description exposed through
     * {@link #getDescription()}.
     */
    public static enum LogConsistency implements DescribedEnum {
        TRANSACTIONAL("transactional", "Only the data after the checkpoint can be seen by readers, the latency depends on checkpoint interval."),
        EVENTUAL("eventual", "Immediate data visibility, you may see some intermediate states, but eventually the right results will be produced, only works for table with primary key.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        LogConsistency(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Startup modes for scanning. Every constant carries the string returned by
     * {@link #toString()} and a plain-text description exposed through
     * {@link #getDescription()}; the per-constant semantics are spelled out in those
     * descriptions.
     */
    public static enum StartupMode implements DescribedEnum {
        DEFAULT("default", "Determines actual startup mode according to other table properties. If \"scan.timestamp-millis\" is set the actual startup mode will be \"from-timestamp\", and if \"scan.snapshot-id\" or \"scan.tag-name\" is set the actual startup mode will be \"from-snapshot\". Otherwise the actual startup mode will be \"latest-full\"."),
        LATEST_FULL("latest-full", "For streaming sources, produces the latest snapshot on the table upon first startup, and continue to read the latest changes. For batch sources, just produce the latest snapshot but does not read new changes."),
        FULL("full", "Deprecated. Same as \"latest-full\"."),
        LATEST("latest", "For streaming sources, continuously reads latest changes without producing a snapshot at the beginning. For batch sources, behaves the same as the \"latest-full\" startup mode."),
        COMPACTED_FULL("compacted-full", "For streaming sources, produces a snapshot after the latest compaction on the table upon first startup, and continue to read the latest changes. For batch sources, just produce a snapshot after the latest compaction but does not read new changes. Snapshots of full compaction are picked when scheduled full-compaction is enabled."),
        FROM_TIMESTAMP("from-timestamp", "For streaming sources, continuously reads changes starting from timestamp specified by \"scan.timestamp-millis\", without producing a snapshot at the beginning. For batch sources, produces a snapshot at timestamp specified by \"scan.timestamp-millis\" but does not read new changes."),
        FROM_FILE_CREATION_TIME("from-file-creation-time", "For streaming and batch sources, produces a snapshot and filters the data files by creation time. For streaming sources, upon first startup, and continue to read the latest changes."),
        FROM_SNAPSHOT("from-snapshot", "For streaming sources, continuously reads changes starting from snapshot specified by \"scan.snapshot-id\", without producing a snapshot at the beginning. For batch sources, produces a snapshot specified by \"scan.snapshot-id\" or \"scan.tag-name\" but does not read new changes."),
        FROM_SNAPSHOT_FULL("from-snapshot-full", "For streaming sources, produces from snapshot specified by \"scan.snapshot-id\" on the table upon first startup, and continuously reads changes. For batch sources, produces a snapshot specified by \"scan.snapshot-id\" but does not read new changes."),
        INCREMENTAL("incremental", "Read incremental changes between start and end snapshot or timestamp.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        StartupMode(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }

    /**
     * Merge engines. Every constant carries the string returned by {@link #toString()} and a
     * plain-text description exposed through {@link #getDescription()}. Note that
     * {@code AGGREGATE} has the string form {@code "aggregation"}.
     */
    public static enum MergeEngine implements DescribedEnum {
        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
        PARTIAL_UPDATE("partial-update", "Partial update non-null fields."),
        AGGREGATE("aggregation", "Aggregate fields with same primary key."),
        FIRST_ROW("first-row", "De-duplicate and keep the first row.");

        /** String form of the constant. */
        private final String value;

        /** Human-readable explanation of the constant. */
        private final String description;

        MergeEngine(String valueText, String descriptionText) {
            value = valueText;
            description = descriptionText;
        }

        /** Returns the string form of this constant rather than its identifier. */
        @Override
        public String toString() {
            return value;
        }

        /** Returns the description wrapped as a text element. */
        @Override
        public InlineElement getDescription() {
            return TextElement.text(description);
        }
    }
}

