/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc.watermark;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.watermark.CdcTimestampExtractor;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonProcessingException;

public class CdcWatermarkStrategy
implements WatermarkStrategy<CdcSourceRecord> {
    private final CdcTimestampExtractor timestampExtractor;
    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;

    public CdcWatermarkStrategy(CdcTimestampExtractor extractor) {
        this.timestampExtractor = extractor;
    }

    public WatermarkGenerator<CdcSourceRecord> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<CdcSourceRecord>(){

            public void onEvent(CdcSourceRecord record, long timestamp, WatermarkOutput output) {
                long tMs;
                try {
                    tMs = CdcWatermarkStrategy.this.timestampExtractor.extractTimestamp(record);
                }
                catch (JsonProcessingException e) {
                    throw new RuntimeException(e);
                }
                if (tMs != Long.MIN_VALUE) {
                    CdcWatermarkStrategy.this.currentMaxTimestamp = Math.max(CdcWatermarkStrategy.this.currentMaxTimestamp, tMs);
                    output.emitWatermark(new Watermark(CdcWatermarkStrategy.this.currentMaxTimestamp - 1L));
                }
            }

            public void onPeriodicEmit(WatermarkOutput output) {
                long timeMillis = System.currentTimeMillis();
                CdcWatermarkStrategy.this.currentMaxTimestamp = Math.max(timeMillis, CdcWatermarkStrategy.this.currentMaxTimestamp);
                output.emitWatermark(new Watermark(CdcWatermarkStrategy.this.currentMaxTimestamp - 1L));
            }
        };
    }
}

