/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader;

import java.io.Serializable;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.InMemoryReporter;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.logging.LoggerAuditingExtension;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.event.Level;

public class AlignedWatermarksITCase {
    public static final String SLOW_SOURCE_NAME = "SlowNumberSequenceSource";
    public static final String FAST_SOURCE_NAME = "FastNumberSequenceSource";
    private static final Duration UPDATE_INTERVAL = Duration.ofMillis(100L);
    public static final int MAX_DRIFT = 10;
    @RegisterExtension
    LoggerAuditingExtension loggerAuditingExtension = new LoggerAuditingExtension(AlignedWatermarksITCase.class, Level.INFO);
    private static final InMemoryReporter reporter = InMemoryReporter.createWithRetainedMetrics();
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).setConfiguration(reporter.addToConfiguration(new Configuration())).build());

    @Test
    public void testAlignment(@InjectMiniCluster MiniCluster miniCluster) throws Exception {
        JobGraph jobGraph = this.getJobGraph();
        CompletableFuture submission = miniCluster.submitJob(jobGraph);
        JobID jobID = ((JobSubmissionResult)submission.get()).getJobID();
        CommonTestUtils.waitForAllTaskRunning((MiniCluster)miniCluster, (JobID)jobID, (boolean)false);
        long oldDrift = Long.MAX_VALUE;
        do {
            Optional drift = reporter.findMetric(jobID, "FastNumberSequenceSource.*watermarkAlignmentDrift");
            Thread.sleep(200L);
            Optional<Long> newDriftOptional = drift.map(m -> (Long)((Gauge)m).getValue());
            if (!newDriftOptional.isPresent()) continue;
            Long newDrift = newDriftOptional.get();
            Assertions.assertThat((Long)newDrift).isLessThanOrEqualTo(oldDrift);
            oldDrift = newDrift;
        } while (oldDrift >= 10L);
    }

    private JobGraph getJobGraph() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setAutoWatermarkInterval(0L);
        env.setParallelism(1);
        SingleOutputStreamOperator slowSource = env.fromSource((Source)new NumberSequenceSource(0L, Long.MAX_VALUE), WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PunctuatedGenerator()).withWatermarkAlignment("group-1", Duration.ofMillis(10L), UPDATE_INTERVAL).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, t) -> r), SLOW_SOURCE_NAME).map((MapFunction)new RichMapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                Thread.sleep(10L);
                return value;
            }
        });
        SingleOutputStreamOperator fastSource = env.fromSource((Source)new NumberSequenceSource(0L, Long.MAX_VALUE), WatermarkStrategy.forGenerator((WatermarkGeneratorSupplier & Serializable)ctx -> new PunctuatedGenerator()).withWatermarkAlignment("group-1", Duration.ofMillis(10L), UPDATE_INTERVAL).withTimestampAssigner((SerializableTimestampAssigner & Serializable)(r, t) -> r), FAST_SOURCE_NAME).map((MapFunction)new RichMapFunction<Long, Long>(){

            public Long map(Long value) throws Exception {
                Thread.sleep(1L);
                return value;
            }
        });
        slowSource.union(new DataStream[]{fastSource}).sinkTo((Sink)new DiscardingSink());
        return env.getStreamGraph().getJobGraph();
    }

    private static class PunctuatedGenerator
    implements WatermarkGenerator<Long> {
        private PunctuatedGenerator() {
        }

        public void onEvent(Long event, long eventTimestamp, WatermarkOutput output) {
            output.emitWatermark(new Watermark(eventTimestamp));
        }

        public void onPeriodicEmit(WatermarkOutput output) {
        }
    }
}

